浅析Python中的多进程与多线程的使用

在批评Python的讨论中,常常说起Python多线程是多么的难用。还有人对 global interpreter lock(也被亲切的称为“GIL”)指指点点,说它阻碍了Python的多线程程序同时运行。因此,如果你是从其他语言(比如C++或Java)转过来的话,Python线程模块并不会像你想象的那样去运行。必须要说明的是,我们还是可以用Python写出能并发或并行的代码,并且能带来性能的显著提升,只要你能顾及到一些事情。如果你还没看过的话,我建议你看看Eqbal Quran的文章《Ruby中的并发和并行》。

在本文中,我们将会写一个小的Python脚本,用于下载Imgur上最热门的图片。我们将会从一个按顺序下载图片的版本开始做起,即一个一个地下载。在那之前,你得注册一个Imgur上的应用。如果你还没有Imgur账户,请先注册一个。

本文中的脚本在Python3.4.2中测试通过。稍微改一下,应该也能在Python2中运行——urllib是两个版本中区别最大的部分。
开始动手

让我们从创建一个叫“download.py”的Python模块开始。这个文件包含了获取图片列表以及下载这些图片所需的所有函数。我们将这些功能分成三个单独的函数:

  get_links
  download_link
  setup_download_dir

第三个函数,“setup_download_dir”,用于创建下载的目标目录(如果不存在的话)。

Imgur的API要求HTTP请求能支持带有client ID的“Authorization”头部。你可以从你注册的Imgur应用的面板上找到这个client ID,而响应会以JSON进行编码。我们可以使用Python的标准JSON库去解码。下载图片更简单,你只需要根据它们的URL获取图片,然后写入到一个文件即可。

代码如下:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

def get_links(client_id):
  headers = {'Authorization': 'Client-ID {}'.format(client_id)}
  req = Request('https://api.imgur.com/3/gallery/', headers=headers, method='GET')
  with urlopen(req) as resp:
    data = json.loads(resp.readall().decode('utf-8'))
  return map(lambda item: item['link'], data['data'])

def download_link(directory, link):
  logger.info('Downloading %s', link)
  download_path = directory / os.path.basename(link)
  with urlopen(link) as image, download_path.open('wb') as f:
    f.write(image.readall())

def setup_download_dir():
  download_dir = Path('images')
  if not download_dir.exists():
    download_dir.mkdir()
  return download_dir

接下来,你需要写一个模块,利用这些函数去逐个下载图片。我们给它命名为“single.py”。它包含了我们最原始版本的Imgur图片下载器的主要函数。这个模块将会通过环境变量“IMGUR_CLIENT_ID”去获取Imgur的client ID。它将会调用“setup_download_dir”去创建下载目录。最后,使用get_links函数去获取图片的列表,过滤掉所有的GIF和专辑URL,然后用“download_link”去将图片下载并保存在磁盘中。下面是“single.py”的代码:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)

def main():
  ts = time()
  client_id = os.getenv('IMGUR_CLIENT_ID')
  if not client_id:
    raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith('.jpg')]
  for link in links:
    download_link(download_dir, link)
  print('Took {}s'.format(time() - ts))

if __name__ == '__main__':
  main()

在我的笔记本上,这个脚本花了19.4秒去下载91张图片。请注意这些数字在不同的网络上也会有所不同。19.4秒并不是非常的长,但是如果我们要下载更多的图片怎么办呢?或许是900张而不是90张。平均下载一张图片要0.2秒,900张的话大概需要3分钟。那么9000张图片将会花掉30分钟。好消息是使用了并发或者并行后,我们可以将这个速度显著地提高。

接下来的代码示例将只会显示导入特有模块和新模块的import语句。所有相关的Python脚本都可以在这方便地找到this GitHub repository
使用线程

线程是最出名的实现并发和并行的方式之一。操作系统一般提供了线程的特性。线程比进程要小,而且共享同一块内存空间。

在这里,我们将写一个替代“single.py”的新模块。它将创建一个有八个线程的池,加上主线程的话总共就是九个线程。之所以是八个线程,是因为我的电脑有8个CPU内核,而一个工作线程对应一个内核看起来还不错。在实践中,线程的数量是仔细考究的,需要考虑到其他的因素,比如在同一台机器上跑的的其他应用和服务。

