python线程池 ThreadPoolExecutor 的用法示例

前言

从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类。

相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值:

主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
当一个线程完成的时候,主线程能够立即知道。
让多线程和多进程的编码接口一致。

线程池的基本使用

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time

def spider(page):
 time.sleep(page)
 print(f"crawl task{page} finished")
 return page

with ThreadPoolExecutor(max_workers=5) as t: # 创建一个最大容纳数量为5的线程池
 task1 = t.submit(spider, 1)
 task2 = t.submit(spider, 2) # 通过submit提交执行的函数到线程池中
 task3 = t.submit(spider, 3)

 print(f"task1: {task1.done()}") # 通过done来判断线程是否完成
 print(f"task2: {task2.done()}")
 print(f"task3: {task3.done()}")

 time.sleep(2.5)
 print(f"task1: {task1.done()}")
 print(f"task2: {task2.done()}")
 print(f"task3: {task3.done()}")
 print(task1.result()) # 通过result来获取返回值

执行结果如下:

task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished

1.使用 with 语句 ,通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。

2.使用 submit 函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。

3.通过使用 done() 方法判断该任务是否结束。上面的例子可以看出,提交任务后立即判断任务状态,显示四个任务都未完成。在延时2.5后,task1 和 task2 执行完毕,task3 仍在执行中。

4.使用 result() 方法可以获取任务的返回值。

主要方法

  • wait

wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait 接受三个参数:
fs: 表示需要执行的序列
timeout: 等待的最大时间,如果超过这个时间即使线程未执行完成也将返回
return_when:表示wait返回结果的条件,默认为 ALL_COMPLETED 全部执行完成再返回

还是用上面那个例子来熟悉用法
示例:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
 time.sleep(page)
 print(f"crawl task{page} finished")
 return page

with ThreadPoolExecutor(max_workers=5) as t:
 all_task = [t.submit(spider, page) for page in range(1, 5)]
 wait(all_task, return_when=FIRST_COMPLETED)
 print('finished')
 print(wait(all_task, timeout=2.5))

# 运行结果
crawl task1 finished
finished
crawl task2 finished
crawl task3 finished
DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})
crawl task4 finished

1.代码中返回的条件是:当完成第一个任务的时候,就停止等待,继续主线程任务

2.由于设置了延时, 可以看到最后只有 task4 还在运行中

  • as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。最好的方法是当某个任务结束了,就给主线程返回结果,而不是一直判断每个任务是否结束。
ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是这样一个方法,当子线程中的任务执行完后,直接用 result() 获取返回结果

用法如下:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def spider(page):
 time.sleep(page)
 print(f"crawl task{page} finished")
 return page

def main():
 with ThreadPoolExecutor(max_workers=5) as t:
  obj_list = []
  for page in range(1, 5):
   obj = t.submit(spider, page)
   obj_list.append(obj)

  for future in as_completed(obj_list):
   data = future.result()
   print(f"main: {data}")

# 执行结果
crawl task1 finished
main: 1
crawl task2 finished
main: 2
crawl task3 finished
main: 3
crawl task4 finished
main: 4

as_completed() 方法是一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。

当有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。同时,先完成的任务会先返回给主线程。

  • map

map(fn, *iterables, timeout=None)

fn: 第一个参数 fn 是需要线程执行的函数;
iterables:第二个参数接受一个可迭代对象;
timeout: 第三个参数 timeout 跟 wait() 的 timeout 一样,但由于 map 是返回线程执行的结果,如果 timeout小于线程执行时间会抛异常 TimeoutError。

用法如下:

import time
from concurrent.futures import ThreadPoolExecutor

def spider(page):
 time.sleep(page)
 return page

start = time.time()
executor = ThreadPoolExecutor(max_workers=4)

i = 1
for result in executor.map(spider, [2, 3, 1, 4]):
 print("task{}:{}".format(i, result))
 i += 1

# 运行结果
task1:2
task2:3
task3:1
task4:4

使用 map 方法,无需提前使用 submit 方法,map 方法与 python 高阶函数 map 的含义相同,都是将序列中的每个元素都执行同一个函数。

上面的代码对列表中的每个元素都执行 spider() 函数,并分配各线程池。

可以看到执行结果与上面的 as_completed() 方法的结果不同,输出顺序和列表的顺序相同,就算 1s 的任务先执行完成,也会先打印前面提交的任务返回的结果。

多线程实战

以某网站为例,演示线程池和单线程两种方式爬取的差异

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
 "Host": "splcgk.court.gov.cn",
 "Origin": "https://splcgk.court.gov.cn",
 "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
 "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
 data = {
  "bt": "",
  "fydw": "",
  "pageNum": page,
 }
 for _ in range(5):
  try:
   response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
   json_data = response.json()
  except (json.JSONDecodeError, adapters.SSLError):
   continue
  else:
   break
 else:
  return {}

 return json_data

def main():
 with ThreadPoolExecutor(max_workers=10) as t:
  obj_list = []
  begin = time.time()
  for page in range(1, 15):
   obj = t.submit(spider, page)
   obj_list.append(obj)

  for future in as_completed(obj_list):
   data = future.result()
   print(data)
   print('*' * 50)
  times = time.time() - begin
  print(times)

if __name__ == "__main__":
 main()

运行结果:

单线程实战

