Python进程间通讯与进程池超详细讲解

目录
  • 进程间通讯
    • 队列Queue
    • 管道Pipe
  • 进程池Pool

在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。

进程间通讯

multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。

队列Queue

队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:

put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间);

  • 若设定了超时且队列已满,会抛出queue.Full异常;
  • 队列已关闭时,抛出ValueError异常

get([block[, timeout]]):读取并删除一个元素;

  • 若设定了超时且队列为空,会抛出queue.Empty异常;
  • 队列已关闭时,抛出ValueError异常;若已阻塞后,再关闭则会一直阻塞;

qsize():返回一个近似队列长度(因多进程原因,长度会有误差);

empty()/full():队列空或慢(因多进程原因,会有误差);

close():关闭队列;

当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):

def putElement(name, qu: multiprocessing.Queue):
    try:
        for i in range(10):
            qu.put(f"{name}-{i + 1}")
            time.sleep(.1)
    except ValueError:
        print("queue closed")
    print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
    try:
        while True:
            r = qu.get()
            print(f"{name} recv: {r}")
    except ValueError:
        print("queue closed")
    print(f"{name}: get complete")
if __name__ == '__main__':
    qu = multiprocessing.Queue(100)
    puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
    gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
    list(map(lambda f: f.start(), puts))
    list(map(lambda f: f.start(), gets))
    for f in puts:
        f.join()
    print("To close")
    qu.close() # 只是main中的close了,其他进程中的并没有

管道Pipe

multiprocessing.Pipe([duplex])返回一个连接对象对(conn1, conn2)。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:

  • send():发送消息;
  • recv():接收消息;

进程间的Pipe基于fork机制建立:

  • 主进程创建Pipe:Pipe的两个Connections连接的的都是主进程;
  • 创建子进程后,Pipe也被拷贝了一份:此时有了4个Connections;
  • 主进程关闭一个Out Connection,子进程关闭一个In Connection:就建立好了一个输入在主进程,输出在子进程的管道。
def pipeProc(pipe):
    outPipe, inPipe = pipe
    inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
    try:
        while True:
            r = outPipe.recv()
            print("Recv:", r)
    except EOFError:
        print("RECV end")
if __name__ == '__main__':
    outPipe, inPipe = multiprocessing.Pipe()
    sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
    sub.start()
    outPipe.close() # 必须在进程成功运行后,才可关闭
    with inPipe:
        for x in range(10):
            inPipe.send(x)
            time.sleep(.1)
    print("send complete")
    sub.join()

进程池Pool

虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。

进程池中有固定数量的进程:

  • 请求到来时,从池中取出一个进程来处理任务;理完毕后,进程并不立即关闭,而是再放回进程池中;
  • 当池中进程数量不够,请求就要等待,直到拿到空闲进程后才能继续执行;
  • 池中进程的数量是固定的,隐藏同一时间最多有固定数量的进程在运行。

multiprocessing.Pool([processes[, initializer[, initargs]]])

  • processes:要创建进程数量(默认os.cpu_count()个),在需要时才会创建;
  • initializer(*initargs):每个工作进程启动时执行的方法(一般processes为几就执行几次);

Pool类中主要方法:

  • apply(func[, args[, kwds]]):以阻塞方式,从池中获取进程并执行func(*args,**kwargs)
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):异步方式(从池中获取一个进程)执行func(*args,**kwargs),返回AsyncResult;
  • map(func, iterable[, chunksize])/map_async:map的并行版本(可同时处理多个任务),异步时返回MapResult;
  • starmap(func, iterable[, chunksize])/starmap_async:与map的区别是允许传入多个参数;
  • imap(func, iterable[, chunksize]):map的惰性版本(返回结果是可迭代对象),内存消耗会低些,返回迭代器IMapIterator;
  • imap_unordered(func, iterable[, chunksize]):imap返回的结果顺序与map顺序是相同的,而此方法返回的顺序是乱序的(不依次等待每个任务完成,先完成的先返回),返回迭代器IMapIterator;
  • close():关闭,禁止继续提交任务(已提交任务会继续执行完成);
  • terminate():立即终止所有任务;
  • join():等待工作进程完成(必须已close或terminate了);
