Python的消息队列包SnakeMQ使用初探

一、关于snakemq的官方介绍
SnakeMQ的GitHub项目页:https://github.com/dsiroky/snakemq
1.纯python实现,跨平台

2.自动重连接

3.可靠发送--可配置的消息方式与消息超时方式

4.持久化/临时 两种队列

5.支持异步 -- poll()

6.symmetrical -- 单个TCP连接可用于双工通讯

7.多数据库支持 -- SQLite、MongoDB……

8.brokerless - 类似ZeroMQ的实现原理

9.扩展模块:RPC, bandwidth throttling

以上都是官话,需要自己验证,动手封装了一下,感觉萌萌哒。

二、几个主要问题说明

1.支持自动重连,不需要自己动手写心跳逻辑,你只需要关注发送和接收就行

2.支持数据持久化,如果开始持久化,在重连之后会自动发送数据。

3.数据的接收,snakemq通过提供回调实现,你只需要写个接收方法添加到回调列表里去。

4.数据的发送,在此发送的都是bytes类型(二进制),因此需要转换。我在程序中测试的都是文本字符串,使用str.encode(‘utf-8')转换成bytes,接收时再转换回来。

5.术语解释,Connector:类似于socket的TcpClient,Lisenter:类似于socket的TcpServer,每个connector或者listener都一个一个ident标识,发送和接收数据时就知道是谁的数据了。

6.使用sqlite持久化时,需要修改源码,sqlite3.connect(filename,check_same_thread = False),用于解决多线程访问sqlite的问题。(会不会死锁?)

7.启动持久化时,如果重新连上,则会自动发送,保证可靠。

8.为了封装的需要,数据接收以后,我通过callback方式传送出去。

三、代码

说明代码中使用了自定义的日志模块

from common import nxlogger

import snakemqlogger as logger

可替换成logging的。

回调类(callbacks.py):

# -*- coding:utf-8 -*-

'''synchronized callback'''

class Callback(object):

  def __init__(self):

    self.callbacks = []

  def add(self, func):

    self.callbacks.append(func)

  def remove(self, func):

    self.callbacks.remove(func)

  def __call__(self, *args, **kwargs):

    for callback in self.callbacks:

      callback(*args, **kwargs)

Connector类(snakemqConnector.py):

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from snakemq.storage.sqlite import SqliteQueuesStorage

from snakemq.message import FLAG_PERSISTENT

from common.callbacks import Callback

from common import nxlogger

import snakemqlogger as logger

class SnakemqConnector(threading.Thread):

     def __init__(self, snakemqident = None, remoteIp = "localhost", remotePort = 9090, persistent = False):

         super(SnakemqConnector,self).__init__()

         self.messaging = None

         self.link = None

         self.snakemqident = snakemqident

         self.pktr = None

         self.remoteIp = remoteIp

         self.remotePort = remotePort

         self.persistent = persistent

         self.on_recv = Callback()

         self._initConnector()

     def run(self):

         logger.info("connector start...")

         if self.link != None:

              self.link.loop()

         logger.info("connector end...")

     def terminate(self):

         logger.info("connetor terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("connetor terminated")

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

         except Exception as e:

              logger.error("connector recv:{0}".format(e))

              print(e)

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("connector:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)

     '''

     '''

     def _initConnector(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_connector((self.remoteIp, self.remotePort))

              self.pktr = snakemq.packeter.Packeter(self.link)

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

              self.messaging.on_message_recv.add(self.on_recv_message)

         except Exception as e:

              logger.error("connector:{0}".format(e))

         finally:

              logger.info("connector[{0}] loop ended...".format(self.snakemqident))

Listener类(snakemqListener.py):

# -*- coding:utf-8 -*-

import threading

import snakemq

import snakemq.link

import snakemq.packeter

import snakemq.messaging

import snakemq.message

from common import nxlogger

import snakemqlogger as logger

from common.callbacks import Callback

class SnakemqListener(threading.Thread):

     def __init__(self, snakemqident = None, ip = "localhost", port = 9090, persistent = False):

         super(SnakemqListener,self).__init__()

         self.messaging = None

         self.link = None

         self.pktr = None

         self.snakemqident = snakemqident

         self.ip = ip;

         self.port = port

         self.connectors = {}

         self.on_recv = Callback()

         self.persistent = persistent

         self._initlistener()

     '''

     thread run

     '''

     def run(self):

         logger.info("listener start...")

         if self.link != None:

              self.link.loop()

         logger.info("listener end...")

     '''

     terminate snakemq listener thread

     '''

     def terminate(self):

         logger.info("listener terminating...")

         if self.link != None:

              self.link.stop()

              self.link.cleanup()

         logger.info("listener terminated")

     '''

     receive message from host named ident

     '''

     def on_recv_message(self, conn, ident, message):

         try:

              self.on_recv(ident, message.data.decode('utf-8'))#dispatch received data

              self.sendMsg('bob','hello,{0}'.format(ident).encode('utf-8'))

         except Exception as e:

              logger.error("listener recv:{0}".format(e))

              print(e)

     def on_drop_message(self, ident, message):

         print("message dropped", ident, message)

         logger.debug("listener:message dropped,ident:{0},message:{1}".format(ident, message))

     '''client connect'''

     def on_connect(self, ident):

         logger.debug("listener:{0} connected".format(ident))

         self.connectors[ident] = ident

         self.sendMsg(ident, "hello".encode('utf-8'))

     '''client disconnect'''

     def on_disconnect(self, ident):

         logger.debug("listener:{0} disconnected".format(ident))

         if ident in self.connectors:

              self.connectors.pop(ident)

     '''

     listen start loop

     '''

     def _initlistener(self):

         try:

              self.link = snakemq.link.Link()

              self.link.add_listener((self.ip, self.port))

              self.pktr = snakemq.packeter.Packeter(self.link)

              self.pktr.on_connect.add(self.on_connect)

              self.pktr.on_disconnect.add(self.on_disconnect)

              if self.persistent:

                  storage = SqliteQueuesStorage("SnakemqStorage.db")

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr, storage)

              else:

                  self.messaging = snakemq.messaging.Messaging(self.snakemqident, "", self.pktr)

              self.messaging.on_message_recv.add(self.on_recv_message)

              self.messaging.on_message_drop.add(self.on_drop_message)

         except Exception as e:

              logger.error("listener:{0}".format(e))

         finally:

              logger.info("listener:loop ended...")

     '''send message to dest host named destIdent'''

     def sendMsg(self, destIdent, byteseq):

         msg = None

         if self.persistent:

              msg = snakemq.message.Message(byteseq, ttl=60, flags=FLAG_PERSISTENT)

         else:

              msg = snakemq.message.Message(byteseq, ttl=60)

         if self.messaging == None:

              logger.error("listener:messaging is not initialized, send message failed")

              return

         self.messaging.send_message(destIdent, msg)

测试代码connector(testSnakeConnector.py):

读取本地一个1M的文件,然后发送给listener,然后listener发回一个hello的信息。

from netComm.snakemq import snakemqConnector

import time

import sys

import os

def received(ident, data):

     print(data)

if __name__ == "__main__":

     bob = snakemqConnector.SnakemqConnector('bob',"10.16.5.45",4002,True)

     bob.on_recv.add(received)

     bob.start()

     try:

         with open("testfile.txt",encoding='utf-8') as f:

              txt = f.read()

              for i in range(100):

                  bob.sendMsg("niess",txt.encode('utf-8'))

                  time.sleep(0.1)

     except Exception as e:

         print(e)

     time.sleep(5)

     bob.terminate()   

测试代码listener(testSnakeListener.py):

from netComm.snakemq import snakemqListener

import time

def received(ident, data):

     filename = "log/recFile{0}.txt".format(time.strftime('%S',time.localtime()))

     file = open(filename,'w')

     file.writelines(data)

     file.close()

if __name__ == "__main__":

     niess = snakemqListener.SnakemqListener("niess","10.16.5.45",4002)

     niess.on_recv.add(received)

     niess.start()

     print("niess start...")

     time.sleep(60)

     niess.terminate()  

     print("niess end...")
(0)

相关推荐

  • Python操作RabbitMQ服务器实现消息队列的路由功能

    Python使用Pika库(安装:sudo pip install pika)可以操作RabbitMQ消息队列服务器(安装:sudo apt-get install rabbitmq-server),这里我们来看一下MQ相关的路由功能. 路由键的实现 比如有一个需要给所有接收端发送消息的场景,但是如果需要自由定制,有的消息发给其中一些接收端,有些消息发送给另外一些接收端,要怎么办呢?这种情况下就要用到路由键了. 路由键的工作原理:每个接收端的消息队列在绑定交换机的时候,可以设定相应的路由键.发送

  • Python的Flask框架应用调用Redis队列数据的方法

    任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大小小的请求给服务器.有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情. 更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做.从事异步任务的工具有很多.

  • 详解Python操作RabbitMQ服务器消息队列的远程结果返回

    先说一下笔者这里的测试环境:Ubuntu14.04 + Python 2.7.4 RabbitMQ服务器 sudo apt-get install rabbitmq-server Python使用RabbitMQ需要Pika库 sudo pip install pika 远程结果返回 消息发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了,但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端. 处理方法描述:发送端在发送信息前,产生一个接收消息的临时队列,该队

  • Python实现优先级队列结构的方法详解

    最简单的实现 一个队列至少满足2个方法,put和get. 借助最小堆来实现. 这里按"值越大优先级越高"的顺序. #coding=utf-8 from heapq import heappush, heappop class PriorityQueue: def __init__(self): self._queue = [] def put(self, item, priority): heappush(self._queue, (-priority, item)) def get(

  • Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程

    rabbitmq中文翻译的话,主要还是mq字母上:Message Queue,即消息队列的意思.前面还有个rabbit单词,就是兔子的意思,和python语言叫python一样,老外还是蛮幽默的.rabbitmq服务类似于mysql.apache服务,只是提供的功能不一样.rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信. 安装rabbitmq 先来安装下rabbitmq,在ubuntu 12.04下可以直接通过apt-get安装: sudo apt-get insta

  • Python 数据结构之队列的实现

    Python 队列 Queue 队列是一种先进先出(FIFO)的数据类型, 新的元素通过 入队 的方式添加进 Queue 的末尾, 出队 就是从 Queue 的头部删除元素. 用列表来做 Queue: queue = [] # 初始化一个列表数据类型对象, 作为一个队列 def enQ(): # 定义一个入栈方法 queue.append(raw_input('Enter New String: ').strip()) # 提示输入一个入队的 String 对象, 调用 Str.strip()

  • 详解Python的collections模块中的deque双端队列结构

    deque 是 double-ended queue的缩写,类似于 list,不过提供了在两端插入和删除的操作. appendleft 在列表左侧插入 popleft 弹出列表左侧的值 extendleft 在左侧扩展 例如: queue = deque() # append values to wait for processing queue.appendleft("first") queue.appendleft("second") queue.appendl

  • Python中线程的MQ消息队列实现以及消息队列的优点解析

    "消息队列"是在消息的传输过程中保存消息的容器.消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它.相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: Python的消息队列示例: 1.threading+Queue实现线程队列 #!/usr/bin/env python import Queue import threading import

  • python 队列详解及实例代码

    队列特性:先进先出(FIFO)--先进队列的元素先出队列.来源于我们生活中的队列(先排队的先办完事). Queue模块最常与threading模块一起构成生产-消费者模型,提供了一个适用于多线程编程的先进先出的数据结构,即队列. 该模块源码中包含5个类: 其中,Empty和Full是两个异常类,当队列的Queue.get(block=0)或者调用get_nowait()时,如果队列为空,则抛EmptyException异常. 同理,当队列的Queue.put(block=0)或者调用put_no

  • Python的消息队列包SnakeMQ使用初探

    一.关于snakemq的官方介绍 SnakeMQ的GitHub项目页:https://github.com/dsiroky/snakemq 1.纯python实现,跨平台 2.自动重连接 3.可靠发送--可配置的消息方式与消息超时方式 4.持久化/临时 两种队列 5.支持异步 -- poll() 6.symmetrical -- 单个TCP连接可用于双工通讯 7.多数据库支持 -- SQLite.MongoDB-- 8.brokerless - 类似ZeroMQ的实现原理 9.扩展模块:RPC,

  • 利用Python操作消息队列RabbitMQ的方法教程

    前言 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发

  • Python RabbitMQ消息队列实现rpc

    上个项目中用到了ActiveMQ,只是简单应用,安装完成后直接是用就可以了.由于新项目中一些硬件的限制,需要把消息队列换成RabbitMQ. RabbitMQ中的几种模式和机制比ActiveMQ多多了,根据业务需要,使用RPC实现功能,其中踩过的一些坑,有必要记录一下了. 上代码,目录结构分为 c_server.c_client.c_hanlder: c_server: #!/usr/bin/env python # -*- coding:utf-8 -*- import pika import

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

  • Python进程间通信Queue消息队列用法分析

    本文实例讲述了Python进程间通信Queue消息队列用法.分享给大家供大家参考,具体如下: 进程间通信-Queue Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信. 1. Queue的使用 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示下Queue的工作原理: 代码如下: #coding=utf-8 from multiprocessing import Queue #初始化一个

  • python实现RabbitMQ的消息队列的示例代码

    最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现.以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic. base.py: import pika # 获取认证对象,参数是用户名.密码.远程连接时需要认证 credentials = pika.PlainCredentials("admin", "admin") # BlockingConnection(): 实例化连接对象 # C

  • Python高级编程之消息队列(Queue)与进程池(Pool)实例详解

    本文实例讲述了Python高级编程之消息队列(Queue)与进程池(Pool).分享给大家供大家参考,具体如下: Queue消息队列 1.创建 import multiprocessing queue = multiprocessing.Queue(队列长度) 2.方法 方法 描述 put 变量名.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) put_nowait 变量名.put_nowati(数据),放入数据(如队列已满,则不等待队列信息取出后再放入,直接报

  • python分布式爬虫中消息队列知识点详解

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的资源,当然这个是一定的次序的,不然数据获取就会出现重复.就下来我们就python分布式爬虫中的消息队列进行详细解释,小伙伴们可以进一步了解一下. 实现分布式爬取的关键是消息队列,这个问题以消费端为视角更容易理解.你的爬虫程序部署到很多台机器上,那么他们怎么知道自己要爬什么呢?总要有一个地方存储了他们

随机推荐