Python 多进程原理及实现

1 进程的基本概念

什么是进程?

​ 进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。我们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程中所需要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志。

进程的生命周期:创建(New)、就绪(Runnable)、运行(Running)、阻塞(Block)、销毁(Destroy)

进程的状态(分类):(Actived)活动进程、可见进程(Visiable)、后台进程(Background)、服务进程(Service)、空进程

2 父进程和子进程​

Linux 操作系统提供了一个 fork() 函数用来创建子进程,这个函数很特殊,调用一次,返回两次,因为操作系统是将当前的进程(父进程)复制了一份(子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的 PID。我们可以通过判断返回值是不是 0 来判断当前是在父进程还是子进程中执行。

​ 在 Python 中同样提供了 fork() 函数,此函数位于 os 模块下。

# -*- coding: utf-8 -*-
import os
import time

print("在创建子进程前: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

pid = os.fork()
if pid == 0:
  print("子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
  time.sleep(5)
else:
  print("父进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
  # pid表示回收的子进程的pid
  #pid, result = os.wait() # 回收子进程资源  阻塞
  time.sleep(5)
  #print("父进程:回收的子进程pid=%d" % pid)
  #print("父进程:子进程退出时 result=%d" % result)

# 下面的内容会被打印两次,一次是在父进程中,一次是在子进程中。
# 父进程中拿到的返回值是创建的子进程的pid,大于0
print("fork创建完后: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

2.1 父子进程如何区分?

子进程是父进程通过fork()产生出来的,pid = os.fork()

​ 通过返回值pid是否为0,判断是否为子进程,如果是0,则表示是子进程

​ 由于 fork() 是 Linux 上的概念,所以如果要跨平台,最好还是使用 subprocess 模块来创建子进程。

2.2 子进程如何回收?

python中采用os.wait()方法用来回收子进程占用的资源

pid, result = os.wait() # 回收子进程资源  阻塞,等待子进程执行完成回收

如果有子进程没有被回收的,但是父进程已经死掉了,这个子进程就是僵尸进程。

3 Python进程模块

python的进程multiprocessing模块有多种创建进程的方式,每种创建方式和进程资源的回收都不太相同,下面分别针对Process,Pool及系统自带的fork三种进程分析。

3.1 fork()

import os
pid = os.fork() # 创建一个子进程
os.wait() # 等待子进程结束释放资源
pid为0的代表子进程。

缺点: 
​ 1.兼容性差,只能在类linux系统下使用,windows系统不可使用; 
​ 2.扩展性差,当需要多条进程的时候,进程管理变得很复杂; 
​ 3.会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。 
优点: 
​ 是系统自带的接近低层的创建方式,运行效率高。

3.2 Process进程

multiprocessing模块提供Process类实现新建进程

# -*- coding: utf-8 -*-
import os
from multiprocessing import Process
import time

def fun(name):
  print("2 子进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
  print("hello " + name)

def test():
  print('ssss')

if __name__ == "__main__":
  print("1 主进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
  ps = Process(target=fun, args=('jingsanpang', ))
  print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
  print("3 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
  print(ps.is_alive()) # 启动之前 is_alive为False(系统未创建)
  ps.start()
  print(ps.is_alive()) # 启动之后,is_alive为True(系统已创建)

  print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
  print("4 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
  ps.join() # 等待子进程完成任务  类似于os.wait()
  print(ps.is_alive())
  print("5 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
  ps.terminate() #终断进程
  print("6 进程信息: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

特点:

​1.注意:Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。 
2.主进程执行完后会默认等待子进程结束后回收资源,不需要手动回收资源;join()函数用来控制子进程结束的顺序,其内部也有一个清除僵尸进程的函数,可以回收资源; 
3.Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个 Process对象,但是p.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在,不执行。

4.当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。 
5.windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。

另外还可以通过继承Process对象来重写run方法创建进程

3.3 进程池POOL (多个进程)

import multiprocessing
import time

def work(msg):
  mult_proces_name = multiprocessing.current_process().name
  print('process: ' + mult_proces_name + '-' + msg)

if __name__ == "__main__":
  pool = multiprocessing.Pool(processes=5) # 创建5个进程
  for i in range(20):
    msg = "process %d" %(i)
    pool.apply_async(work, (msg, ))
  pool.close() # 关闭进程池,表示不能在往进程池中添加进程
  pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用
  print("Sub-process all done.")

上述代码中的pool.apply_async()是apply()函数的变体,apply_async()是apply()的并行版本,apply()是apply_async()的阻塞版本,使用apply()主进程会被阻塞直到函数执行结束,所以说是阻塞版本。apply()既是Pool的方法,也是Python内置的函数,两者等价。可以看到输出结果并不是按照代码for循环中的顺序输出的。

多个子进程并返回值

apply_async()本身就可以返回被进程调用的函数的返回值。上一个创建多个子进程的代码中,如果在函数func中返回一个值,那么pool.apply_async(func, (msg, ))的结果就是返回pool中所有进程的值的对象(注意是对象,不是值本身)。

import multiprocessing
import time

def func(msg):
  return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
  pool = multiprocessing.Pool(processes=4) # 创建4个进程
  results = []
  for i in range(20):
    msg = "process %d" %(i)
    results.append(pool.apply_async(func, (msg, )))
  pool.close() # 关闭进程池,表示不能再往进程池中添加进程,需要在join之前调用
  pool.join() # 等待进程池中的所有进程执行完毕
  print ("Sub-process(es) done.")

  for res in results:
    print (res.get())

与之前的输出不同,这次的输出是有序的。

​如果电脑是八核,建立8个进程,在Ubuntu下输入top命令再按下大键盘的1,可以看到每个CPU的使用率是比较平均的

4 进程间通信方式

管道pipe:管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
命名管道FIFO:有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
消息队列MessageQueue:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
共享存储SharedMemory:共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号两,配合使用,来实现进程间的同步和通信。
以上几种进程间通信方式中,消息队列是使用的比较频繁的方式。

(1)管道pipe

import multiprocessing

def foo(conn):
  conn.send('hello father')  #向管道pipe发消息
  print(conn.recv())

if __name__ == '__main__':
  conn1,conn2=multiprocessing.Pipe(True)  #开辟两个口,都是能进能出,括号中如果False即单向通信
  p=multiprocessing.Process(target=foo,args=(conn1,)) #子进程使用sock口,调用foo函数
  p.start()
  print(conn2.recv()) #主进程使用conn口接收,从管道(Pipe)中读取消息
  conn2.send('hi son') #主进程使用conn口发送

(2)消息队列Queue

Queue是多进程的安全队列,可以使用Queue实现多进程之间的数据传递。

Queue的一些常用方法:

  • Queue.qsize():返回当前队列包含的消息数量;
  • Queue.empty():如果队列为空,返回True,反之False ;
  • Queue.full():如果队列满了,返回True,反之False;
  • Queue.get():获取队列中的一条消息,然后将其从列队中移除,可传参超时时长。
  • Queue.get_nowait():相当Queue.get(False),取不到值时触发异常:Empty;
  • Queue.put():将一个值添加进数列,可传参超时时长。
  • Queue.put_nowait():相当于Queue.get(False),当队列满了时报错:Full。

案例:

from multiprocessing import Process, Queue
import time

def write(q):
  for i in ['A', 'B', 'C', 'D', 'E']:
   print('Put %s to queue' % i)
   q.put(i)
   time.sleep(0.5)

def read(q):
  while True:
   v = q.get(True)
   print('get %s from queue' % v)

if __name__ == '__main__':
  q = Queue()
  pw = Process(target=write, args=(q,))
  pr = Process(target=read, args=(q,))
  print('write process = ', pw)
  print('read process = ', pr)
  pw.start()
  pr.start()
  pw.join()
  pr.join()
  pr.terminate()
  pw.terminate()

Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。

注:进程间通信应该尽量避免使用共享数据的方式

5 多进程实现生产者消费者

以下通过多进程实现生产者,消费者模式

import multiprocessing
from multiprocessing import Process
from time import sleep
import time

class MultiProcessProducer(multiprocessing.Process):
  def __init__(self, num, queue):
   """Constructor"""
   multiprocessing.Process.__init__(self)
   self.num = num
   self.queue = queue

  def run(self):
   t1 = time.time()
   print('producer start ' + str(self.num))
   for i in range(1000):
     self.queue.put((i, self.num))
   # print 'producer put', i, self.num
   t2 = time.time()

   print('producer exit ' + str(self.num))
   use_time = str(t2 - t1)
   print('producer ' + str(self.num) + ',
   use_time: '+ use_time)

class MultiProcessConsumer(multiprocessing.Process):
  def __init__(self, num, queue):
   """Constructor"""
   multiprocessing.Process.__init__(self)
   self.num = num
   self.queue = queue

  def run(self):
   t1 = time.time()
   print('consumer start ' + str(self.num))
   while True:
     d = self.queue.get()
     if d != None:
      # print 'consumer get', d, self.num
      continue
     else:
      break
   t2 = time.time()
   print('consumer exit ' + str(self.num))
   print('consumer ' + str(self.num) + ', use time:' + str(t2 - t1))

def main():
  # create queue
  queue = multiprocessing.Queue()

  # create processes
  producer = []
  for i in range(5):
   producer.append(MultiProcessProducer(i, queue))

  consumer = []
  for i in range(5):
   consumer.append(MultiProcessConsumer(i, queue))

  # start processes
  for i in range(len(producer)):
   producer[i].start()

  for i in range(len(consumer)):
   consumer[i].start()

  # wait for processs to exit
  for i in range(len(producer)):
   producer[i].join()

  for i in range(len(consumer)):
   queue.put(None)

  for i in range(len(consumer)):
   consumer[i].join()

  print('all done finish')

if __name__ == "__main__":
  main()

6 总结

​ python中的多进程创建有以下两种方式:

(1)fork子进程

(2)采用 multiprocessing 这个库创建子进程

​ 需要注意的是队列中queue.Queue是线程安全的,但并不是进程安全,所以多进程一般使用线程、进程安全的multiprocessing.Queue()

​ 另外, 进程池使用 multiprocessing.Pool实现,pool = multiprocessing.Pool(processes = 3),产生一个进程池,pool.apply_async实现非租塞模式,pool.apply实现阻塞模式。

apply_async和 apply函数,前者是非阻塞的,后者是阻塞。可以看出运行时间相差的倍数正是进程池数量。

​ 同时可以通过result.append(pool.apply_async(func, (msg, )))获取非租塞式调用结果信息的。

以上就是Python 多进程原理及实现的详细内容,更多关于python 多进程的资料请关注我们其它相关文章!

(0)

相关推荐

  • python multiprocessing多进程变量共享与加锁的实现

    python多进程和多线程是大家会重点了解的部分,因为很多工作如果并没有前后相互依赖关系的话其实顺序并不是非常的重要,采用顺序执行的话就必定会造成无谓的等待,任凭cpu和内存白白浪费,这是我们不想看到的. 为了解决这个问题,我们就可以采用多线程或者多进程的方式,(多线程我们之后再讲),而这两者之间是有本质区别的.就内存而言,已知进程是在执行过程中有独立的内存单元的,而多个线程是共享内存的,这是多进程和多线程的一大区别. 利用Value在不同进程中同步变量 在多进程中,由于进程之间内存相互是隔离的

  • Python2.7实现多进程下开发多线程示例

    简单的基于Python2.7版本的多进程下开发多线程的示例,供大家参考,具体内容如下 可以使得程序执行效率至少提升10倍 #!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2018/10/24 @Author : LiuXueWen @Site : @File : transfer.py @Software: PyCharm @Description: """ import os

  • Python Process多进程实现过程

    进程的概念 程序是没有运行的代码,静态的: 进程是运行起来的程序,进程是一个程序运行起来之后和资源的总称: 程序只有一个,但同一份程序可以有多个进程:例如,电脑上多开QQ: 程序和进程的区别在于有没有资源,进程有资源而程序没有资源,进程是一个资源分配的基本单元: 程序在没运行的时候没有资源,没有显卡,没有网卡,等等:双击运行后有摄像头,有网速等等,就叫做进程: 进程的状态 进程状态图 就绪态:运行的条件都已经慢去,正在等在cpu执行 执行态:cpu正在执行其功能 等待态:等待某些条件满足,例如一

  • python进程池实现的多进程文件夹copy器完整示例

    本文实例讲述了python进程池实现的多进程文件夹copy器.分享给大家供大家参考,具体如下: 应用:文件夹copy器(多进程版) import multiprocessing import os import time import random def copy_file(queue, file_name,source_folder_name, dest_folder_name): """copy文件到指定的路径""" f_read = op

  • Python基于进程池实现多进程过程解析

    1.注意:pool必须在 if __name__ == '__main__' 下面运行,不然会报错 2.多进程内出现错误会直接跳过该进程,并且默认不会打印错误信息 3.if__name__下面的数据需要通过参数传入主函数里面,不然主函数获取不到该数据值而报错. 4.若不通过传参形式传入数据,可以定义全局变量.但是全局变量的值不能在多进程里面进行修改. 代码如下 from multiprocessing import Pool # 进程池,用于多进程 import os # 用于获取当前执行的文件

  • Python Multiprocessing多进程 使用tqdm显示进度条的实现

    1.背景 在python运行一些,计算复杂度比较高的函数时,服务器端单核CPU的情况比较耗时,因此需要多CPU使用多进程加快速度 2.函数要求 笔者使用的是:pathos.multiprocessing 库,进度条显示用tqdm库,安装方法: pip install pathos 安装完成后 from pathos.multiprocessing import ProcessingPool as Pool from tqdm import tqdm 这边使用pathos的原因是因为,multip

  • keras tensorflow 实现在python下多进程运行

    如下所示: from multiprocessing import Process import os def training_function(...): import keras # 此处需要在子进程中 ... if __name__ == '__main__': p = Process(target=training_function, args=(...,)) p.start() 原文地址:https://stackoverflow.com/questions/42504669/ker

  • python实现多进程按序号批量修改文件名的方法示例

    本文实例讲述了python实现多进程按序号批量修改文件名的方法.分享给大家供大家参考,具体如下: 说明 文件名命名方式如图,是数字序号开头,但是中间有些文件删掉了,序号不连续,这里将序号连续起来,总的文件量有40w+,故使用多进程 代码 import os import re from multiprocessing import Pool def getAllFilePath(pathFolder,filter=[".jpg",".txt"]): #遍历文件夹下所

  • python 多进程并行编程 ProcessPoolExecutor的实现

    使用 ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor, as_completed import random 斐波那契数列 当 n 大于 30 时抛出异常 def fib(n): if n > 30: raise Exception('can not > 30, now %s' % n) if n <= 2: return 1 return fib(n-1) + fib(n-2) 准备数组 nu

  • Python多进程原理与用法分析

    本文实例讲述了Python多进程原理与用法.分享给大家供大家参考,具体如下: 进程是程序在计算机上的一次执行活动.当你运行一个程序,你就启动了一个进程.显然,程序是死的(静态的),进程是活的(动态的).进程可以分为系统进程和用户进程.凡是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身:所有由你启动的进程都是用户进程.进程是操作系统进行资源分配的单位. 开启一个进程 import multiprocessing,time,os def runtask(): ti

  • Python 多进程原理及实现

    1 进程的基本概念 什么是进程? ​ 进程就是一个程序在一个数据集上的一次动态执行过程.进程一般由程序.数据集.进程控制块三部分组成.我们编写的程序用来描述进程要完成哪些功能以及如何完成:数据集则是程序在执行过程中所需要使用的资源:进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统可以利用它来控制和管理进程,它是系统感知进程存在的唯一标志. 进程的生命周期:创建(New).就绪(Runnable).运行(Running).阻塞(Block).销毁(Destroy) 进程的状态(分类)

  • Python multiprocessing多进程原理与应用示例

    本文实例讲述了Python multiprocessing多进程原理与应用.分享给大家供大家参考,具体如下: multiprocessing包是Python中的多进程管理包,可以利用multiprocessing.Process对象来创建进程,Process对象拥有is_alive().join([timeout]).run().start().terminate()等方法. multprocessing模块的核心就是使管理进程像管理线程一样方便,每个进程有自己独立的GIL,所以不存在进程间争抢

  • Python多进程与服务器并发原理及用法实例分析

    本文实例分析了Python多进程与服务器并发原理及用法.分享给大家供大家参考,具体如下: 进程 什么是进程 进程:正在进行的一个过程或者说一个任务.而负责执行任务则是cpu. 进程与程序的区别 程序仅仅只是一堆代码而已,而进程指的是程序的运行过程. 并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务 一 并发:是伪并行,即看起来是同时运行.单个cpu+多道技术就可以实现

  • 探究Python多进程编程下线程之间变量的共享问题

     1.问题: 群中有同学贴了如下一段代码,问为何 list 最后打印的是空值? from multiprocessing import Process, Manager import os manager = Manager() vip_list = [] #vip_list = manager.list() def testFunc(cc): vip_list.append(cc) print 'process id:', os.getpid() if __name__ == '__main_

  • Python多进程multiprocessing.Pool类详解

    multiprocessing模块 multiprocessing包是Python中的多进程管理包.它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程.该进程可以允许放在Python程序内部编写的函数中.该Process对象与Thread对象的用法相同,拥有is_alive().join([timeout]).run().start().terminate()等方法.属性有:authkey.daemon(要通过start()设置)

  • python多进程下实现日志记录按时间分割

    python多进程下实现日志记录按时间分割,供大家参考,具体内容如下 原理:自定义日志handler继承TimedRotatingFileHandler,并重写computeRollover与doRollover函数.其中重写computeRollover是为了能按整分钟/小时/天来分割日志,如按天分割,2018-04-10 00:00:00~2018-04-11 00:00:00,是一个半闭半开区间,且不是原意的:从日志创建时间或当前时间开始,到明天的这个时候. 代码如下: #!/usr/bi

  • Python3.5多进程原理与用法实例分析

    本文实例讲述了Python3.5多进程原理与用法.分享给大家供大家参考,具体如下: 进程类:Process 示例及代码: (1)创建函数作为单进程 #!/usr/bin/env python # -*- coding:utf-8 -*- # Author:ZhengzhengLiu import multiprocessing import time #创建函数并将其作为单个进程 def worker(interval): n = 5 #进程数 while n>0: print("The

  • 提升python处理速度原理及方法实例

    这篇文章主要介绍了提升python处理速度原理及方法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 导读:作为日常生产开发中非常实用的一门语言,python广泛应用于网络爬虫.web开发.自动化测试.数据分析和人工智能等领域.但python是单线程的,想要提升python的处理速度,涉及到一个很关键的技术--协程.本篇文章,将讲述python协程的理解与使用. 1.操作系统相关概念 在理解与使用协程之前,先简单的了解几个与操作系统相关的概念

  • python多进程中的生产者和消费者模型详解

    目录 Python生产者消费者模型 一.消费模式 二.传输原理 三.实现方式 Python生产者消费者模型 一.消费模式 生产者消费者模式 是Controlnet网络中特有的一种传输数据的模式.用于两个CPU之间传输数据,即使是不同类型同一厂家的CPU也可以通过设置来使用. 二.传输原理 类似与点对点传送,又略有不同,一个生产者可以对应N个消费者,但是一个消费者只能对应一个生产者: 每个生产者消费者对应一个地址,占一个网络节点,属于预定性数据,在网络中优先级最高: 此模式如果在网络中设置过多会影

随机推荐