下面的脚本几乎跟之前的一样,除了我们现在有个新的类,DownloadWorker,一个Thread类的子类。运行无限循环的run方法已经被重写。在每次迭代时,它调用“self.queue.get()”试图从一个线程安全的队列里获取一个URL。它将会一直堵塞,直到队列中出现一个要处理元素。一旦工作线程从队列中得到一个元素,它将会调用之前脚本中用来下载图片到目录中所用到的“download_link”方法。下载完成之后,工作线程向队列发送任务完成的信号。这非常重要,因为队列一直在跟踪队列中的任务数。如果工作线程没有发出任务完成的信号,“queue.join()”的调用将会令整个主线程都在阻塞状态。

from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
  def __init__(self, queue):
    Thread.__init__(self)
    self.queue = queue

  def run(self):
    while True:
      # Get the work from the queue and expand the tuple
      # 从队列中获取任务并扩展tuple
      directory, link = self.queue.get()
      download_link(directory, link)
      self.queue.task_done()

def main():
  ts = time()
  client_id = os.getenv('IMGUR_CLIENT_ID')
  if not client_id:
    raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith('.jpg')]
  # Create a queue to communicate with the worker threads
  queue = Queue()
  # Create 8 worker threads
  # 创建八个工作线程
  for x in range(8):
    worker = DownloadWorker(queue)
    # Setting daemon to True will let the main thread exit even though the workers are blocking
    # 将daemon设置为True将会使主线程退出,即使worker都阻塞了
    worker.daemon = True
    worker.start()
  # Put the tasks into the queue as a tuple
  # 将任务以tuple的形式放入队列中
  for link in links:
    logger.info('Queueing {}'.format(link))
    queue.put((download_dir, link))
  # Causes the main thread to wait for the queue to finish processing all the tasks
  # 让主线程等待队列完成所有的任务
  queue.join()
  print('Took {}'.format(time() - ts))

在同一个机器上运行这个脚本,下载时间变成了4.1秒!即比之前的例子快4.7倍。虽然这快了很多,但还是要提一下,由于GIL的缘故,在这个进程中同一时间只有一个线程在运行。因此,这段代码是并发的但不是并行的。而它仍然变快的原因是这是一个IO密集型的任务。进程下载图片时根本毫不费力,而主要的时间都花在了等待网络上。这就是为什么线程可以提供很大的速度提升。每当线程中的一个准备工作时,进程可以不断转换线程。使用Python或其他有GIL的解释型语言中的线程模块实际上会降低性能。如果你的代码执行的是CPU密集型的任务,例如解压gzip文件,使用线程模块将会导致执行时间变长。对于CPU密集型任务和真正的并行执行,我们可以使用多进程(multiprocessing)模块。

官方的Python实现——CPython——带有GIL,但不是所有的Python实现都是这样的。比如,IronPython,使用.NET框架实现的Python就没有GIL,基于Java实现的Jython也同样没有。你可以点这查看现有的Python实现。
生成多进程

多进程模块比线程模块更易使用,因为我们不需要像线程示例那样新增一个类。我们唯一需要做的改变在主函数中。

为了使用多进程,我们得建立一个多进程池。通过它提供的map方法,我们把URL列表传给池,然后8个新进程就会生成,它们将并行地去下载图片。这就是真正的并行,不过这是有代价的。整个脚本的内存将会被拷贝到各个子进程中。在我们的例子中这不算什么,但是在大型程序中它很容易导致严重的问题。

from functools import partial
from multiprocessing.pool import Pool

def main():
  ts = time()
  client_id = os.getenv('IMGUR_CLIENT_ID')
  if not client_id:
    raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith('.jpg')]
  download = partial(download_link, download_dir)
  with Pool(8) as p:
    p.map(download, links)
  print('Took {}s'.format(time() - ts))

分布式任务

