Python 多核并行计算的示例代码

以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程(下面就统称核好了),觉得去折腾并行没啥意义(除非在做IO密集型任务)。然后自从用上了32核128GB内存,看到 htop 里面一堆空载的核,很自然地就会想这个并行必须去折腾一下。后面发现,其实 Python 的并行真的非常简单。

multiprocessing vs threading

Python 自带的库又全又好用,这是我特别喜欢 Python 的原因之一。Python 里面有 multiprocessingthreading这两个用来实现并行的库。用线程应该是很自然的想法,毕竟(直觉上)开销小,还有共享内存的福利,而且在其他语言里面线程用的确实是非常频繁。然而,我可以很负责任的说,如果你用的是 CPython 实现,那么用了 threading 就等同于和并行计算说再见了(实际上,甚至会比单线程更慢),除非这是个IO密集型的任务。

GIL

CPython 指的是 python.org 提供的 Python 实现。是的,Python 是一门语言,它有各种不同的实现,比如 PyPy, Jython, IronPython 等等……我们用的最多的就是 CPython,它几乎就和 Python 画上了等号。

CPython 的实现中,使用了 GIL 即全局锁,来简化解释器的实现,使得解释器每次只执行一个线程中的字节码。也就是说,除非是在等待IO操作,否则 CPython 的多线程就是彻底的谎言!

有关 GIL 下面两个资料写的挺好的:

  1. http://cenalulu.github.io/python/gil-in-python/
  2. http://www.dabeaz.com/python/UnderstandingGIL.pdf

multiprocessing.Pool

因为 GIL 的缘故 threading 不能用,那么我们就好好研究研究 multiprocessing。(当然,如果你说你不用 CPython,没有 GIL 的问题,那也是极佳的。)

首先介绍一个简单粗暴,非常实用的工具,就是 multiprocessing.Pool。如果你的任务能用 ys = map(f, xs) 来解决,大家可能都知道,这样的形式天生就是最容易并行的,那么在 Python 里面并行计算这个任务真是再简单不过了。举个例子,把每个数都平方:

import multiprocessing

def f(x):
  return x * x

cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)
xs = range(5)

# method 1: map
print pool.map(f, xs) # prints [0, 1, 4, 9, 16]

# method 2: imap
for y in pool.imap(f, xs):
  print y      # 0, 1, 4, 9, 16, respectively

# method 3: imap_unordered
for y in pool.imap_unordered(f, xs):
  print(y)      # may be in any order

map 直接返回列表,而 i 开头的两个函数返回的是迭代器;imap_unordered 返回的是无序的。

当计算时间比较长的时候,我们可能想要加上一个进度条,这个时候 i 系列的好处就体现出来了。另外,有一个小技巧,就是输出 \r 可以使得光标回到行首而不换行,这样就可以制作简易的进度条了。

cnt = 0
for _ in pool.imap_unordered(f, xs):
  sys.stdout.write('done %d/%d\r' % (cnt, len(xs)))
  cnt += 1

更复杂的操作

要进行更复杂的操作,可以直接使用 multiprocessing.Process 对象。要在进程间通信可以使用:

  1. multiprocessing.Pipe
  2. multiprocessing.Queue
  3. 同步原语
  4. 共享变量

其中我强烈推荐的就是 Queue,因为其实很多场景就是生产者消费者模型,这个时候用 Queue 就解决问题了。用的方法也很简单,现在父进程创建 Queue,然后把它当做 args 或者 kwargs 传给 Process 就好了。

使用 Theano 或者 Tensorflow 等工具时的注意事项

需要注意的是,在 import theano 或者 import tensorflow 等调用了 Cuda 的工具的时候会产生一些副作用,这些副作用会原样拷贝到子进程中,然后就发生错误,如:

could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED

解决的方法是,保证父进程不引入这些工具,而是在子进程创建好了以后,让子进程各自引入。

如果使用 Process,那就在 target 函数里面 import。举个例子:

import multiprocessing

def hello(taskq, resultq):
  import tensorflow as tf
  config = tf.ConfigProto()
  config.gpu_options.allow_growth=True
  sess = tf.Session(config=config)
  while True:
    name = taskq.get()
    res = sess.run(tf.constant('hello ' + name))
    resultq.put(res)