def poolWorker():
    print(f"worker in process {os.getpid()}")
    time.sleep(1)
def poolWorkerOne(name):
    print(f"worker one {name} in process {os.getpid()}")
    time.sleep(random.random())
    return name
def poolWorkerTwo(first, second):
    res = first + second
    print(f"worker two {res} in process {os.getpid()}")
    time.sleep(1./(first+1))
    return res
def poolInit():
    print("pool init")
if __name__ == '__main__':
    workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
    with workers:
        for i in range(5):
            workers.apply_async(poolWorker)
        arg = [(i, i) for i in range(10)]
        workers.map_async(poolWorkerOne, arg)
        results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
        print("Starmap:", results.get())
        results = workers.imap_unordered(poolWorkerOne, arg)
        for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
            print("Unordered:", r)
    # 必须保证workers已close了
    workers.join()

到此这篇关于Python进程间通讯与进程池超详细讲解的文章就介绍到这了,更多相关Python进程间通讯内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python语法学习之进程池与进程锁详解

    目录 进程池 什么是进程池 进程池的创建模块 - multiprocessing 创建进程池函数 - Pool 进程池的常用方法 apply_async 函数演示案例 close 函数与 join 函数 演示 进程锁 进程锁的概念 进程锁的加锁与解锁 NICE!大家好,在上一章节,我们学习了 multiprocessing 模块 的关于进程的创建与进场常用的方法的相关知识. 通过在一个主进程下创建多个子进程可以帮助我们加速程序的运行,并且提高工作效率.不过上一章节文末我们也说过进程的问题,由于每

  • Python通过队列实现进程间通信详情

    目录 一.前言 二.队列简介 三.多进程队列的使用 四.使用队列在进程间通信 一.前言 在多进程中,每个进程之间是什么关系呢?其实每个进程都有自己的地址空间.内存.数据栈以及其他记录其运行状态的辅助数据.下面通过一个例子,验证一下进程之间能否直接共享信息. 定义一个全局变量g_num,分别创建2个子进程对g_num执行不同的操作,并输出操作后的结果. 代码如下: # _*_ coding:utf-8 _*_ from multiprocessing import Process def plus

  • Python语法学习之进程间的通信方式

    目录 什么是进程的通信 队列的创建 - multiprocessing 进程之间通信的方法 进程间的通信 - 队列演示案例 批量给 send 函数加入数据 小节 进程间通信的其他方式 - 补充 什么是进程的通信 这里举一个例子接介绍通信的机制:通信 一词大家并不陌生,比如一个人要给他的女友打电话.当建立了通话之后,在这个通话的过程中就是建立了一条隐形的 队列 (记住这个词).此时这个人就会通过对话的方式不停的将信息告诉女友,而这个人的女友也是在倾听着.(嗯…我个人觉得大部分情况下可能是反着来的)

  • Python进程池基本概念

    目录 一.python进程池 二.进程池如何使用? 申请() apply_async 地图() map_async() close() 终端() 加入() 三.代码实列 四.进程池中的进程和一般的进程有什么区别? 前言: 创建进程池可以形象地理解为创建一个并行的流水线,只需创建一次流水线的消耗,处理接收到的任务的,不使用进程池. ,浪费时间. 中方本来没有进程的,除了python的,使用线程池的语言,是进程的其他线程池(而进程是执行业务的其他任务).python的原因(因为Cython的概念),

  • Python解决多进程间访问效率低的方法总结

    目录 前言 使用进程间Queue效率问题场景 采用管道模式解决 总结 前言 最近在解决一些算法优化的问题,为了实时性要求,必须精益求精的将资源利用率用到极致.同时对算法中一些处理进行多线程或者多进程处理. 在对代码的调试过程中,发现在进程间队列使用耗时很长,特别是图片这种比较大的数据的时候. 可以先看一下我下面的demo是不是符合你的场景. 下面还有我的解决方案. 使用进程间Queue效率问题场景 代码样例如下,模拟从两个视频读取图片帧进行处理. #!/user/bin/env python #

  • Python进程间的通信一起来了解下

    目录 通信方式 Queue介绍: 生产者和消费者模型 为什么要使用生产者和消费者模式 什么是生产者消费者模式 实现方式一:Queue 实现方式二:利用JoinableQueue 总结 通信方式 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式 队列:队列类似于一条管道,元素先进先出 需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态 Queue介绍: 创建队列的类(底层就是以管道和锁定的方式实现): Que

  • Python进程间通信方式

    目录 一.通信方式 二.Queue介绍 三.方法介绍 三.生产者和消费者模型 四.什么是生产者消费者模式 实现方式一:Queue 实现方式二:利用JoinableQueue 一.通信方式 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块主要通过队列方式 队列:队列类似于一条管道,元素先进先出 需要注意的一点是:队列都是在内存中操作,进程退出,队列清空,另外,队列也是一个阻塞的形态 二.Queue介绍 创建队列的类(底层就是以管道和锁定的方式实现): Queue

  • Python的进程及进程池详解

    目录 进程 进程和程序 进程的状态 Python中的进程 创建⼦进程 全局变量问题 守护进程 进程池 总结 进程 进程是操作系统分配资源的基本单元,是程序隔离的边界. 进程和程序 程序只是一组指令的集合,它本身没有任何运行的含义,它是静态的. 进程程序的执行实例,是动态的,有自己的生命周期,有创建有撤销,存在是暂时的. 进程和程序不是一一对应的,一个程序可以对应多个进程,一个进程也可以执行一个或者多个程序. 我们可以这样理解:编写完的代码,没有运行时称为程序,正在运行的代码,会启动一个(或多个)

  • Python进程间通讯与进程池超详细讲解

    目录 进程间通讯 队列Queue 管道Pipe 进程池Pool 在<多进程并发与同步>中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式. 进程间通讯 multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯. 队列Queue 队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似: put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间): 若设定了超时且

  • python正则表达式re.sub各个参数的超详细讲解

    目录 一.re.sub(pattern, repl, string, count=0, flags=0) 二.参数讲解 1.pattern参数 2.repl参数 2.1.repl是字符串 2.2.repl是函数 3.string参数 4.count参数 5.flags参数 5.1.IGNORECASE(简写I) 5.2.LOCALE(简写L) 5.3.MULTILINE(简写M) 5.4.DOTALL(简写S) 5.5.VERBOSE(简写X) 补充:repl为函数时的用法 总结 一.re.su

  • php进程间通讯实例分析

    本文实例讲述了php进程间通讯的方法.分享给大家供大家参考,具体如下: php单进程单线程处理批量任务太慢了,受不鸟了,但是php不能多线程,最终选择了多进程处理批量任务. php多进程主要使用for进行分裂,然后利用的unix/linux的信号量进行进程间通讯. 本例使用的是:生产者=>消费者=>收集器,的模式. <?php // ===== 全局变量 ===== // ipc进程间通讯 $key = ftok(__FILE__, "a"); $queue = ms

  • PHP中实现进程间通讯

    PHP中实现进程间通讯 邱文宇 本文将讨论在PHP4环境下如何使用进程间通讯机制--IPC(Inter-Process-Communication).本文讨论的软件环境是linux+php4.0.4或更高版本.首先,我们假设你已经装好了PHP4和UNIX, 为了使得php4可以使用共享内存和信号量,必须在编译php4程序时激活shmop和sysvsem这两个扩展模块. 实现方法:在PHP设定(configure)时加入如下选项. --enable-shmop --enable-sysvsem  

  • C语言中进程间通讯的方式详解

    目录 一.无名管道 1.1无名管道的原理 1.2功能 1.3无名管道通信特点 1.4无名管道的实例 二.有名管道 2.1有名管道的原理 2.2有名管道的特点 2.3有名管道实例 三.信号 3.1信号的概念 3.2发送信号的函数 3.3常用的信号 3.4实例 四.IPC进程间通信 4.1IPC进程间通信的种类 4.2查看IPC进程间通信的命令 4.3消息队列 4.4共享内存 4.5信号灯集合 一.无名管道 1.1无名管道的原理 无名管道只能用于亲缘间进程的通信,无名管道的大小是64K.无名管道是内

  • Python多进程并发与同步机制超详细讲解

    目录 多进程 僵尸进程 Process类 函数方式 继承方式 同步机制 状态管理Managers 在<多线程与同步>中介绍了多线程及存在的问题,而通过使用多进程而非线程可有效地绕过全局解释器锁. 因此,通过multiprocessing模块可充分地利用多核CPU的资源. 多进程 多进程是通过multiprocessing包来实现的,multiprocessing.Process对象(和多线程的threading.Thread类似)用来创建一个进程对象: 在类UNIX平台上,需要对每个Proce

  • Python超详细讲解内存管理机制

    目录 什么是内存管理机制 一.引用计数机制 二.数据池和缓存 什么是内存管理机制 python中创建的对象的时候,首先会去申请内存地址,然后对对象进行初始化,所有对象都会维护在一 个叫做refchain的双向循环链表中,每个数据都保存如下信息: 1. 链表中数据前后数据的指针 2. 数据的类型 3. 数据值 4. 数据的引用计数 5. 数据的长度(list,dict..) 一.引用计数机制 引用计数增加: 1.1 对象被创建 1.2 对象被别的变量引用(另外起了个名字) 1.3 对象被作为元素,

  • Python实战之画哆啦A梦(超详细步骤)

    一.写在前面 本文基于64位windows系统(鼠标右键点击桌面"此电脑"图标--属性可查看电脑系统版本).python3.x(pycharm自动安装的版本, 3.0以上).文中代码内容所使用的工具是pycharm-community-2020.1,实践中如有碰到问题,可留言提问. 前阵子有看到zh上有大神画了这个哆啦A梦的大头贴,自己也来试了一下,很简单,但长篇整段的代码对刚刚学会海龟绘图语法的初学者来说还是有一定难度,所以来做一个拆解版详细步骤讲解实现. 二.效果图 言归正传,先上

  • 超详细讲解python正则表达式

    目录 正则表达式 1.1 正则表达式字符串 1.1.1 元字符 1.1.2 字符转义 1.1.3 开始与结束字符 1.2 字符类 1.2.1 定义字符类 1.2.2 字符串取反 1.2.3 区间 1.2.4 预定义字符类 1.3 量词 1.3.1 量词的使用 1.3.2 贪婪量词和懒惰量词 1.4 分组 1.4.1 分组的使用 1.4.2 分组命名 1.4.3 反向引用分组 1.4.4 非捕获分组 1.5 re模块 1.5.1 search()和match()函数 1.5.2 findall()

  • 超详细讲解Java线程池

    目录 池化技术 池化思想介绍 池化技术的应用 如何设计一个线程池 Java线程池解析 ThreadPoolExecutor使用介绍 内置线程池使用 ThreadPoolExecutor解析 整体设计 线程池生命周期 任务管理解析 woker对象 Java线程池实践建议 不建议使用Exectuors 线程池大小设置 线程池监控 带着问题阅读 1.什么是池化,池化能带来什么好处 2.如何设计一个资源池 3.Java的线程池如何使用,Java提供了哪些内置线程池 4.线程池使用有哪些注意事项 池化技术

随机推荐