python 多进程队列数据处理详解

我就废话不多说了,直接上代码吧!

# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process, Queue
import time, random, os
import camera_person_num

MQTTHOST = "172.19.4.4"
MQTTPORT = 1883
mqttClient = mqtt.Client()
q = Queue() 

# 连接MQTT服务器
def on_mqtt_connect():
  mqttClient.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient.loop_start()

# 消息处理函数
def on_message_come(lient, userdata, msg):
  # print(msg.topic + ":" + str(msg.payload.decode("utf-8")))

  q.put(msg.payload.decode("utf-8")) # 放入队列
  print("产生消息", msg.payload.decode("utf-8"))
  # 消息处理开启多进程
  # p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))
  # p.start()

def consumer(q, pid):
  print("开启消费序列进程", pid)
  while True:
    msg = q.get()
    # p = Process(target=talk, args=("/camera/person/num/result", msg, pid))
    # p.start()
    talk("/camera/person/num/result", msg, pid) 

# subscribe 消息订阅
def on_subscribe():
  mqttClient.subscribe("test123", 1) # 主题为"test"
  mqttClient.on_message = on_message_come # 消息到来处理函数

# publish 消息发布
def on_publish(topic, msg, qos):
  mqttClient.publish(topic, msg, qos);

# 多进程中发布消息需要重新初始化mqttClient
def talk(topic, msg, pid):
  cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
  t_max, t_mean, t_min = cameraPsersonNum.personNum()
  # time.sleep(20)
  print("消费消息", pid, msg)
  mqttClient2 = mqtt.Client()
  mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
  mqttClient2.loop_start()
  mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)
  mqttClient2.disconnect()

def main():

  on_mqtt_connect()
  on_subscribe()
  for i in range(1, 3):
    c1 = Process(target=consumer, args=(q, i))
    c1.start()
  while True:
    pass

if __name__ == '__main__':
  main()

