解决python3 pika之连接断开的问题

问题描述

在消费rabbitMQ队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息。

问题就发生在发送ack操作时, 程序提示链接已被断开或socket error。

源码示例

#!/usr/bin
#coding: utf-8

import pika
import time

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

def callback(ch, method, properties, body):
 print(body)
 time.sleep(600)
 ch.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
 ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1',
   credentials=pika.PlainCredentials(USER, PWD)))
 chan = s_conn.channel()
 chan.queue_declare(queue=TEST_QUEUE)

 chan.basic_publish('', routing_key=TEST_QUEUE, body="fortest")
 chan.basic_consume(callback, queue=TEST_QUEUE)
 chan.start_consuming()

if __name__ == "__main__":
 test_main()

运行一段时间后, 就会报错:

[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Error event 25, None
[CRITICAL][pika.adapters.base_connection][2017-08-18 12:33:49]Tried to handle an error where no error existed
[ERROR][pika.adapters.base_connection][2017-08-18 12:33:49]Fatal Socket Error: BrokenPipeError(32, 'Broken pipe')

问题排查

猜测:pika客户端没有及时发送心跳,连接被server断开

一开始修改了heartbeat_interval参数值, 示例如下:

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1',
   heartbeat_interval=10,
   socket_timeout=5,
   credentials=pika.PlainCredentials(USER, PWD)))
 # ....

修改后运行依然报错,后来想想应该单线程被一直占用,pika无法发送心跳;

于是又加了个心跳线程, 示例如下:

#!/usr/bin
#coding: utf-8

import pika
import time
import logging
import threading

USER = 'guest'
PWD = 'guest'
TEST_QUEUE = 'just4test'

class Heartbeat(threading.Thread):
 def __init__(self, connection):
  super(Heartbeat, self).__init__()
  self.lock = threading.Lock()
  self.connection = connection
  self.quitflag = False
  self.stopflag = True
  self.setDaemon(True)

 def run(self):
  while not self.quitflag:
   time.sleep(10)
   self.lock.acquire()
   if self.stopflag :
    self.lock.release()
    continue
   try:
    self.connection.process_data_events()
   except Exception as ex:
    logging.warn("Error format: %s"%(str(ex)))
    self.lock.release()
    return
   self.lock.release()

 def startHeartbeat(self):
  self.lock.acquire()
  if self.quitflag==True:
   self.lock.release()
   return
  self.stopflag=False
  self.lock.release()

def callback(ch, method, properties, body):
 logging.info("recv_body:%s" % body)
 time.sleep(600)
 ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():
 s_conn = pika.BlockingConnection(
  pika.ConnectionParameters('127.0.0.1',
   heartbeat_interval=10,
   socket_timeout=5,
   credentials=pika.PlainCredentials(USER, PWD)))
 chan = s_conn.channel()
 chan.queue_declare(queue=TEST_QUEUE)
 chan.basic_consume(callback,
      queue=TEST_QUEUE)

 heartbeat = Heartbeat(s_conn)
 heartbeat.start()   #开启心跳线程
 heartbeat.startHeartbeat()
 chan.start_consuming()

if __name__ == "__main__":
 test_main()

尝试运行,结果还是不行,不得不安静下来思考自己是不是想错了。

去看它的api,看到heartbeat_interval的解析:

:param int heartbeat_interval: How often to send heartbeats.
         Min between this value and server's proposal
         will be used. Use 0 to deactivate heartbeats
         and None to accept server's proposal.

按这样说法,应该还是没有把心跳值给设置好。上面的程序期望是10秒发一次心跳,但是理论上发送心跳的间隔会比10秒多一点。所以艾玛,我应该是把heartbeat_interval的作用搞错了, 它是指超过这个时间间隔不发心跳或不给server任何信息,server就会断开连接, 而不是说pika会按这个间隔来发心跳。 结果我把heartbeat_interval值设置高一点(比实际发送心跳/信息的间隔更长),比如上面设置成60秒,就正常运行了。

如果不指定heartbeat_interval, 它默认为None, 意味着按rabbitMQ server的配置来检测心跳是否正常。

如果设置heartbeat_interval=0, 意味着不检测心跳,server端将不会主动断开连接。

