Python进程间通信方式

目录
  • 一、通信方式
  • 二、Queue介绍
  • 三、方法介绍
  • 三、生产者和消费者模型
  • 四、什么是生产者消费者模式
    • 实现方式一:Queue
    • 实现方式二:利用JoinableQueue

一、通信方式

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式

队列:队列类似于一条管道,元素先进先出

需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态

二、Queue介绍

创建队列的类(底层就是以管道和锁定的方式实现):

Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,

可以使用Queue实现多进程之间的数据传递。maxsize是队列中允许最大项数,省略则无大小限制。

三、方法介绍

  • def put(self, obj, block=True, timeout=None):插入数据到队列中,Block值默认为True,代表当队列已满时,会阻塞。如果block为False,则队列满会报异常Queue.Full,timeout表示会阻塞到指定时间,直到有剩余的空间供插入,如果时间超时,则报异常Queue.Full
  • def get(self, block=True, timeout=None):从队列中取出数据,Block值默认为True,代表当队列为空时,会阻塞。如果block为False,则队列空会报异常Queue.Empty,timeout表示会等待到指定时间,直到取出数据,如果时间超时,则报异常Queue.Empty
  • def empty(self): 判断队列是否为空,如果空返回True
  • def full(self): 判断队列是否已满,如果满返回True
  • def qsize(self): 返回队列的大小

应用举例:

from multiprocessing import Process, Manager
q = Manager().Queue(2)
q.put(1)
q.put(2,block=False,timeout=2)
def func():
    print(q.get())
 
p = Process(target=func)
print("size",q.qsize())
print("full",q.full())
p.start()
p.join()
print("empty",q.empty())
print("get", q.get())
print("get", q.get(block=False,timeout=2))

输出结果:

三、生产者和消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式?

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

四、什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯:

生产者,只需要往队列里面丢东西(生产者不需要关心消费者)

消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)

阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

实现方式一:Queue

from multiprocessing import Process,Manager,active_children
import random
import queue
import time
 
class Producer(Process):
 
    def __init__(self,queue):
        super().__init__()
        self.queue = queue
 
    def run(self):
        for i in range(6):
            r = random.randint(0, 99)
            time.sleep(1)
            self.queue.put(r)
            print("add data{}".format(r))
 
class Consumer(Process):
 
    def __init__(self,queue):
        super().__init__()
        self.queue = queue
 
    def run(self):
        while True:
          if not self.queue.empty():
                data = self.queue.get()
                print("minus data{}".format(data))
 
 
if __name__ == '__main__':
    q = Manager().Queue() # 创建队列
    p = Producer(q)
    c = Consumer(q)
    p.start()
    c.start()
    print(active_children())  # 查看现有的进程
    p.join()
    c.join()
    print("结束")
 
 
>>>输出
[<ForkProcess(SyncManager-1, started)>, <Producer(Producer-2, started)>, <Consumer(Consumer-3, started)>]
add data83
minus data83
add data72
minus data72
add data8
minus data8
add data63
minus data63
add data75
minus data75
add data52
minus data52

实现方式二:利用JoinableQueue

JoinableQueue([maxsize]):一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例除了与Queue对象相同的方法之外还具有:

     task_done():使用者使用此方法发出信号,表示get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

     join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用task_done()方法为止

from multiprocessing import Process,JoinableQueue
import os
import time
import random
 
 
def print_log(msg, log_type="prod"):
    if log_type == 'prod':
        print("\033[32;1m%s\033[0m" %msg)
    elif log_type == 'con':
        print("\033[31;1m%s\033[0m" %msg)
 
def producer(q):
    """
    生产者
    :param q: 
    :return: 
    """
    for i in range(10):
        data = random.randint(1,200)
        time.sleep(2)
        q.put(data)  # 放入队列
        msg = "add data {}".format(data)
        print_log(msg)
    q.join()  # 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
    # 阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
 
 
 
 
def consumer(q):
    """
    消费者
    :param q: 
    :return: 
    """
    while True:
        if not q.empty():
            time.sleep(5)
            data = q.get()
            msg = "minus data{}".format(data)
            print_log(msg,"con")
            q.task_done()  # q.get()的返回项目已经被处理
 
 
