Python线程池的正确使用方法

目录
  • Python线程池的正确使用
    • 1、为什么要使用线程池呢?
    • 2、线程池怎么用呢?
    • 3、如何非阻塞的获取线程执行的结果
    • 4、线程池的运行策略

Python线程池的正确使用

1、为什么要使用线程池呢?

因为线程执行完任务之后就会被系统销毁,下次再执行任务的时候再进行创建。这种方式在逻辑上没有啥问题。但是系统启动一个新线程的成本是比较高,因为其中涉及与操作系统的交互,操作系统需要给新线程分配资源。打个比方吧!就像软件公司招聘员工干活一样。当有活干时,就招聘一个外包人员干活。当活干完之后就把这个人员辞退掉。你说在这过程中所耗费的时间成本和沟通成本是不是很大。那么公司一般的做法是:当项目立项时就确定需要几名开发人员,然后将这些人员配齐。然后这些人员就常驻在项目组,有活就干,没活就摸鱼。线程池也是同样的道理。线程池可以定义最大线程数,这些线程有任务就执行任务,没任务就进入线程池中歇着。

2、线程池怎么用呢?

线程池的基类是concurrent.futures模块中的Executor类,而Executor类提供了两个子类,即ThreadPoolExecutor类和ProcessPoolExecutor类。其中ThreadPoolExecutor用于创建线程池,而ProcessPoolExecutor用于创建进程池。本文将重点介绍ThreadPoolExecutor类的使用。首先,让我们来看看ThreadPoolExecutor类的构造函数。这里使用的Python版本是:3.6.7。

      def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

他的构造函数只有两个参数:一个是max_workers参数,用于指定线程池的最大线程数,如果不指定的话则默认是CPU核数的5倍。另一个参数是thread_name_prefix,它用来指定线程池中线程的名称前缀。其他参数:

  • _shutdown初始值值为False,默认情况下线程池不销毁,即线程池的生命周期跟项目的生命周期一致。
  • self._work_queue = queue.Queue()生成缓冲队列。
  • _threads没有任务被提交时,线程的数量设置为0。
  • _shutdown_lock 指定线程池的锁是Lock锁。
  • 说完了线程池的创建之后,接着来看看线程池中比较常用的几个方法吧。
  • submit(self, fn, *args, **kwargs):
  • 该方法用提交任务,即将fn函数提交给线程池,*args代表传给fn函数的参数,**kwargs代表以关键字参数的形式为fn函数传入参数。
  • shutdown(self, wait=True):
  • 关闭线程池
  • map(func, *iterables, timeout=None, chunksize=1):
  • 该函数类似于全局函数map(func,*iterables),只是该函数将会启动多个线程,以异步方式立即对iterables执行map处理。

程序将task函数通过submit方法提交给线程池之后,线程池会返回一个Future对象,该对象的作用主要是用于获取线程任务函数的返回值。Future提供了如下几个方法。

  • cancel():取消该Future代表的线程任务。如果该任务正在执行,不可取消,则该方法返回False;否则,程序会取消该任务,并返回True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。
  • done():如果该Future代表的线程任务被成功取消或执行完成,则该方法返回True。

来个简单的例子:

该例中创建了一个最大线程数是2的线程池来执行async_add函数。

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "执行求和操作求得的和是=" + str(sum))
    return sum

# 创建两个线程
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='测试线程')
# 向线程池提交一个task,20作为async_add()函数的参数
future1 = pool.submit(async_add, 20)
# 向线程池再提交一个task
future2 = pool.submit(async_add, 50)
# 判断future1代表的任务是否执行完
time.sleep(2)
print(future1.done())
print(future2.done())
# 查看future1代表的任务返回的结果
print('线程一的执行结果是=' + str(future1.result()))
# 查看future2代表的任务的返回结果
print('线程二的执行结果是=' + str(future2.result()))
print("----" + threading.current_thread().name + "----主线程执行结束-----")

运行结果是:

测试线程_0执行求和操作求得的和是=190
测试线程_1执行求和操作求得的和是=1225
True
True
线程一的执行结果是=190
线程二的执行结果是=1225
----MainThread----主线程执行结束-----

本例中定义了一个最大线程数是2的线程池,并向线程池中提交了两个任务,其中async_add函数就是要执行的任务。在async_add函数中添加 time.sleep(1) 休眠一秒是为了验证done()方法返回的结果。最后才打印主线程执行结束表明result()方法是阻塞的。如果将result()屏蔽掉。
改成如下形式:

