Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程

rabbitmq中文翻译的话,主要还是mq字母上:Message Queue,即消息队列的意思。前面还有个rabbit单词,就是兔子的意思,和python语言叫python一样,老外还是蛮幽默的。rabbitmq服务类似于mysql、apache服务,只是提供的功能不一样。rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信。

安装rabbitmq
先来安装下rabbitmq,在ubuntu 12.04下可以直接通过apt-get安装:

sudo apt-get install rabbitmq-server

安装好后,rabbitmq服务就已经启动好了。接下来看下python编写Hello World!的实例。实例的内容就是从send.py发送“Hello World!”到rabbitmq,receive.py从rabbitmq接收send.py发送的信息。

其中P表示produce,生产者的意思,也可以称为发送者,实例中表现为send.py;C表示consumer,消费者的意思,也可以称为接收者,实例中表现为receive.py;中间红色的表示队列的意思,实例中表现为hello队列。

python使用rabbitmq服务,可以使用现成的类库pika、txAMQP或者py-amqplib,这里选择了pika。

安装pika

安装pika可以使用pip来进行安装,pip是python的软件管理包,如果没有安装,可以通过apt-get安装

sudo apt-get install python-pip

通过pip安装pika:

sudo pip install pika

send.py代码

连接到rabbitmq服务器,因为是在本地测试,所以就用localhost就可以了。

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

声明消息队列,消息将在这个队列中进行传递。如果将消息发送到不存在的队列,rabbitmq将会自动清除这些消息。

channel.queue_declare(queue='hello')

发送消息到上面声明的hello队列,其中exchange表示交换器,能精确指定消息应该发送到哪个队列,routing_key设置为队列的名称,body就是发送的内容,具体发送细节暂时先不关注。

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

关闭连接

connection.close()

完整代码

#!/usr/bin/env python
#coding=utf8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

先来执行下这个程序,执行成功的话,rabbitmqctl应该成功增加了hello队列,并且队列里应该有一条信息,用rabbitmqctl命令来查看下

rabbitmqctl list_queues

在笔者的电脑上输出如下信息:

确实有一个hello队列,并且队列里有一条信息。接下来用receive.py来获取队列里的信息。

receive.py代码

和send.py的前面两个步骤一样,都是要先连接服务器,然后声明消息的队列,这里就不再贴同样代码了。

接收消息更为复杂一些,需要定义一个回调函数来处理,这边的回调函数就是将信息打印出来。

def callback(ch, method, properties, body):
  print "Received %r" % (body,)

告诉rabbitmq使用callback来接收信息

channel.basic_consume(callback, queue='hello', no_ack=True)

开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。

channel.start_consuming()

完整代码

#!/usr/bin/env python
#coding=utf8
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='hello', no_ack=True)

print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()

执行程序,就能够接收到队列hello里的消息Hello World!,然后打印在屏幕上。换一个终端,再次执行send.py,可以看到receive.py这边会再次接收到信息。

工作队列示例

1.准备工作(Preparation)

在实例程序中,用new_task.py来模拟任务分配者, worker.py来模拟工作者。

修改send.py,从命令行参数里接收信息,并发送

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)

修改receive.py的回调函数。

import time

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"

这边先打开两个终端,都运行worker.py,处于监听状态,这边就相当于两个工作者。打开第三个终端,运行new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

观察worker.py接收到任务,其中一个工作者接收到3个任务 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

另外一个工作者接收到2个任务 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')

channel.start_consuming()
(0)