以上这篇python 多进程队列数据处理详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Python3多进程 multiprocessing 模块实例详解

    本文实例讲述了Python3多进程 multiprocessing 模块.分享给大家供大家参考,具体如下: 多进程 Multiprocessing 模块 multiprocessing 模块官方说明文档 Process 类 Process 类用来描述一个进程对象.创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建. star() 方法启动进程, join() 方法实现进程间的同步,等待所有进程退出. close() 用来阻止多余的进程涌入进程池 Pool 造

  • Python中使用多进程来实现并行处理的方法小结

    进程和线程是计算机软件领域里很重要的概念,进程和线程有区别,也有着密切的联系,先来辨析一下这两个概念: 1.定义 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位. 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源. 2.关系 一个线程可以创建和撤

  • Python并发之多进程的方法实例代码

    一,进程的理论基础 一个应用程序,归根结底是一堆代码,是静态的,而进程才是执行中的程序,在一个程序运行的时候会有多个进程并发执行. 进程和线程的区别: 进程是系统资源分配的基本单位. 一个进程内可以包含多个线程,属于一对多的关系,进程内的资源,被其内的线程共享 线程是进程运行的最小单位,如果说进程是完成一个功能,那么其线程就是完成这个功能的基本单位 进程间资源不共享,多进程切换资源开销,难度大,同一进程内的线程资源共享,多线程切换资源开销,难度小 进程与线程的共同点: 都是为了提高程序运行效率,

  • Python实现多进程的四种方式

    方式一: os.fork() # -*- coding:utf-8 -*- """ pid=os.fork() 1.只用在Unix系统中有效,Windows系统中无效 2.fork函数调用一次,返回两次:在父进程中返回值为子进程id,在子进程中返回值为0 """ import os pid=os.fork() if pid==0: print("执行子进程,子进程pid={pid},父进程ppid={ppid}".format

  • python 多进程队列数据处理详解

    我就废话不多说了,直接上代码吧! # -*- coding:utf8 -*- import paho.mqtt.client as mqtt from multiprocessing import Process, Queue import time, random, os import camera_person_num MQTTHOST = "172.19.4.4" MQTTPORT = 1883 mqttClient = mqtt.Client() q = Queue() # 连

  • Python多进程机制实例详解

    本文实例讲述了Python多进程机制.分享给大家供大家参考.具体如下: 在以前只是接触过PYTHON的多线程机制,今天搜了一下多进程,相关文章好像不是特别多.看了几篇,小试了一把.程序如下,主要内容就是通过PRODUCER读一个本地文件,一行一行的放到队列中去.然后会有相应的WORKER从队列中取出这些行. import multiprocessing import os import sys import Queue import time def writeQ(q,obj): q.put(o

  • Python多进程fork()函数详解

    进程 进程是程序的一次动态执行过程,它对应了从代码加载.执行到执行完毕的一个完整过程.进程是系统进行资源分配和调度的一个独立单位.进程是由代码(堆栈段).数据(数据段).内核状态和一组寄存器组成. 在多任务操作系统中,通过运行多个进程来并发地执行多个任务.由于每个线程都是一个能独立执行自身指令的不同控制流,因此一个包含多个线程的进程也能够实现进程内多任务的并发执行. 进程是一个内核级的实体,进程结构的所有成分都在内核空间中,一个用户程序不能直接访问这些数据. 进程的状态: 创建.准备.运行.阻塞

  • 基于python爬虫数据处理(详解)

    一.首先理解下面几个函数 设置变量 length()函数 char_length() replace() 函数 max() 函数 1.1.设置变量 set @变量名=值 set @address='中国-山东省-聊城市-莘县'; select @address 1.2 .length()函数 char_length()函数区别 select length('a') ,char_length('a') ,length('中') ,char_length('中') 1.3. replace() 函数

  • 基于Python的身份证验证识别和数据处理详解

    根据GB11643-1999公民身份证号码是特征组合码,由十七位数字本体码和一位数字校验码组成,排列顺序从左至右依次为: 六位数字地址码八位数字出生日期码三位数字顺序码一位数字校验码(数字10用罗马X表示) 校验系统: 校验码采用ISO7064:1983,MOD11-2校验码系统(图为校验规则样例) 用身份证号的前17位的每一位号码字符值分别乘上对应的加权因子值,得到的结果求和后对11进行取余,最后的结果放到表2检验码字符值..换算关系表中得出最后的一位身份证号码 代码: # coding=ut

  • python分布式爬虫中消息队列知识点详解

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的资源,当然这个是一定的次序的,不然数据获取就会出现重复.就下来我们就python分布式爬虫中的消息队列进行详细解释,小伙伴们可以进一步了解一下. 实现分布式爬取的关键是消息队列,这个问题以消费端为视角更容易理解.你的爬虫程序部署到很多台机器上,那么他们怎么知道自己要爬什么呢?总要有一个地方存储了他们

  • pytorch中的自定义数据处理详解

    pytorch在数据中采用Dataset的数据保存方式,需要继承data.Dataset类,如果需要自己处理数据的话,需要实现两个基本方法. :.getitem:返回一条数据或者一个样本,obj[index] = obj.getitem(index). :.len:返回样本的数量 . len(obj) = obj.len(). Dataset 在data里,调用的时候使用 from torch.utils import data import os from PIL import Image 数

  • Python基础之进程详解

    一.前言 进程,一个新鲜的字眼,可能有些人并不了解,它是系统某个运行程序的载体,这个程序可以有单个或者多个进程,一般来说,进程是通过系统CPU 内核数来分配并设置的,我们可以来看下系统中的进程: 可以看到,360浏览器是真的皮,这么多进程啊,当然可以这样来十分清楚的看进程线程使用情况: 通过任务管理器中的资源监视器,是不是很厉害了,哈哈哈.讲完了这些,再说说用法. 二.基本用法 进程能干什么,这是我们要深思熟虑的事情.我们都知道一个程序运行会创建进程,所以程序在创建这些进程的时候,为了让它们更能

  • Tornado 多进程实现分析详解

    引子 Tornado 是一个网络异步的的web开发框架, 并且可以利用多进程进行提高效率, 下面是创建一个多进程 tornado 程序的例子. #!/usr/bin/env python # -*- coding:utf-8 -*- import os import time import tornado.web import tornado.httpserver import tornado.ioloop import tornado.netutil import tornado.proces

  • python多线程超详细详解

    python中的多线程是一个非常重要的知识点,今天为大家对多线程进行详细的说明,代码中的注释有多线程的知识点还有测试用的实例. import threading from threading import Lock,Thread import time,os ''' python多线程详解 什么是线程? 线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位. 线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程

随机推荐