if __name__ == '__main__':
    q = JoinableQueue()
    prod = Process(target=producer, args=(q,))
    con = Process(target=consumer, args=(q,))
    con.daemon = True  # 设置为守护进程,但是不用担心,producer内调用q.join保证了consumer已经处理完队列中的所有元素
    # 开启进程
    prod.start()
    con.start()
 
    prod.join()  # 等待生产和消费完成,主线程结束
    print("结束")

输出结果:

到此这篇关于Python进程间通信方式的文章就介绍到这了,更多相关Python进程间通信内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python实现跨进程(跨py文件)通信示例

    目录 前言 一.server端 二.client端 三.运行效果 总结 前言 项目中总会遇到数据需要跨进程通信的问题,今天就给大家带来一套简单的跨进程通信代码.代码分为服务端与客户端两部分. 一.server端 import multiprocessing import time def do_socket(conn, addr, ): try: while True: if conn.poll(1) == False: time.sleep(0.5) continue data = conn.

  • Python通过队列来实现进程间通信的示例

    Python程序中,在进程和进程之间是不共享全局变量的数据的. 我们来看一个例子: from multiprocessing import Process import os import time nums = [11, 22] def work1(): """子进程要执行的代码""" print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums)) for i in ra

  • python中进程间通信详细介绍

    目录 进程间通信(IPC) 管道通信(Pipe) 1.通信原理 2. 实现方法 共享内存 1.通信原理 2.实现方法 信号量(信号灯集) 1.通信原理 2. 实现方法 3.代码演示 进程间通信(IPC) 必要性 进程间空间独立,资源不共享,此时在需要进程间数据传输时就需要特定的手段进行数据通信 常用进程间通信方法 管道 消息队列 共享内存 型号 信号量 套接字 管道通信(Pipe) 1.通信原理 在内存中开辟管道空间,生成管道操作对象,多个进程使用同一个管道对象进行读写即可实现通信 代码演示(w

  • Python多进程之进程同步及通信详解

    目录 进程同步 Lock(锁) 进程通信 Queue(队列) Pipe(管道) Semaphore(信号量) Event(事件) 总结 上篇文章介绍了什么是进程.进程与程序的关系.进程的创建与使用.创建进程池等,接下来就来介绍一下进程同步及进程通信. 进程同步 当多个进程使用同一份数据资源的时候,因为进程的运行没有顺序,运行起来也无法控制,如果不加以干预,往往会引发数据安全或顺序混乱的问题,所以要在多个进程读写共享数据资源的时候加以适当的策略,来保证数据的一致性问题. Lock(锁) 一个Loc

  • Python通过4种方式实现进程数据通信

    python提供了4种方式来满足进程间的数据通信 1. 使用multiprocessing.Queue可以在进程间通信,但不能在Pool池创建的进程间进行通信 2. 使用multiprocessing.Manager.Queue可以在Pool进程池创建的进程间进行通信 3. 通过Pipe进行线程间的通信, pipe进程间通信的性能高于Queue,但是它只能在两个进程间进行通信 4. 使用Manager类提供的数据结构可以进行进程间的通信 from multiprocessing import P

  • Python的进程间通信详解

    目录 进程概述 队列简介 多进程队列的使用 使用队列在进程间通信 总结 进程概述 ​ 进程(Process)是计算机中已运行程序的实体.进程与程序不同,程序本身只是指令.数据及器组织形式的描述,进程才是程序(那些指令和数据)的真正运行实体.例如在没有打开QQ时,QQ只是程序.打开以后,操作系统为QQ开启一个进程.再打开一个QQ,则又开启一个进程. ​ 那么在多进程中,每个进程之间是什么关系呢?其实每个进程都有自己的地址空间.内存.数据栈以及其他记录其运行状态的辅助数据.下通过一个例子验证一下进程

  • Python进程间通信方式

    目录 一.通信方式 二.Queue介绍 三.方法介绍 三.生产者和消费者模型 四.什么是生产者消费者模式 实现方式一:Queue 实现方式二:利用JoinableQueue 一.通信方式 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式 队列:队列类似于一条管道,元素先进先出 需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态 二.Queue介绍 创建队列的类(底层就是以管道和锁定的方式实现): Queue

  • Python进程间通信Queue消息队列用法分析

    本文实例讲述了Python进程间通信Queue消息队列用法.分享给大家供大家参考,具体如下: 进程间通信-Queue Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信. 1. Queue的使用 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示下Queue的工作原理: 代码如下: #coding=utf-8 from multiprocessing import Queue #初始化一个

  • Python进程间通信用法实例

    本文实例讲述了Python进程间通信用法.分享给大家供大家参考.具体如下: #!/usr/bin/env python # -*- coding=utf-8 -*- import multiprocessing def counsumer(input_q): while True: item = input_q.get() #处理项目 print item #此处替换为有用的工作 #发出信号通知任务完成 input_q.task_done() def producer(sequence,outp

  • python多线程方式执行多个bat代码

    python多线程方式执行多个bat的代码,感兴趣的朋友可以参考下. import threading from win32api import * class MyThread(threading.Thread): def __init__(self, bat_path, **kwargs): threading.Thread.__init__(self, **kwargs) self.bat_path = bat_path def run(self): ShellExecute(0, Non

  • Linux进程间通信方式之socket使用实例

    套接字是一种通信机制,凭借这种机制,客户/服务器系统的开发工作既可以在本地单机上进行,也可以跨网络进行. 套接字的特性有三个属性确定,它们是:域(domain),类型(type),和协议(protocol).套接字还用地址作为它的名字.地址的格式随域(又被称为协议族,protocol family)的不同而不同.每个协议族又可以使用一个或多个地址族定义地址格式. 1.套接字的域 域指定套接字通信中使用的网络介质.最常见的套接字域是AF_INET,它是指Internet网络,许多Linux局域网使

  • 浅谈Linux进程间通信方式及优缺点

    1)管道 管道分为有名管道和无名管道 无名管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用.进程的亲缘关系一般指的是父子关系.无明管道一般用于两个不同进程之间的通信.当一个进程创建了一个管道,并调用fork创建自己的一个子进程后,父进程关闭读管道端,子进程关闭写管道端,这样提供了两个进程之间数据流动的一种方式. 有名管道也是一种半双工的通信方式,但是它允许无亲缘关系进程间的通信. 2)信号量 信号量是一个计数器,可以用来控制多个线程对共享资源的访问.,它不是用于交

  • Python进程间通信Queue实例解析

    本文研究的主要是Python进程间通信Queue的相关实例,具体如下. 1.Queue使用方法: Queue.qsize():返回当前队列包含的消息数量: Queue.empty():如果队列为空,返回True,反之False : Queue.full():如果队列满了,返回True,反之False: Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长. Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty:

  • Python多进程方式抓取基金网站内容的方法分析

    本文实例讲述了Python多进程方式抓取基金网站内容的方法.分享给大家供大家参考,具体如下: 在前面这篇//www.jb51.net/article/162418.htm我们已经简单了解了"python的多进程",现在我们需要把抓取基金网站(28页)内容写成多进程的方式. 因为进程也不是越多越好,我们计划分3个进程执行.意思就是 :把总共要抓取的28页分成三部分. 怎么分呢? # 初始range r = range(1,29) # 步长 step = 10 myList = [r[x:

  • 基于数据归一化以及Python实现方式

    数据归一化: 数据的标准化是将数据按比例缩放,使之落入一个小的特定区间,去除数据的单位限制,将其转化为无量纲的纯数值,便于不同单位或量级的指标能够进行比较和加权. 为什么要做归一化: 1)加快梯度下降求最优解的速度 如果两个特征的区间相差非常大,其所形成的等高线非常尖,很有可能走"之字型"路线(垂直等高线走),从而导致需要迭代很多次才能收敛. 2)有可能提高精度 一些分类器需要计算样本之间的距离,如果一个特征值域范围非常大,那么距离计算就主要取决于这个特征,从而与实际情况相悖(比如这时

  • python 不同方式读取文件速度不同的实例

    1.按行读取较慢较耗时: srcFiles = open('inputFile.txt', 'r') for file_path in srcFiles: file_path = file_path.rstrip() 2.快速读取所有行: with open('inputFile.txt', 'r') as fRead: srcPaths = fRead.readlines() #txt中所有字符串读入list列表srcPaths random.shuffle(srcPaths) #打乱list

随机推荐