相关推荐

  • docker搭建rabbitmq集群环境的方法

    本文主要讲述如何用docker搭建rabbitmq的集群.分享给大家,希望此文章对各位有所帮助. 下载镜像 采用bijukunjummen该镜像. git clone https://github.com/bijukunjummen/docker-rabbitmq-cluster.git 运行 启动集群 cd docker-rabbitmq-cluster/cluster docker-compose up -d ...... Status: Downloaded newer image for

  • CentOS下RabbitMq高可用集群环境搭建教程

    CentOS下RabbitMq高可用集群环境搭建教程分享给大家. 准备工作 1.准备两台或多台安装有rabbitmq-server服务的服务器 我这里准备了两台,分别如下: 192.168.40.130 rabbitmq01 192.168.40.131 rabbitmq02 2.确保防火墙是关闭的3,官网参考资料 http://www.rabbitmq.com/clustering.html hosts映射 修改每台服务上的hosts文件(路径:/etc/hosts),设置成如下: 192.1

  • Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程

    rabbitmq中文翻译的话,主要还是mq字母上:Message Queue,即消息队列的意思.前面还有个rabbit单词,就是兔子的意思,和python语言叫python一样,老外还是蛮幽默的.rabbitmq服务类似于mysql.apache服务,只是提供的功能不一样.rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信. 安装rabbitmq 先来安装下rabbitmq,在ubuntu 12.04下可以直接通过apt-get安装: sudo apt-get insta

  • Python远程开发环境部署与调试过程图解

    这篇文章主要介绍了Python远程开发环境部署与调试过程图解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.下载相应开发工具 Pycharm :下载地址 二.部署开发机 一般在工作过程中,开发环境并不是本地环境,而是指在开发机:因为,有很多依赖本地部署非常麻烦,而开发机中则内置了很多相关的服务 三.代码自动化部署 由于我们在本地进行代码编辑.在开发机中进行代码的运行及调试,因此,需要一种很方便的方式进行代码的远程自动化部署Pycharm 基

  • Python和Pycharm 环境部署详细步骤

    一.python下载安装 下载安装python最新版本 https://www.python.org/downloads/windows/ 这里勾选添加到环境变量 cmd中运行一下看是否安装成功 二.pycharm安装 下载安装社区免费版本 下载:https://www.jetbrains.com/pycharm/ 安装 更改安装路径 看自己需求勾选,相关解释如下 (1)创建快捷方式:根据你当前系统是32位还是64位进行选择: (2)将 pycharm 的启动目录添加到环境变量(需要重启),如果

  • Python自动化运维和部署项目工具Fabric使用实例

    Fabric 是使用 Python 开发的一个自动化运维和部署项目的一个好工具,可以通过 SSH 的方式与远程服务器进行自动化交互,例如将本地文件传到服务器,在服务器上执行shell 命令. 下面给出一个自动化部署 Django 项目的例子 # -*- coding: utf-8 -*- # 文件名要保存为 fabfile.py from __future__ import unicode_literals from fabric.api import * # 登录用户和主机名: env.use

  • 使用Python的urllib和urllib2模块制作爬虫的实例教程

    urllib 学习python完基础,有些迷茫.眼睛一闭,一种空白的窒息源源不断而来.还是缺少练习,遂拿爬虫来练练手.学习完斯巴达python爬虫课程后,将心得整理如下,供后续翻看.整篇笔记主要分以下几个部分: 1.做一个简单的爬虫程序 2.小试牛刀--抓取百度贴吧图片 3.总结 1.做一个简单的爬虫程序 首先环境描述 Device: Mba 2012 Yosemite 10.10.1 Python: python 2.7.9 编辑器: Sublime Text 3 这个没有什么好说的,直接上代

  • Django+python服务器部署与环境部署教程详解

    需要准备环境:python3.6.vultr(或者其他服务器).xshell 第一步:python安装必备环境Django库 Xshell链接远程主机: 点击连接之后:弹窗输入访问用户及密码,一般为root用户 成功连接到目标服务器: [root@vultr ~]# 安装python以及需要环境(此为安装完python3.6环境),运行pip安装即可: pip install django 提示Success安装成功 第二步:项目创建 首先cd到自己想要新建项目的路径: 我这里选择的是在data

  • 用docker部署RabbitMQ环境的详细介绍

    前置条件: 已经安装好docker 1.查找镜像(有2种方式) ①登录rabbitmq官网找到docker镜像,选择想要的镜像的tag https://www.rabbitmq.com/download.html https://hub.docker.com/_/rabbitmq 如果需要访问web管理页面,就选择tag为management的 ps:带有alpine的是用最小linux镜像构建的,体积最小可以达5M初学者不建议这么折腾,而且 Alpine Linux使用了muslmusl实现的

  • 详解Python操作RabbitMQ服务器消息队列的远程结果返回

    先说一下笔者这里的测试环境:Ubuntu14.04 + Python 2.7.4 RabbitMQ服务器 sudo apt-get install rabbitmq-server Python使用RabbitMQ需要Pika库 sudo pip install pika 远程结果返回 消息发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了,但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端. 处理方法描述:发送端在发送信息前,产生一个接收消息的临时队列,该队

  • Python通过RabbitMQ服务器实现交换机功能的实例教程

    快速回顾一下RabbitMQ服务器的安装: sudo apt-get install rabbitmq-server Python使用RabbitMQ需要Pika库: sudo pip install pika 好了,接下来我们先看交换机的工作原理:消息发送端先将消息发送给交换机,交换机再将消息发送到绑定的消息队列,而后每个接收端都能从各自的消息队列里接收到信息. 下面用send.py和receive.py来模拟实现交换机的功能.send.py表示发送端,receive.py表示接收端. rec

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

随机推荐