下面我们可以使用单线程来爬取,代码基本和上面的一样,加个单线程函数
代码如下:

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
 "Host": "splcgk.court.gov.cn",
 "Origin": "https://splcgk.court.gov.cn",
 "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
 "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
 data = {
  "bt": "",
  "fydw": "",
  "pageNum": page,
 }
 for _ in range(5):
  try:
   response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
   json_data = response.json()
  except (json.JSONDecodeError, adapters.SSLError):
   continue
  else:
   break
 else:
  return {}

 return json_data

def single():
 begin = time.time()
 for page in range(1, 15):
  data = spider(page)
  print(data)
  print('*' * 50)

 times = time.time() - begin
 print(times)

if __name__ == "__main__":
 single()

运行结果:

可以看到,总共花了 19 秒。真是肉眼可见的差距啊!如果数据量大的话,运行时间差距会更大!

以上就是python线程池 ThreadPoolExecutor 的用法示例的详细内容,更多关于python线程池 ThreadPoolExecutor 的用法及实战的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python 如何创建一个线程池

    问题 你创建一个工作者线程池,用来响应客户端请求或执行其他的工作. 解决方案 concurrent.futures 函数库有一个 ThreadPoolExecutor 类可以被用来完成这个任务. 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端: from socket import AF_INET, SOCK_STREAM, socket from concurrent.futures import ThreadPoolExecutor def echo_client(sock, c

  • Python线程池模块ThreadPoolExecutor用法分析

    本文实例讲述了Python线程池模块ThreadPoolExecutor用法.分享给大家供大家参考,具体如下: python3内置的有Threadingpool和ThreadPoolExecutor模块,两个都可以做线程池,当然ThreadPoolExecutor会更好用一些,而且也有ProcessPoolExecutor进程池模块,使用方法基本一致. 首先导入模块 from concurrent.futures import ThreadPoolExecutor 使用方法很简单,最常用的可能就

  • 详解python中的线程与线程池

    线程 进程和线程 什么是进程? 进程就是正在运行的程序, 一个任务就是一个进程, 进程的主要工作是管理资源, 而不是实现功能 什么是线程? 线程的主要工作是去实现功能, 比如执行计算. 线程和进程的关系就像员工与老板的关系, 老板(进程) 提供资源 和 工作空间, 员工(线程) 负责去完成相应的任务 特点 一个进程至少由一个线程, 这一个必须存在的线程被称为主线程, 同时一个进程也可以有多个线程, 即多线程 当我们我们遇到一些需要重复执行的代码时, 就可以使用多线程分担一些任务, 进而加快运行速

  • python Event事件、进程池与线程池、协程解析

    Event事件 用来控制线程的执行 出现e.wait(),就会把这个线程设置为False,就不能执行这个任务: 只要有一个线程出现e.set(),就会告诉Event对象,把有e.wait的用户全部改为True,剩余的任务就会立马去执行.由一些线程去控制另一些线程,中间通过Event. from threading import Event from threading import Thread import time # 调用Event实例化出对象 e = Event() # # # 若该方法

  • 实例代码讲解Python 线程池

    大家都知道当任务过多,任务量过大时如果想提高效率的一个最简单的方法就是用多线程去处理,比如爬取上万个网页中的特定数据,以及将爬取数据和清洗数据的工作交给不同的线程去处理,也就是生产者消费者模式,都是典型的多线程使用场景. 那是不是意味着线程数量越多,程序的执行效率就越快呢. 显然不是.线程也是一个对象,是需要占用资源的,线程数量过多的话肯定会消耗过多的资源,同时线程间的上下文切换也是一笔不小的开销,所以有时候开辟过多的线程不但不会提高程序的执行效率,反而会适得其反使程序变慢,得不偿失. 所以,如

  • 解决python ThreadPoolExecutor 线程池中的异常捕获问题

    问题 最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回. 先说重点 这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeou

  • Python 线程池用法简单示例

    本文实例讲述了Python 线程池用法.分享给大家供大家参考,具体如下: # -*- coding:utf-8 -*- #! python3 ''' Created on 2019-10-2 @author: Administrator ''' from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %

  • python线程池如何使用

    线程池的使用 线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池. 如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定. Exectuor 提供了如下常用方

  • python线程池threadpool实现篇

    本文为大家分享了threadpool线程池中所有的操作,供大家参考,具体内容如下 首先介绍一下自己使用到的名词: 工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务: 任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数.任务通过          makeRequests来创建 任务队列(request_queue):存放任务的队列,使用了queue实现的.工作线程从任务队列中get任务进行处理: 任务处理函

  • Python定时器线程池原理详解

    这篇文章主要介绍了Python定时器线程池原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 定时器执行循环任务: 知识储备 Timer(interval, function, args=None, kwargs=None) interval ===> 时间间隔 单位为s function ===> 定制执行的函数 使用threading的 Timer 类 start() 为通用的开始执行方法 cancel ()为取消执行的方法 普通单次

  • python线程池threadpool使用篇

    最近在做一个视频设备管理的项目,设备包括(摄像机,DVR,NVR等),包括设备信息补全,设备状态推送,设备流地址推送等,如果同时导入的设备数量较多,如果使用单线程进行设备检测,那么由于设备数量较多,会带来较大的延时,因此考虑多线程处理此问题. 可以使用python语言自己实现线程池,或者可以使用第三方包threadpool线程池包,本主题主要介绍threadpool的使用以及其里面的具体实现. 1.安装 使用安装: pip installthreadpool 2.使用 (1)引入threadpo

随机推荐