# 创建两个线程
pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix='测试线程')
# 向线程池提交一个task,20作为async_add()函数的参数
future1 = pool.submit(async_add, 20)
# 向线程池再提交一个task
future2 = pool.submit(async_add, 50)
# 判断future1代表的任务是否执行完
print(future1.done())
print(future2.done())
print("----" + threading.current_thread().name + "----主线程执行结束-----")

则运行结果是:

False
False
----MainThread----主线程执行结束-----
测试线程_0执行求和操作求得的和是=190
测试线程_1执行求和操作求得的和是=1225

3、如何非阻塞的获取线程执行的结果

前面介绍的result()方法是通过阻塞的方式来获取线程的运行结果的。那么如果通过非阻塞的方法来获取线程任务最后的返回结果呢?这里就需要使用线程的回调函数来获取线程的返回结果。

from concurrent.futures import ThreadPoolExecutor
import threading
import time

def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "执行求和操作求得的和是=" + str(sum))
    return sum

with ThreadPoolExecutor(max_workers=2) as pool:
    # 向线程池提交一个task
    future1 = pool.submit(async_add, 20)
    future2 = pool.submit(async_add, 50)

    # 定义获取结果的函数
    def get_result(future):
        print(threading.current_thread().name + '运行结果:' + str(future.result()))

    # 查看future1代表的任务返回的结果
    future1.add_done_callback(get_result)
    # 查看future2代表的任务的返回结果
    future2.add_done_callback(get_result)
    print('------------主线程执行结束----')

运行结果是:

------------主线程执行结束----
ThreadPoolExecutor-0_1执行求和操作求得的和是=1225
ThreadPoolExecutor-0_1运行结果:1225
ThreadPoolExecutor-0_0执行求和操作求得的和是=190
ThreadPoolExecutor-0_0运行结果:190

从结果可以看出获取线程执行结果的方法完全没有阻塞到主线程的运行。这里通过add_done_callback函数向线程池中注册了一个获取线程执行结果的函数get_result。
由于线程池实现了上下文管理协议(Context Manage Protocol),因此程序可以使用with语句来管理线程池,这样即可避免手动关闭线程池。

4、线程池的运行策略

这里有必要介绍一下线程池的执行策略,也就是说当线程池中的任务数大于线程池的最大线程数时,线程池该如何处理这些任务呢?处理不了的任务是直接丢弃还是慢慢处理呢?再回答这个问题之前,让我们来看下下面这个例子:这里定义了一个最大线程数是4个线程池,然后向线程池中提交了100个task任务。

def async_add(max):
    sum = 0
    for i in range(max):
        sum = sum + i
    time.sleep(1)
    print(threading.current_thread().name + "执行求和操作求得的和是=" + str(sum))
    return sum

with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(100):
        pool.submit(async_add, i)
    print('------------主线程执行结束----')

运行结果是:

------------主线程执行结束----
ThreadPoolExecutor-0_1执行求和操作求得的和是=0
ThreadPoolExecutor-0_0执行求和操作求得的和是=0
ThreadPoolExecutor-0_3执行求和操作求得的和是=3
ThreadPoolExecutor-0_2执行求和操作求得的和是=1
...省略部分结果.....
ThreadPoolExecutor-0_1执行求和操作求得的和是=4656
ThreadPoolExecutor-0_2执行求和操作求得的和是=4753
ThreadPoolExecutor-0_0执行求和操作求得的和是=4560
ThreadPoolExecutor-0_3执行求和操作求得的和是=4851

从运行结果可以看出:一直都是相同的线程来执行这些任务,并且所有的任务都没有被丢弃。并且任务按照先来后到的顺序来执行。这里就需要说到线程池默认的缓冲队列了。self._work_queue = queue.Queue() 该语句会创建一个大小无限制的缓冲队列。该队列是一个 FIFO(先进先出)的常规队列。所以当任务数超过最大线程数时,任务会暂时放在缓冲队列queue中。当线程空闲之后会从缓冲队列中取出任务来执行。
该队列有个参数maxsize可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时,就会被阻塞,直到队列中的元素被消费。如果将maxsize的设置为0或者负数时,则该队列的大小就是无限制的。