你已经知道了线程和多进程模块可以给你自己的电脑跑脚本时提供很大的帮助,那么在你想要在不同的机器上执行任务,或者在你需要扩大规模而超过一台机器的的能力范围时,你该怎么办呢?一个很好的使用案例是网络应用的长时间后台任务。如果你有一些很耗时的任务,你不会希望在同一台机器上占用一些其他的应用代码所需要的子进程或线程。这将会使你的应用的性能下降,影响到你的用户们。如果能在另外一台甚至很多台其他的机器上跑这些任务就好了。

Python库RQ非常适用于这类任务。它是一个简单却很强大的库。首先将一个函数和它的参数放入队列中。它将函数调用的表示序列化(pickle),然后将这些表示添加到一个Redis列表中。任务进入队列只是第一步,什么都还没有做。我们至少还需要一个能去监听任务队列的worker(工作线程)。

第一步是在你的电脑上安装和使用Redis服务器,或是拥有一台能正常的使用的Redis服务器的使用权。接着,对于现有的代码只需要一些小小的改动。先创建一个RQ队列的实例并通过redis-py 库传给一台Redis服务器。然后,我们执行“q.enqueue(download_link, download_dir, link)”,而不只是调用“download_link” 。enqueue方法的第一个参数是一个函数,当任务真正执行时,其他的参数或关键字参数将会传给该函数。

最后一步是启动一些worker。RQ提供了方便的脚本,可以在默认队列上运行起worker。只要在终端窗口中执行“rqworker”,就可以开始监听默认队列了。请确认你当前的工作目录与脚本所在的是同一个。如果你想监听别的队列,你可以执行“rqworker queue_name”,然后将会开始执行名为queue_name的队列。RQ的一个很好的点就是,只要你可以连接到Redis,你就可以在任意数量上的机器上跑起任意数量的worker;因此,它可以让你的应用扩展性得到提升。下面是RQ版本的代码:

from redis import Redis
from rq import Queue

def main():
  client_id = os.getenv('IMGUR_CLIENT_ID')
  if not client_id:
    raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
  download_dir = setup_download_dir()
  links = [l for l in get_links(client_id) if l.endswith('.jpg')]
  q = Queue(connection=Redis(host='localhost', port=6379))
  for link in links:
    q.enqueue(download_link, download_dir, link)

然而RQ并不是Python任务队列的唯一解决方案。RQ确实易用并且能在简单的案例中起到很大的作用,但是如果有更高级的需求,我们可以使用其他的解决方案(例如 Celery)。
总结

如果你的代码是IO密集型的,线程和多进程可以帮到你。多进程比线程更易用,但是消耗更多的内存。如果你的代码是CPU密集型的,多进程就明显是更好的选择——特别是所使用的机器是多核或多CPU的。对于网络应用,在你需要扩展到多台机器上执行任务,RQ是更好的选择。

(0)

