Python 如何创建一个线程池

问题

你创建一个工作者线程池,用来响应客户端请求或执行其他的工作。

解决方案

concurrent.futures 函数库有一个 ThreadPoolExecutor 类可以被用来完成这个任务。 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端:

from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor

def echo_client(sock, client_addr):
  '''
  Handle a client connection
  '''
  print('Got connection from', client_addr)
  while True:
    msg = sock.recv(65536)
    if not msg:
      break
    sock.sendall(msg)
  print('Client closed connection')
  sock.close()

def echo_server(addr):
  pool = ThreadPoolExecutor(128)
  sock = socket(AF_INET, SOCK_STREAM)
  sock.bind(addr)
  sock.listen(5)
  while True:
    client_sock, client_addr = sock.accept()
    pool.submit(echo_client, client_sock, client_addr)

echo_server(('',15000))

如果你想手动创建你自己的线程池, 通常可以使用一个Queue来轻松实现。下面是一个稍微不同但是手动实现的例子:

from socket import socket, AF_INET, SOCK_STREAM
from threading import Thread
from queue import Queue

def echo_client(q):
  '''
  Handle a client connection
  '''
  sock, client_addr = q.get()
  print('Got connection from', client_addr)
  while True:
    msg = sock.recv(65536)
    if not msg:
      break
    sock.sendall(msg)
  print('Client closed connection')

  sock.close()

def echo_server(addr, nworkers):
  # Launch the client workers
  q = Queue()
  for n in range(nworkers):
    t = Thread(target=echo_client, args=(q,))
    t.daemon = True
    t.start()

  # Run the server
  sock = socket(AF_INET, SOCK_STREAM)
  sock.bind(addr)
  sock.listen(5)
  while True:
    client_sock, client_addr = sock.accept()
    q.put((client_sock, client_addr))

echo_server(('',15000), 128)

使用 ThreadPoolExecutor 相对于手动实现的一个好处在于它使得 任务提交者更方便的从被调用函数中获取返回值。例如,你可能会像下面这样写:

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
  u = urllib.request.urlopen(url)
  data = u.read()
  return data

pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

例子中返回的handle对象会帮你处理所有的阻塞与协作,然后从工作线程中返回数据给你。 特别的,a.result() 操作会阻塞进程直到对应的函数执行完成并返回一个结果。

讨论

通常来讲,你应该避免编写线程数量可以无限制增长的程序。例如,看看下面这个服务器:

from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM

def echo_client(sock, client_addr):
  '''
  Handle a client connection
  '''
  print('Got connection from', client_addr)
  while True:
    msg = sock.recv(65536)
    if not msg:
      break
    sock.sendall(msg)
  print('Client closed connection')
  sock.close()

def echo_server(addr, nworkers):
  # Run the server
  sock = socket(AF_INET, SOCK_STREAM)
  sock.bind(addr)
  sock.listen(5)
  while True:
    client_sock, client_addr = sock.accept()
    t = Thread(target=echo_client, args=(client_sock, client_addr))
    t.daemon = True
    t.start()

echo_server(('',15000))

尽管这个也可以工作, 但是它不能抵御有人试图通过创建大量线程让你服务器资源枯竭而崩溃的攻击行为。 通过使用预先初始化的线程池,你可以设置同时运行线程的上限数量。

你可能会关心创建大量线程会有什么后果。 现代操作系统可以很轻松的创建几千个线程的线程池。 甚至,同时几千个线程等待工作并不会对其他代码产生性能影响。 当然了,如果所有线程同时被唤醒并立即在CPU上执行,那就不同了——特别是有了全局解释器锁GIL。 通常,你应该只在I/O处理相关代码中使用线程池。

创建大的线程池的一个可能需要关注的问题是内存的使用。 例如,如果你在OS X系统上面创建2000个线程,系统显示Python进程使用了超过9GB的虚拟内存。 不过,这个计算通常是有误差的。当创建一个线程时,操作系统会预留一个虚拟内存区域来 放置线程的执行栈(通常是8MB大小)。但是这个内存只有一小片段被实际映射到真实内存中。 因此,Python进程使用到的真实内存其实很小 (比如,对于2000个线程来讲,只使用到了70MB的真实内存,而不是9GB)。 如果你担心虚拟内存大小,可以使用 threading.stack_size() 函数来降低它。例如:

