python 多进程和多线程使用详解

进程和线程

进程是系统进行资源分配的最小单位,线程是系统进行调度执行的最小单位;

一个应用程序至少包含一个进程,一个进程至少包含一个线程;

每个进程在执行过程中拥有独立的内存空间,而一个进程中的线程之间是共享该进程的内存空间的;

  • 计算机的核心是CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。
  • 假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其他车间都必须停工。背后的含义就是,单个CPU一次只能运行一个任务。编者注: 多核的CPU就像有了多个发电厂,使多工厂(多进程)实现可能。
  • 进程就好比工厂的车间,它代表CPU所能处理的单个任务。任一时刻,CPU总是运行一个进程,其他进程处于非运行状态。
  • 一个车间里,可以有很多工人。他们协同完成一个任务。
  • 线程就好比车间里的工人。一个进程可以包括多个线程。
  • 车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的。这象征一个进程的内存空间是共享的,每个线程都可以使用这些共享内存。
  • 可是,每间房间的大小不同,有些房间最多只能容纳一个人,比如厕所。里面有人的时候,其他人就不能进去了。这代表一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。
  • 一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫"互斥锁"(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。
  • 还有些房间,可以同时容纳n个人,比如厨房。也就是说,如果人数大于n,多出来的人只能在外面等着。这好比某些内存区域,只能供给固定数目的线程使用。
  • 这时的解决方法,就是在门口挂n把钥匙。进去的人就取一把钥匙,出来时再把钥匙挂回原处。后到的人发现钥匙架空了,就知道必须在门口排队等着了。这种做法叫做"信号量"(Semaphore),用来保证多个线程不会互相冲突。
  • 不难看出,mutex是semaphore的一种特殊情况(n=1时)。也就是说,完全可以用后者替代前者。但是,因为mutex较为简单,且效率高,所以在必须保证资源独占的情况下,还是采用这种设计。

Python的多进程

Python的多进程依赖于multiprocess模块;使用多进程可以利用多个CPU进行并行计算;

实例:

from multiprocessing import Process
import os
import time
 
def long_time_task(i):
    print('子进程: {} - 任务{}'.format(os.getpid(), i))
    time.sleep(2)
    print("结果: {}".format(8 ** 20))
 
if __name__=='__main__':
    print('当前母进程: {}'.format(os.getpid()))
    start = time.time()
    p1 = Process(target=long_time_task, args=(1,))
    p2 = Process(target=long_time_task, args=(2,))
    print('等待所有子进程完成。')
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    end = time.time()
    print("总共用时{}秒".format((end - start)))

新创建进程和进程间切换是需要消耗资源的,所以应该控制进程数量;

同时可运行的进程数量收到CPU核数限制;

进程池

使用进程池pool创建进程:

使用进程池可以避免手工进行进程的创建的麻烦,默认数量是CPU核数;

Pool类可以提供指定数量的进程供用户使用,当有新的请求被提交到Pool中的时候,如果进程池还没有满,就会创建一个新的进程来执行请求;如果池已经满了,请求就会等待,等到有空闲进程可以使用时,才会执行请求;

几个方法:

1.apply_async

作用是向进程池提交需要执行的函数和参数,各个进程采用非阻塞的异步方式调用,每个进程只管自己运行,是默认方式;

2.map

会阻塞进程直到返回结果;

3.map_sunc

非阻塞进程;

4.close

关闭进程池,不再接受任务;

5.terminate

结束进程;

6.join

主进程阻塞,直到子进程执行结束;

实例:

from multiprocessing import Pool, cpu_count
import os
import time
 
def long_time_task(i):
    print('子进程: {} - 任务{}'.format(os.getpid(), i))
    time.sleep(2)
    print("结果: {}".format(8 ** 20))
 
if __name__=='__main__':
    print("CPU内核数:{}".format(cpu_count()))
    print('当前母进程: {}'.format(os.getpid()))
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('等待所有子进程完成。')
    p.close()
    p.join()
    end = time.time()
    print("总共用时{}秒".format((end - start)))

在join之前,必须使用close或者terminate,让进程池不再接受任务;

多进程间的数据通信与共享

通常,进程之间是相互独立的,每个进程都有独立的内存。通过共享内存(nmap模块),进程之间可以共享对象,使多个进程可以访问同一个变量(地址相同,变量名可能不同)。多进程共享资源必然会导致进程间相互竞争,所以应该尽最大可能防止使用共享状态。还有一种方式就是使用队列queue来实现不同进程间的通信或数据共享,这一点和多线程编程类似。

下例这段代码中中创建了2个独立进程,一个负责写(pw), 一个负责读(pr), 实现了共享一个队列queue。

from multiprocessing import Process, Queue
import os, time, random
 
# 写数据进程执行的代码:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
 
# 读数据进程执行的代码:
def read(q):
    print('Process to read:{}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)
 
if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

Python的多线程

python 3中的多进程编程主要依靠threading模块。创建新线程与创建新进程的方法非常类似。threading.Thread方法可以接收两个参数, 第一个是target,一般指向函数名,第二个时args,需要向函数传递的参数。对于创建的新线程,调用start()方法即可让其开始。我们还可以使用current_thread().name打印出当前线程的名字。

import threading
import time
 
def long_time_task(i):
    print('当前子线程: {} 任务{}'.format(threading.current_thread().name, i))
    time.sleep(2)
    print("结果: {}".format(8 ** 20))
 
if __name__=='__main__':
    start = time.time()
    print('这是主线程:{}'.format(threading.current_thread().name))
    thread_list = []
    for i in range(1, 3):
        t = threading.Thread(target=long_time_task, args=(i, ))
        thread_list.append(t)
    for t in thread_list:
        t.start()
    for t in thread_list:
        t.join()
    end = time.time()
    print("总共用时{}秒".format((end - start)))

多线程间的数据共享

一个进程所含的不同线程间共享内存,这就意味着任何一个变量都可以被任何一个线程修改,因此线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。如果不同线程间有共享的变量,其中一个方法就是在修改前给其上一把锁lock,确保一次只有一个线程能修改它。threading.lock()方法可以轻易实现对一个共享变量的锁定,修改完后release供其它线程使用。

import threading
 
class Account:
    def __init__(self):
        self.balance = 0
 
    def add(self, lock):
        # 获得锁
        lock.acquire()
        for i in range(0, 100000):
            self.balance += 1
        # 释放锁
        lock.release()
 
    def delete(self, lock):
        # 获得锁
        lock.acquire()
        for i in range(0, 100000):
            self.balance -= 1
            # 释放锁
        lock.release()
 
if __name__ == "__main__":
    account = Account()
    lock = threading.Lock()
    # 创建线程
   thread_add = threading.Thread(target=account.add, args=(lock,), name='Add')
    thread_delete = threading.Thread(target=account.delete, args=(lock,), name='Delete')
 
    # 启动线程
   thread_add.start()
    thread_delete.start()
 
    # 等待线程结束
   thread_add.join()
    thread_delete.join()
 
    print('The final balance is: {}'.format(account.balance))

使用queue队列通信-经典的生产者和消费者模型

from queue import Queue
import random, threading, time
 
# 生产者类
class Producer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
 
    def run(self):
        for i in range(1, 5):
            print("{} is producing {} to the queue!".format(self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randrange(10) / 5)
        print("%s finished!" % self.getName())
 
# 消费者类
class Consumer(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self, name=name)
        self.queue = queue
 
    def run(self):
        for i in range(1, 5):
            val = self.queue.get()
            print("{} is consuming {} in the queue.".format(self.getName(), val))
            time.sleep(random.randrange(10))
        print("%s finished!" % self.getName())
 
def main():
    queue = Queue()
    producer = Producer('Producer', queue)
    consumer = Consumer('Consumer', queue)
 
    producer.start()
    consumer.start()
 
    producer.join()
    consumer.join()
    print('All threads finished!')
 
if __name__ == '__main__':
    main()
  • 对CPU密集型代码(比如循环计算) - 多进程效率更高
  • 对IO密集型代码(比如文件操作,网络爬虫) - 多线程效率更高。

对于IO密集型操作,大部分消耗时间其实是等待时间,在等待时间中CPU是不需要工作的,那你在此期间提供双CPU资源也是利用不上的,相反对于CPU密集型代码,2个CPU干活肯定比一个CPU快很多。那么为什么多线程会对IO密集型代码有用呢?这时因为python碰到等待会释放GIL供新的线程使用,实现了线程间的切换。

以上就是python 多进程和多线程使用详解的详细内容,更多关于python 多进程和多线程的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python 多进程、多线程效率对比

    Python 界有条不成文的准则: 计算密集型任务适合多进程,IO 密集型任务适合多线程.本篇来作个比较. 通常来说多线程相对于多进程有优势,因为创建一个进程开销比较大,然而因为在 python 中有 GIL 这把大锁的存在,导致执行计算密集型任务时多线程实际只能是单线程.而且由于线程之间切换的开销导致多线程往往比实际的单线程还要慢,所以在 python 中计算密集型任务通常使用多进程,因为各个进程有各自独立的 GIL,互不干扰. 而在 IO 密集型任务中,CPU 时常处于等待状态,操作系统需要

  • Python多进程与多线程的使用场景详解

    前言 Python多进程适用的场景:计算密集型(CPU密集型)任务 Python多线程适用的场景:IO密集型任务 计算密集型任务一般指需要做大量的逻辑运算,比如上亿次的加减乘除,使用多核CPU可以并发提高计算性能. IO密集型任务一般指输入输出型,比如文件的读取,或者网络的请求,这类场景一般会遇到IO阻塞,使用多核CPU来执行并不会有太高的性能提升. 下面使用一台64核的虚拟机来执行任务,通过示例代码来区别它们, 示例1:执行计算密集型任务,进行1亿次运算 使用多进程 from multipro

  • Python多线程多进程实例对比解析

    多线程适合于多io操作 多进程适合于耗cpu(计算)的操作 # 多进程编程 # 耗cpu的操作,用多进程编程, 对于io操作来说,使用多线程编程 import time from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ProcessPoolExecutor def fib(n): if n <= 2: return 1 return fib(n - 2)

  • python多线程和多进程关系详解

    关于多线程的大概讲解: 在Python的标准库中给出了2个模块:_thread和threading,_thread是低级模块不支持守护线程,当主线程退出了时,全部子线程都会被强制退出了.而threading是高级模块,用作对_thread进行了封装支持守护线程.在大部分状况下人们只需要采用threading这个高级模块即可. 关于多进程的大概讲解: 多进程是multiprocessing模块给出远程与本地的并发,在一个multiprocessing库的采用场景下,全部的子进程全是由一个父进程运行

  • Python2.7实现多进程下开发多线程示例

    简单的基于Python2.7版本的多进程下开发多线程的示例,供大家参考,具体内容如下 可以使得程序执行效率至少提升10倍 #!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2018/10/24 @Author : LiuXueWen @Site : @File : transfer.py @Software: PyCharm @Description: """ import os

  • Python之多进程与多线程的使用

    进程与线程 想象在学校的一个机房,有固定数量的电脑,老师安排了一个爬虫任务让大家一起完成,每个学生使用一台电脑爬取部分数据,将数据放到一个公共数据库.共同资源就像公共数据库,进程就像每一个学生,每多一个学生,就多一个进程来完成这个任务,机房里的电脑数量就像CPU,所以进程数量是CPU决定的,线程就像学生用一台电脑开多个爬虫,爬虫数量由每台电脑的运行内存决定. 一个CPU可以有多个进程,一个进程有一个或多个线程. 多进程 1.导包 from multiprocessing import Proce

  • Python实现多线程/多进程的TCP服务器

    多线程的TCP服务器,供大家参考,具体内容如下 背景:同学公司的传感器设备需要将收集的数据发到服务器上,前期想写一个简单的服务器来测试下使用效果,设备收集的数据非常的重要,所以考虑使用TCP协议来实现. 因为只是测试使用,所以采用多线程的方式,毕竟节省资源嘛(使用协程时会导致I/O阻塞) 开门见山,直接搬上来了 一.tcp_server_v1.0使用说明: 1.运行环境:python3解释器,并安装socket.threading模块: 2.该版本使用多线程实现的多任务: 3.支持多台设备同时连

  • Python全局锁中如何合理运用多线程(多进程)

    Python全局锁 (1)全局锁导致的问题 全局锁的英文简称是GIL,全称是Global Interpreter Lock(全局解释器锁),来源是python设计之初的考虑,为了数据安全所做的决定,每个线程在执行时候都需要先获取GIL,保证同一时刻只有一个线程可以执行代码,即同一时刻只有一个线程使用CPU,也就是说多线程并不是真正意义上的同时执行. 每个CPU在同一时间只能执行一个线程(在单核CPU下的多线程其实都只是并发,不是并行,并发和并行从宏观上来讲都是同时处理多路请求的概念.但并发和并行

  • 处理python中多线程与多进程中的数据共享问题

    之前在写多线程与多进程的时候,因为一般情况下都是各自完成各自的任务,各个子线程或者各个子进程之前并没有太多的联系,如果需要通信的话我会使用队列或者数据库来完成,但是最近我在写一些多线程与多进程的代码时,发现如果它们需要用到共享变量的话,需要有一些注意的地方 多线程之间的共享数据 标准数据类型在线程间共享 看以下代码 #coding:utf-8 import threading def test(name,data): print("in thread {} name is {}".fo

  • python多线程与多进程及其区别详解

    前言 个人一直觉得对学习任何知识而言,概念是相当重要的.掌握了概念和原理,细节可以留给实践去推敲.掌握的关键在于理解,通过具体的实例和实际操作来感性的体会概念和原理可以起到很好的效果.本文通过一些具体的例子简单介绍一下python的多线程和多进程,后续会写一些进程通信和线程通信的一些文章. python多线程 python中提供两个标准库thread和threading用于对线程的支持,python3中已放弃对前者的支持,后者是一种更高层次封装的线程库,接下来均以后者为例. 创建线程 pytho

  • python线程安全及多进程多线程实现方法详解

    进程和线程的区别 进程是对运行时程序的封装,是系统资源调度和分配的基本单位 线程是进程的子任务,cpu调度和分配的基本单位,实现进程内并发. 一个进程可以包含多个线程,线程依赖进程存在,并共享进程内存 什么是线程安全 一个线程的修改被另一个线程的修改覆盖掉. python中哪些操作是线程安全的 一个操作可以在多线程环境中使用,并且获得正确的结果. 线程安全的操作线程是顺序执行的而不是并发执行的. 一般涉及到写操作需要考虑如何让多个线程安全访问数据. 线程同步的方式 互斥量(锁): 通过互斥机制防

随机推荐