相关推荐

  • 用python分割TXT文件成4K的TXT文件

    复制代码 代码如下: ########################## # # # 为了避免截断中文字符 # # 文件要求是 unicode 编码 # # txt文件另存为对话框下面有下拉框,可选存 # # 储编码格式 # # # ########################## import os import struct filename = str(raw_input("Please enter an old file name: ")) filenamepre = s

  • Python实现将一个大文件按段落分隔为多个小文件的简单操作方法

    本文实例讲述了Python实现将一个大文件按段落分隔为多个小文件的简单操作方法.分享给大家供大家参考,具体如下: 今天帮同学处理一点语料.语料文件有点大,并且是以连续两个换行符作为段落标志,他想把它按段落分隔成多个小文件,即每3个段落组成一个新文件.由于以前没有遇到过类似的操作,在网上找了一些相似的方法,看起来都有点复杂.所以经尝试,自己写了一段代码,完美解决问题. 基本思路是,先读原文件内容,并使用正则表达式,依据\n\n进行切片处理,结果为一个列表,其中每一个列表元素都存放一个切片中的内容;

  • 详解Python中的多线程编程

    一.简介 多线程编程技术可以实现代码并行性,优化处理能力,同时功能的更小划分可以使代码的可重用性更好.Python中threading和Queue模块可以用来实现多线程编程. 二.详解 1.线程和进程        进程(有时被称为重量级进程)是程序的一次执行.每个进程都有自己的地址空间.内存.数据栈以及其它记录其运行轨迹的辅助数据.操作系统管理在其上运行的所有进程,并为这些进程公平地分配时间.进程也可以通过fork和spawn操作来完成其它的任务,不过各个进程有自己的内存空间.数据栈等,所以只

  • 理解python多线程(python多线程简明教程)

    对于python 多线程的理解,我花了很长时间,搜索的大部份文章都不够通俗易懂.所以,这里力图用简单的例子,让你对多线程有个初步的认识. 单线程 在好些年前的MS-DOS时代,操作系统处理问题都是单任务的,我想做听音乐和看电影两件事儿,那么一定要先排一下顺序. (好吧!我们不纠结在DOS时代是否有听音乐和看影的应用.^_^) 复制代码 代码如下: from time import ctime,sleep def music():    for i in range(2):        prin

  • Python实现分割文件及合并文件的方法

    本文实例讲述了Python实现分割文件及合并文件的方法.分享给大家供大家参考.具体如下: 分割文件split.py如下: #!/usr/bin/python ########################################################################## # split a file into a set of parts; join.py puts them back together; # this is a customizable ve

  • Python实现模拟分割大文件及多线程处理的方法

    本文实例讲述了Python实现模拟分割大文件及多线程处理的方法.分享给大家供大家参考,具体如下: #!/usr/bin/env python #--*-- coding:utf-8 --*-- from random import randint from time import ctime from time import sleep import queue import threading class MyTask(object): """具体的任务类"&qu

  • python简单分割文件的方法

    本文实例讲述了python简单分割文件的方法.分享给大家供大家参考.具体如下: 有的网站在上传文件时对文件大小有限制,因此可以将大文件分割成多个小文件再上传. #!/usr/bin/env python def split(filename, size): fp = open(filename, 'rb') i = 0 n = 0 temp = open(filename+'.part'+str(i),'wb') buf = fp.read(1024) while(True): temp.wri

  • python与php实现分割文件代码

    前两天有个朋友说,想实现一个文本文件按照固定行数进行分割成多个文本文件,却不知如何实现.如果数据量小手动分割下就好了,如果数据量很大的话手动完成实在太耗费人力了,也不现实.那么就需要借助脚本去实现.既然有朋友想简单的完成这个任务,那么不如记录下来,给需要的朋友提供方便. 下面我就分别使用python和php进行脚本的实现和操作,当然用其他语言都能实现,大家可根据对语言的熟悉程度进行自主选择,如果有朋友还没有达到编写代码的能力的话,那么最起码对语言环境的使用要会,只要达到这些,就可以完成如下工作.

  • Python threading多线程编程实例

    Python 的多线程有两种实现方法: 函数,线程类 1.函数 调用 thread 模块中的 start_new_thread() 函数来创建线程,以线程函数的形式告诉线程该做什么 复制代码 代码如下: # -*- coding: utf-8 -*- import thread def f(name):   #定义线程函数   print "this is " + name   if __name__ == '__main__':   thread.start_new_thread(f

  • python分割文件的常用方法

    本文大家整理了一些比较好用的关于python分割文件的方法,方法非常的简单实用.分享给大家供大家参考.具体如下: 例子1 指定分割文件大小 配置文件 config.ini: 复制代码 代码如下: [global] #原文件存放目录 dir1=F:\work\python\3595\pyserver\test #新文件存放目录 dir2=F:\work\python\3595\pyserver\test1 python 代码如下: 复制代码 代码如下: #!/usr/bin/python # -*

  • python多线程threading.Lock锁用法实例

    本文实例讲述了python多线程threading.Lock锁的用法实例,分享给大家供大家参考.具体分析如下: python的锁可以独立提取出来 复制代码 代码如下: mutex = threading.Lock() #锁的使用 #创建锁 mutex = threading.Lock() #锁定 mutex.acquire([timeout]) #释放 mutex.release() 锁定方法acquire可以有一个超时时间的可选参数timeout.如果设定了timeout,则在超时后通过返回值

随机推荐