以上这篇解决python3 pika之连接断开的问题就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Python获取当前公网ip并自动断开宽带连接实例代码

    今天写了一个获取当前公网ip并且自动断开宽带连接的文件,和大家分享下. 这个文件的具体用途大家懂的,可以尽管拿去用,不过目前只适用于Windows平台,我的Python版本是2.7的,win32ras模块需要下载pywin32. 代码如下: #!coding: cp936 import win32ras import time,os def Connect(dialname, account, passwd): dial_params = (dialname, '', '', account,

  • python中pika模块问题的深入探究

    前言 工作中经常用到rabbitmq,而用的语言主要是python,所以也就经常会用到python中的pika模块,但是这个模块的使用,也给我带了很多问题,这里整理一下关于这个模块我在使用过程的改变历程已经中间碰到一些问题的解决方法 关于MQ: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来

  • 解决python3 pika之连接断开的问题

    问题描述 在消费rabbitMQ队列时, 每次进入回调函数内需要进行一些比较耗时的操作;操作完成后给rabbitMQ server发送ack信号以dequeue本条消息. 问题就发生在发送ack操作时, 程序提示链接已被断开或socket error. 源码示例 #!/usr/bin #coding: utf-8 import pika import time USER = 'guest' PWD = 'guest' TEST_QUEUE = 'just4test' def callback(c

  • python3 自动识别usb连接状态,即对usb重连的判断方法

    在做自动化测试时,遇到两种情况需要判断usb是否已连接上(注,本文仅针对用adb命令来control手机) 一种是在开测时(前提是同时要测试多台), 希望等待所有设备usb全部识别后同时进行测试.对于这一种,当然是可以另开一个窗口不断的输入adb devices来检测,但不够AI. 一种是在测试过程中有时usb会自动断开,这时如果不判断usb是否有重新连接的话,则测试还会往下执行,测试指定是失败的. 第一种情况:其思路还是要利用adb devices来不断的获取当前已连接的设备id. 1.先获取

  • 解决python3 安装完Pycurl在import pycurl时报错的问题

    此次遇到的问题是在import pycurl 时报错 pycurl:libcurl link-time version is older than compile-time version 在网上看了很多解释和方法,但都没有很好的解决和分析这个问题,我先说下自己的过程 1.安装的事centos7 ,默认安装的是python2.7,python3是后使用src安装的,同样先下载了curl-7.61的包和pycurl-7.43的包,应该都是最新的了 2.先make && make instal

  • 解决python3 Pycharm上连接数据库时报错的问题

    最近在学习python. 今天在学习python连接Mysql数据库时报错: AttributeError: 'NoneType' object has no attribute 'encoding 使用pyCharm+python3+pyMysql+mysql5.56 数据库连接: connect = pymysql.Connect(host='localhost',port=3333,user='root',passwd='root',db='circle',charset='utf-8')

  • 解决Python3.7.0 SSL低版本导致Pip无法使用问题

    终于下决心把python从2.7升到了3.7.懒人安装当然使用Anaconda. 安装成功,编译成功.但是用pip 安装包的时候提示: pip is configured with locations that require TLS/SSL, however the ssl module in Python is not available. 原因是python3.7为了安全性考虑,要求使用openssl 1.0.2之后的版本.但是自带的openssl,版本是1.0.1. 解决办法: 重装一下

  • 解决python3在anaconda下安装caffe失败的问题

    Python 跟 Python3 完全就是两种语言 1. import caffe FAILED 环境为 Ubuntu 16 cuda 8.0 NVIDIA 361.77 Anaconda2.昨天莫名其妙Caffe不能用了: >>> import caffe Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/duchen

  • 解决python3中自定义wsgi函数,make_server函数报错的问题

    #coding:utf-8 from wsgiref.simple_server import make_server def RunServer(environ, start_response): start_response('200 OK', [('Content-Type', 'text/html')]) return '<h1>Hello, web!</h1>' if __name__ == '__main__': httpd = make_server('localho

  • 解决python3爬虫无法显示中文的问题

    有时候使用python从网站上爬数据的时候,如果数据里包含中文,有时候显示的却是如下所示...\xe4\xba\xba\xef\xbc\x8c\xe6...类似与国际化 解决方法: import urllib.request import sys weburl="..." webhead=... req=urllib.request.Request(url=weburl,headers=webhead) response=urllib.request.urlopen(req) cont

  • Python3之读取连接过的网络并定位的方法

    如下所示: #!/usr/bin/python # coding=utf-8 import json from urllib.request import urlopen from winreg import * def val2addr(val): addr = "" for ch in val: addr += ("%02x " % ord(ch)) addr = addr.strip(" ").replace(" ",

  • 解决python3中解压zip文件是文件名乱码的问题

    在zip标准中,对文件名的 encoding 用的不是 unicode,而可能是各种软件根据系统的默认字符集来采用(此为猜测),因此zipfile中根据文件 flag 检测的时候,只支持 cp437 和 utf-8. 具体就是查找 zipfile.py 源代码找到下面的代码: 1: if flags & 0x800: 2: # UTF-8 file names extension 3: filename = filename.decode('utf-8') 4: else: 5: # Histo

随机推荐