if __name__ == '__main__':
  taskq = multiprocessing.Queue()
  resultq = multiprocessing.Queue()
  p = multiprocessing.Process(target=hello, args=(taskq, resultq))
  p.start()

  taskq.put('world')
  taskq.put('abcdabcd987')
  taskq.close()

  print(resultq.get())
  print(resultq.get())

  p.terminate()
  p.join()

如果使用 Pool,那么可以编写一个函数,在这个函数里面 import,并且把这个函数作为 initializer传入到 Pool 的构造函数里面。举个例子:

import multiprocessing

def init():
  global tf
  global sess
  import tensorflow as tf
  config = tf.ConfigProto()
  config.gpu_options.allow_growth=True
  sess = tf.Session(config=config)

def hello(name):
  return sess.run(tf.constant('hello ' + name))

if __name__ == '__main__':
  pool = multiprocessing.Pool(processes=2, initializer=init)
  xs = ['world', 'abcdabcd987', 'Lequn Chen']
  print pool.map(hello, xs)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python中使用多进程来实现并行处理的方法小结

    进程和线程是计算机软件领域里很重要的概念,进程和线程有区别,也有着密切的联系,先来辨析一下这两个概念: 1.定义 进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位. 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源. 2.关系 一个线程可以创建和撤

  • 用map函数来完成Python并行任务的简单示例

    众所周知,Python的并行处理能力很不理想.我认为如果不考虑线程和GIL的标准参数(它们大多是合法的),其原因不是因为技术不到位,而是我们的使用方法不恰当.大多数关于Python线程和多进程的教材虽然都很出色,但是内容繁琐冗长.它们的确在开篇铺陈了许多有用信息,但往往都不会涉及真正能提高日常工作的部分. 经典例子 DDG上以"Python threading tutorial (Python线程教程)"为关键字的热门搜索结果表明:几乎每篇文章中给出的例子都是相同的类+队列. 事实上,

  • python开启多个子进程并行运行的方法

    本文实例讲述了python开启多个子进程并行运行的方法.分享给大家供大家参考.具体如下: 这个python代码创建了多个process子进程,创建完成后先start(),最后统一join,这样所有子进程会并行执行. from multiprocessing import Process import sys, os import time def timetask(times): time.sleep(times) print time.localtime() def works(func, a

  • Python实现数据库并行读取和写入实例

    这篇主要记录一下如何实现对数据库的并行运算来节省代码运行时间.语言是Python,其他语言思路一样. 前言 一共23w条数据,是之前通过自然语言分析处理过的数据,附一张截图: 要实现对news主体的读取,并且找到其中含有的股票名称,只要发现,就将这支股票和对应的日期.score写入数据库. 显然,几十万条数据要是一条条读写,然后在本机上操作,耗时太久,可行性极低.所以,如何有效并行的读取内容,并且进行操作,最后再写入数据库呢? 并行读取和写入 并行读取:创建N*max_process个进程,对数

  • Python实现并行抓取整站40万条房价数据(可更换抓取城市)

    写在前面 这次的爬虫是关于房价信息的抓取,目的在于练习10万以上的数据处理及整站式抓取. 数据量的提升最直观的感觉便是对函数逻辑要求的提高,针对Python的特性,谨慎的选择数据结构.以往小数据量的抓取,即使函数逻辑部分重复,I/O请求频率密集,循环套嵌过深,也不过是1~2s的差别,而随着数据规模的提高,这1~2s的差别就有可能扩展成为1~2h. 因此对于要抓取数据量较多的网站,可以从两方面着手降低抓取信息的时间成本. 1)优化函数逻辑,选择适当的数据结构,符合Pythonic的编程习惯.例如,

  • Python中运行并行任务技巧

    示例 标准线程多进程,生产者/消费者示例: Worker越多,问题越大 复制代码 代码如下: # -*- coding: utf8 -*- import os import time import Queue import threading from PIL import Image def create_thumbnail(filename, size=(128, 128)):     try:         fp, fmt = filename.rsplit('.', 1)       

  • Python 多核并行计算的示例代码

    以前写点小程序其实根本不在乎并行,单核跑跑也没什么问题,而且我的电脑也只有双核四个超线程(下面就统称核好了),觉得去折腾并行没啥意义(除非在做IO密集型任务).然后自从用上了32核128GB内存,看到 htop 里面一堆空载的核,很自然地就会想这个并行必须去折腾一下.后面发现,其实 Python 的并行真的非常简单. multiprocessing vs threading Python 自带的库又全又好用,这是我特别喜欢 Python 的原因之一.Python 里面有 multiprocess

  • Python Flask基础教程示例代码

    本文研究的主要是Python Flask基础教程,具体介绍如下. 安装:pip install flask即可 一个简单的Flask from flask import Flask #导入Flask app = Flask(__name__) #创建一个Flask实例 #设置路由,即url @app.route('/') #url对应的函数 def hello_world(): #返回的页面 return 'Hello World!' #这个不是作为模块导入的时候运行,比如这个文件为aa.py,

  • Python安装OpenCV的示例代码

    OpenCV介绍 OpenCV是一个基于BSD许可(开源)发行的跨平台计算机视觉库,可以运行在Linux.Windows.Android和Mac OS操作系统上.它轻量级而且高效--由一系列 C 函数和少量 C++ 类构成,同时提供了Python.Ruby.MATLAB等语言的接口,实现了图像处理和计算机视觉方面的很多通用算法. OpenCV用C++语言编写,它的主要接口也是C++语言,但是依然保留了大量的C语言接口.该库也有大量的Python.Java and MATLAB/OCTAVE(版本

  • 利用python生成照片墙的示例代码

    PIL(Python Image Library)是python的第三方图像处理库,但是由于其强大的功能与众多的使用人数,几乎已经被认为是python官方图像处理库了.其官方主页为:PIL. PIL历史悠久,原来是只支持python2.x的版本的,后来出现了移植到python3的库pillow,pillow号称是friendly fork for PIL,其功能和PIL差不多,但是支持python3.本文只使用了PIL那些最常用的特性与用法,主要参考自:http://www.effbot.org

  • Python无损压缩图片的示例代码

    每个设计师.摄影师或有图片处理需求小编,都会面临批量高清大图的困扰. 因为高清大图放到网站上会严重拖慢加载速度,或是有的地方明确限制了图片大小,因此,为了完成工作,他们总是需要先把图片压缩,再上传. 当需要处理的图片多至十张.百张.千张,则严重影响工作效率.这时候,就可以交给Python啦! 只需要20行Python代码,就可以批量帮你无损压缩数张照片. ---1--- 前期工作 安装Python中现成的图片处理模块,然后将图片打包好导入,用循环的方式自动化处理图片就可以了! ---2--- 运

  • python操作链表的示例代码

    class Node: def __init__(self,dataval=None): self.dataval=dataval self.nextval=None class SLinkList: def __init__(self): self.headval=None # 遍历列表 def traversal_slist(self): head_node=self.headval while head_node is not None: print(head_node.dataval)

  • python调用摄像头的示例代码

    一.打开摄像头 import cv2 import numpy as np def video_demo(): capture = cv2.VideoCapture(0)#0为电脑内置摄像头 while(True): ret, frame = capture.read()#摄像头读取,ret为是否成功打开摄像头,true,false. frame为视频的每一帧图像 frame = cv2.flip(frame, 1)#摄像头是和人对立的,将图像左右调换回来正常显示. cv2.imshow("vi

  • Python进行特征提取的示例代码

    #过滤式特征选择 #根据方差进行选择,方差越小,代表该属性识别能力很差,可以剔除 from sklearn.feature_selection import VarianceThreshold x=[[100,1,2,3], [100,4,5,6], [100,7,8,9], [101,11,12,13]] selector=VarianceThreshold(1) #方差阈值值, selector.fit(x) selector.variances_ #展现属性的方差 selector.tra

  • Python调用Redis的示例代码

    #!/usr/bin/env python # -*- coding:utf-8 -*- # ************************************* # @Time : 2019/8/12 # @Author : Zhang Fan # @Desc : Library # @File : MyRedis.py # @Update : 2019/8/23 # ************************************* import redis class MyR

  • Python 实现二叉查找树的示例代码

    二叉查找树 所有 key 小于 V 的都被存储在 V 的左子树 所有 key 大于 V 的都存储在 V 的右子树 BST 的节点 class BSTNode(object): def __init__(self, key, value, left=None, right=None): self.key, self.value, self.left, self.right = key, value, left, right 二叉树查找 如何查找一个指定的节点呢,根据定义我们知道每个内部节点左子树的

随机推荐