import threading
threading.stack_size(65536)

如果你加上这条语句并再次运行前面的创建2000个线程试验, 你会发现Python进程只使用到了大概210MB的虚拟内存,而真实内存使用量没有变。 注意线程栈大小必须至少为32768字节,通常是系统内存页大小(4096、8192等)的整数倍。

以上就是Python 如何创建一个线程池的详细内容,更多关于Python 创建线程池的资料请关注我们其它相关文章!

(0)

相关推荐

  • python线程池如何使用

    线程池的使用 线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池. 如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定. Exectuor 提供了如下常用方

  • Python线程池模块ThreadPoolExecutor用法分析

    本文实例讲述了Python线程池模块ThreadPoolExecutor用法.分享给大家供大家参考,具体如下: python3内置的有Threadingpool和ThreadPoolExecutor模块,两个都可以做线程池,当然ThreadPoolExecutor会更好用一些,而且也有ProcessPoolExecutor进程池模块,使用方法基本一致. 首先导入模块 from concurrent.futures import ThreadPoolExecutor 使用方法很简单,最常用的可能就

  • 解决python ThreadPoolExecutor 线程池中的异常捕获问题

    问题 最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回. 先说重点 这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeou

  • Python 使用threading+Queue实现线程池示例

    一.线程池 1.为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率. 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3>T2,那说明开启一个线程来执行这个任务太不划算了!在线程池缓存线程可用已有的闲置线程来执行新任务,避免了创建/销毁带来的系统开销. 1.2 线程并发数量过多,抢占系统资源从而导致阻塞. 线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而产生阻塞的情况. 1.3 对线

  • python爬虫 线程池创建并获取文件代码实例

    本实例主要进行线程池创建,多线程获取.存储视频文件 梨视频:利用线程池进行视频爬取 #爬取梨视频数据 import requests import re from lxml import etree from multiprocessing.dummy import Pool import random # 定义获取视频数据方法 def getVideoData(url): # url为列表中的视频url return requests.get(url=url,headers=headers).

  • Python mutiprocessing多线程池pool操作示例

    本文实例讲述了Python mutiprocessing多线程池pool操作.分享给大家供大家参考,具体如下: python - mutiprocessing 多线程 pool 脚本代码: root@72132server:~/python/multiprocess# ls multiprocess_pool.py multprocess.py root@72132server:~/python/multiprocess# cat multiprocess_pool.py #!/usr/bin/

  • python线程池threadpool实现篇

    本文为大家分享了threadpool线程池中所有的操作,供大家参考,具体内容如下 首先介绍一下自己使用到的名词: 工作线程(worker):创建线程池时,按照指定的线程数量,创建工作线程,等待从任务队列中get任务: 任务(requests):即工作线程处理的任务,任务可能成千上万个,但是工作线程只有少数.任务通过          makeRequests来创建 任务队列(request_queue):存放任务的队列,使用了queue实现的.工作线程从任务队列中get任务进行处理: 任务处理函

  • python爬虫之线程池和进程池功能与用法详解

    本文实例讲述了python爬虫之线程池和进程池功能与用法.分享给大家供大家参考,具体如下: 一.需求 最近准备爬取某电商网站的数据,先不考虑代理.分布式,先说效率问题(当然你要是请求的太快就会被封掉,亲测,400个请求过去,服务器直接拒绝连接,心碎),步入正题.一般情况下小白的我们第一个想到的是for循环,这个可是单线程啊.那我们考虑for循环直接开他个5个线程,问题来了,如果有一个url请求还没有回来,后面的就干等,这么用多线程等于没用,到处贴创可贴. 二.性能考虑 确定要用多线程或者多进程了

  • python多进程使用及线程池的使用方法代码详解

    多进程:主要运行multiprocessing模块 import os,time import sys from multiprocessing import Process class MyProcess(Process): """docstring for MyProcess""" def __init__(self, arg, callback): super(MyProcess, self).__init__() self.arg = a

  • Python 如何创建一个线程池

    问题 你创建一个工作者线程池,用来响应客户端请求或执行其他的工作. 解决方案 concurrent.futures 函数库有一个 ThreadPoolExecutor 类可以被用来完成这个任务. 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端: from socket import AF_INET, SOCK_STREAM, socket from concurrent.futures import ThreadPoolExecutor def echo_client(sock, c

  • Python快速实现一个线程池的示例代码

    目录 楔子 Future 对象 提交函数自动创建 Future 对象 future.set_result 到底干了什么事情 提交多个函数 使用 map 来提交多个函数 按照顺序等待执行 取消一个函数的执行 函数执行时出现异常 等待所有函数执行完毕 小结 楔子 当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程.但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置. 比如我们实现一个有 10

  • Java代码构建一个线程池

    在现代的操作系统中,有一个很重要的概念――线程,几乎所有目前流行的操作系统都支持线程,线程来源于操作系统中进程的概念,进程有自己的虚拟地址空间以及正文段.数据段及堆栈,而且各自占有不同的系统资源(例如文件.环境变量等等).与此不同,线程不能单独存在,它依附于进程,只能由进程派生.如果一个进程派生出了两个线程,那这两个线程共享此进程的全局变量和代码段,但每个线程各拥有各自的堆栈,因此它们拥有各自的局部变量,线程在UNIX系统中还被进一步分为用户级线程(由进程自已来管理)和系统级线程(由操作系统的调

  • Python 中创建 PostgreSQL 数据库连接池

    目录 习惯于使用数据库之前都必须创建一个连接池,即使是单线程的应用,只要有多个方法中需用到数据库连接,建立一两个连接的也会考虑先池化他们.连接池的好处多多, 1) 如果反复创建连接相当耗时, 2) 对于单个连接一路用到底的应用,有连接池时避免了数据库连接对象传来传去, 3) 忘记关连接了,连接池幸许还能帮忙在一定时长后关掉,当然密集取连接的应用势将耗尽连接, 4) 一个应用打开连接的数量是可控的 接触到 Python 后,在使用 PostgreSQL 也自然而然的考虑创建连接池,使用时从池中取,

  • Java实现手写一个线程池的示例代码

    目录 概述 线程池框架设计 代码实现 阻塞队列的实现 线程池消费端实现 获取任务超时设计 拒绝策略设计 概述 线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记. 线程池框架设计 我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的.同

  • 在python里创建一个任务(Task)实例

    与事件循环进行交互,最基本的方式就是任务,任务封装了协程和自动跟踪它的状态.任务是Future类的子类,所以其它协程可以等待任务完成,或当这些任务完成获取返回结果. 在这里通过create_task()函数来创建一个任务实例,然后事件循环就运行这个任务,直到这个任务返回为止: import asyncio async def task_func(): print('in task_func') return 'the result' async def main(loop): print('cr

  • Python 如何创建一个简单的REST接口

    问题 你想使用一个简单的REST接口通过网络远程控制或访问你的应用程序,但是你又不想自己去安装一个完整的web框架. 解决方案 构建一个REST风格的接口最简单的方法是创建一个基于WSGI标准(PEP 3333)的很小的库,下面是一个例子: # resty.py import cgi def notfound_404(environ, start_response): start_response('404 Not Found', [ ('Content-type', 'text/plain')

  • python QT界面关闭线程池的线程跟随退出完美解决方案

    目录 方法一.线程池执行的循环代码为自己写的情况 方法二.线程池中执行的循环为调用的模块内的方法 方法一.线程池执行的循环代码为自己写的情况 定义一个全局变量,默认为T,当QT界面关闭后,将该变量值改为F. 线程执行的循环代码内增加一个判断方法,每次循环之前对全局变量进行判断,如果结果为T则进行循环.如果为F,则break退出循环,结束线程 from concurrent.futures import ThreadPoolExecutor import time a = True # 设置全局变

  • python多线程扫描端口(线程池)

    扫描服务器ip开放端口,用线程池ThreadPoolExecutor,i7的cpu可以开到600个左右现成,大概20s左右扫描完65535个端口,根据电脑配置适当降低线程数 #!/usr/local/python3.6.3/bin/python3.6 # coding = utf-8 import socket import datetime import re from concurrent.futures import ThreadPoolExecutor, wait DEBUG = Fal

随机推荐