到此这篇关于Python线程池的正确使用方法的文章就介绍到这了,更多相关Python线程池的正确使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python线程池的四种好处总结

    1.使用好处 提高性能:由于减去了大量新建终止线程的费用,重用了线程资源: 适用场景:适用于处理大量突发请求或需要大量线程完成任务,但实际任务处理时间短. 防御功能:可以有效避免系统因线程过多而导致系统负载过大而相应变慢的问题. 代码优势:使用线程池的语法比创建自己的线程更简单. 2.实例 """ @file : 004-线程池的使用.py @author : xiaolu @email : luxiaonlp@163.com @time : 2021-02-01 "

  • Python 线程池模块之多线程操作代码

    1.线程池模块 引入 from concurrent.futures import ThreadPoolExecutor 2.使用线程池 一个简单的线程池使用案例 from concurrent.futures import ThreadPoolExecutor import time pool = ThreadPoolExecutor(10, 'Python') def fun(): time.sleep(1) print(1, end='') if __name__ == '__main__

  • Python 使用threading+Queue实现线程池示例

    一.线程池 1.为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率. 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3>T2,那说明开启一个线程来执行这个任务太不划算了!在线程池缓存线程可用已有的闲置线程来执行新任务,避免了创建/销毁带来的系统开销. 1.2 线程并发数量过多,抢占系统资源从而导致阻塞. 线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而产生阻塞的情况. 1.3 对线

  • python线程池 ThreadPoolExecutor 的用法示例

    前言 从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类. 相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值: 主线程可以获取某一个线程(或者任务的)的状态,以及返

  • 实例代码讲解Python 线程池

    大家都知道当任务过多,任务量过大时如果想提高效率的一个最简单的方法就是用多线程去处理,比如爬取上万个网页中的特定数据,以及将爬取数据和清洗数据的工作交给不同的线程去处理,也就是生产者消费者模式,都是典型的多线程使用场景. 那是不是意味着线程数量越多,程序的执行效率就越快呢. 显然不是.线程也是一个对象,是需要占用资源的,线程数量过多的话肯定会消耗过多的资源,同时线程间的上下文切换也是一笔不小的开销,所以有时候开辟过多的线程不但不会提高程序的执行效率,反而会适得其反使程序变慢,得不偿失. 所以,如

  • Python 如何创建一个线程池

    问题 你创建一个工作者线程池,用来响应客户端请求或执行其他的工作. 解决方案 concurrent.futures 函数库有一个 ThreadPoolExecutor 类可以被用来完成这个任务. 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端: from socket import AF_INET, SOCK_STREAM, socket from concurrent.futures import ThreadPoolExecutor def echo_client(sock, c

  • Python爬虫之线程池的使用

    一.前言 学到现在,我们可以说已经学习了爬虫的基础知识,如果没有那些奇奇怪怪的反爬虫机制,基本上只要有时间分析,一般的数据都是可以爬取的,那么到了这个时候我们需要考虑的就是爬取的效率了,关于提高爬虫效率,也就是实现异步爬虫,我们可以考虑以下两种方式:一是线程池的使用(也就是实现单进程下的多线程),一是协程的使用(如果没有记错,我所使用的协程模块是从python3.4以后引入的,我写博客时使用的python版本是3.9). 今天我们先来讲讲线程池. 二.同步代码演示 我们先用普通的同步的形式写一段

  • python爬虫线程池案例详解(梨视频短视频爬取)

    python爬虫-梨视频短视频爬取(线程池) 示例代码 import requests from lxml import etree import random from multiprocessing.dummy import Pool # 多进程要传的方法,多进程pool.map()传的第二个参数是一个迭代器对象 # 而传的get_video方法也要有一个迭代器参数 def get_video(dic): headers = { 'User-Agent':'Mozilla/5.0 (Wind

  • python线程池如何使用

    线程池的使用 线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池. 如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定. Exectuor 提供了如下常用方

  • Python定时器线程池原理详解

    这篇文章主要介绍了Python定时器线程池原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 定时器执行循环任务: 知识储备 Timer(interval, function, args=None, kwargs=None) interval ===> 时间间隔 单位为s function ===> 定制执行的函数 使用threading的 Timer 类 start() 为通用的开始执行方法 cancel ()为取消执行的方法 普通单次

  • 解决python ThreadPoolExecutor 线程池中的异常捕获问题

    问题 最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回. 先说重点 这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeou

随机推荐