pandas apply 函数 实现多进程的示例讲解

前言: 在进行数据处理的时候,我们经常会用到 pandas 。但是 pandas 本身好像并没有提供多进程的机制。本文将介绍如何来自己实现 pandas (apply 函数)的多进程执行。其中,我们主要借助 joblib库,这个库为python 提供了一个非常简洁方便的多进程实现方法。

所以,本文将按照下面的安排展开,前面可能比较啰嗦,若只是想知道怎么用可直接看第三部分:

- 首先简单介绍 pandas 中的分组聚合操作 groupby

- 然后简单介绍 joblib 的使用方法。

- 最后,通过一个去停用词的实验详细介绍如何实现 pandas 中 apply 函数多进程执行。

注意:本文说的都是多进程而不是多线程。

1. DataFrame.groupby 分组聚合操作

# groupby 操作
df1 = pd.DataFrame({'a':[1,2,1,2,1,2], 'b':[3,3,3,4,4,4], 'data':[12,13,11,8,10,3]})
df1

按照某列分组

grouped = df1.groupby('b')
# 按照 'b' 这列分组了,name 为 'b' 的 key 值,group 为对应的df_group
for name, group in grouped:
 print name, '->'
 print group
3 ->
 a b data
0 1 3 12
1 2 3 13
2 1 3 11
4 ->
 a b data
3 2 4  8
4 1 4 10
5 2 4  3

按照多列分组

grouped = df1.groupby(['a','b'])
# 按照 'b' 这列分组了,name 为 'b' 的 key 值,group 为对应的df_group
for name, group in grouped:
 print name, '->'
 print group
(1, 3) ->
 a b data
0 1 3 12
2 1 3 11
(1, 4) ->
 a b data
4 1 4 10
(2, 3) ->
 a b data
1 2 3 13
(2, 4) ->
 a b data
3 2 4  8
5 2 4  3

若 df.index 为[1,2,3…]这样一个 list, 那么按照 df.index分组,其实就是每组就是一行,在后面去停用词实验中,我们就用这个方法把 df_all 处理成每行为一个元素的 list, 再用多进程处理这个 list。

grouped = df1.groupby(df1.index)
# 按照 index 分组,其实每行就是一个组了
print len(grouped), type(grouped)
for name, group in grouped:
 print name, '->'
 print group
6 <class 'pandas.core.groupby.DataFrameGroupBy'>
0 ->
 a b data
0 1 3 12
1 ->
 a b data
1 2 3 13
2 ->
 a b data
2 1 3 11
3 ->
 a b data
3 2 4  8
4 ->
 a b data
4 1 4 10
5 ->
 a b data
5 2 4  3

2. joblib 用法

refer: https://pypi.python.org/pypi/joblib

# 1. Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly:
from joblib import Parallel, delayed
from math import sqrt

处理小任务的时候,多进程并没有体现出优势。

%time result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
%time result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
CPU times: user 316 ms, sys: 0 ns, total: 316 ms
Wall time: 309 ms
CPU times: user 692 ms, sys: 384 ms, total: 1.08 s
Wall time: 1.03 s

当需要处理大量数据的时候,并行处理就体现出了它的优势

%time result = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(1000000))
CPU times: user 3min 43s, sys: 5.66 s, total: 3min 49s
Wall time: 3min 33s
%time result = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(1000000))
CPU times: user 50.9 s, sys: 12.6 s, total: 1min 3s
Wall time: 52 s

3. apply 函数的多进程执行(去停用词)

多进程的实现主要参考了 stack overflow 的解答: Parallelize apply after pandas groupby

上图中,我们要把 AbstractText 去停用词, 处理成 AbstractText1 那样。首先,导入停用词表。

# 读入所有停用词
with open('stopwords.txt', 'rb') as inp:
 lines = inp.read()
stopwords = re.findall('"(.*?)"', lines)
print len(stopwords)
print stopwords[:10]
692
['a', "a's", 'able', 'about', 'above', 'according', 'accordingly', 'across', 'actually', 'after']
# 对 AbstractText 去停用词
# 方法一:暴力法,对每个词进行判断
def remove_stopwords1(text):
 words = text.split(' ')
 new_words = list()
 for word in words:
  if word not in stopwords:
   new_words.append(word)
 return new_words
# 方法二:先构建停用词的映射
for word in stopwords:
 if word in words_count.index:
  words_count[word] = -1
def remove_stopwords2(text):
 words = text.split(' ')
 new_words = list()
 for word in words:
  if words_count[word] != -1:
   new_words.append(word)
 return new_words
%time df_all['AbstractText1'] = df_all['AbstractText'].apply(remove_stopwords1)
%time df_all['AbstractText2'] = df_all['AbstractText'].apply(remove_stopwords2)
CPU times: user 8min 56s, sys: 2.72 s, total: 8min 59s
Wall time: 8min 48s
CPU times: user 1min 2s, sys: 4.12 s, total: 1min 6s
Wall time: 1min 2s

上面我尝试了两种不同的方法来去停用词:

方法一中使用了比较粗暴的方法:首先用一个 list 存储所有的 stopwords,然后对于每一个 text 中的每一个 word,我们判断它是否出现在 stopwords 的list中(复杂度 O(n)O(n) ), 若为 stopword 则去掉。

方法二中我用 一个Series(words_count) 对所有的词进行映射,如果该词为 stopword, 则把它的值修改为 -1。这样,对于 text 中的每个词 ww, 我们只需要判断它的值是否为 -1 即可判定是否为 stopword (复杂度 O(1)O(1))。

所以,在这两个方法中,我们都是采用单进程来执行,方法二的速度(1min 2s)明显高于方法一(8min 48s)。

from joblib import Parallel, delayed
import multiprocessing
# 方法三:对方法一使用多进程
def tmp_func(df):
 df['AbstractText3'] = df['AbstractText'].apply(remove_stopwords1)
 return df
def apply_parallel(df_grouped, func):
 """利用 Parallel 和 delayed 函数实现并行运算"""
 results = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in df_grouped)
 return pd.concat(results)
if __name__ == '__main__':
 time0 = time.time()
 df_grouped = df_all.groupby(df_all.index)
 df_all =applyParallel(df_grouped, tmp_func)
 print 'time costed {0:.2f}'.format(time.time() - time0)
time costed 150.81
# 方法四:对方法二使用多进程
def tmp_func(df):
 df['AbstractText3'] = df['AbstractText'].apply(remove_stopwords2)
 return df
def apply_parallel(df_grouped, func):
 """利用 Parallel 和 delayed 函数实现并行运算"""
 results = Parallel(n_jobs=-1)(delayed(func)(group) for name, group in df_grouped)
 return pd.concat(results)
if __name__ == '__main__':
 time0 = time.time()
 df_grouped = df_all.groupby(df_all.index)
 df_all =applyParallel(df_grouped, tmp_func)
 print 'time costed {0:.2f}'.format(time.time() - time0)
time costed 123.80

上面方法三和方法四分别对应于前面方法一和方法二,但是都是用了多进程操作。结果是方法一使用多进程以后,速度一下子提高了好几倍,但是方法二的多进程速度不升反降。这是不是有问题?的确,但是首先可以肯定,我们的代码没有问题。下图显示了我用 top 命令看到各个方法的进程执行情况。可以看出,在方法三和方法四中,的的确确是 12 个CPU核都跑起来了。只是在方法四中,每个核占用的比例都是比较低的。

fig1. 单进程 cpu 使用情况

fig2. 方法三 cpu 使用情况

fig3. 方法四 cpu 使用情况

一个直观的解释就是,当我们开启多进程的时候,进程开启和最后结果合并,进程结束,这些操作都是要消耗时间的。如果我们执行的任务比较小,那么进程开启等操作所消耗的时间可能就要比执行任务本身消耗的时间还多。这样就会出现多进程的方法四比单进程的方法二耗时更多的情况了。

所以总结来说,在处理小任务的时候没有必要开启多进程。借助joblib (Parallel, delayed 两个函数) ,我们能够很方便地实现 python 多进程。

以上这篇pandas apply 函数 实现多进程的示例讲解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

您可能感兴趣的文章:

  • 对pandas中apply函数的用法详解
  • pandas 使用apply同时处理两列数据的方法
(0)

相关推荐

  • 对pandas中apply函数的用法详解

    最近在使用apply函数,总结一下用法. apply函数可以对DataFrame对象进行操作,既可以作用于一行或者一列的元素,也可以作用于单个元素. 例:列元素 行元素 列 行 以上这篇对pandas中apply函数的用法详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们. 您可能感兴趣的文章: 浅谈Pandas中map, applymap and apply的区别

  • pandas 使用apply同时处理两列数据的方法

    多的不说,看了代码就懂了! df = pd.DataFrame ({'a' : np.random.randn(6), 'b' : ['foo', 'bar'] * 3, 'c' : np.random.randn(6)}) def my_test(a, b): return a + b df['Value'] = df.apply(lambda row: my_test(row['a'], row['c']), axis=1) print df 以上这篇pandas 使用apply同时处理两列

  • pandas apply 函数 实现多进程的示例讲解

    前言: 在进行数据处理的时候,我们经常会用到 pandas .但是 pandas 本身好像并没有提供多进程的机制.本文将介绍如何来自己实现 pandas (apply 函数)的多进程执行.其中,我们主要借助 joblib库,这个库为python 提供了一个非常简洁方便的多进程实现方法. 所以,本文将按照下面的安排展开,前面可能比较啰嗦,若只是想知道怎么用可直接看第三部分: - 首先简单介绍 pandas 中的分组聚合操作 groupby. - 然后简单介绍 joblib 的使用方法. - 最后,

  • 解析pandas apply() 函数用法(推荐)

    目录 Series.apply() apply 函数接收带有参数的函数 DataFrame.apply() apply() 计算日期相减示例 参考 理解 pandas 的函数,要对函数式编程有一定的概念和理解.函数式编程,包括函数式编程思维,当然是一个很复杂的话题,但对今天介绍的 apply() 函数,只需要理解:函数作为一个对象,能作为参数传递给其它函数,也能作为函数的返回值. 函数作为对象能带来代码风格的巨大改变.举一个例子,有一个类型为 list 的变量,包含 从 1 到 10 的数据,需

  • Python pandas自定义函数的使用方法示例

    本文实例讲述了Python pandas自定义函数的使用方法.分享给大家供大家参考,具体如下: 自定义函数的使用 import numpy as np import pandas as pd # todo 将自定义的函数作用到dataframe的行和列 或者Serise的行上 ser1 = pd.Series(np.random.randint(-10,10,5),index=list('abcde')) df1 = pd.DataFrame(np.random.randint(-10,10,(

  • 详谈pandas中agg函数和apply函数的区别

    在利用python进行数据分析 这本书中其实没有明确表明这两个函数的却别,而是说apply更一般化. 其实在这本书的第九章'数组及运算和转换'点到了两者的一点点区别:agg是用来聚合运算的,所谓的聚合当然是合成的成分比较大些,这一节开头就点到了:聚合只不过是分组运算的其中一种而已.它是数据转换的一个特例,也就是说,它接受能够将一维数组简化为标量值的函数. 当然这两个函数都是作用在groupby对象上的,也就是分完组的对象上的,分完组之后针对某一组,如果值是一维数组,在利用完特定的函数之后,能做到

  • Pandas的Apply函数具体使用

    Pandas最好用的函数 Pandas是Python语言中非常好用的一种数据结构包,包含了许多有用的数据操作方法.而且很多算法相关的库函数的输入数据结构都要求是pandas数据,或者有该数据的接口. 仔细看pandas的API说明文档,就会发现有好多有用的函数,比如非常常用的文件的读写函数就包括如下函数: Format Type Data Description Reader Writer text CSV read_csv to_csv text JSON read_json to_json

  • Pandas中Apply函数加速百倍的技巧分享

    目录 前言 实验对比 01 Apply(Baseline) 02 Swift加速 03 向量化 04 类别转化+向量化 05 转化为values处理 实验汇总 前言 虽然目前dask,cudf等包的出现,使得我们的数据处理大大得到了加速,但是并不是每个人都有比较好的gpu,非常多的朋友仍然还在使用pandas工具包,但有时候真的很无奈,pandas的许多问题我们都需要使用apply函数来进行处理,而apply函数是非常慢的,本文我们就介绍如何加速apply函数600倍的技巧. 实验对比 01 A

  • Python pandas中apply函数简介以及用法详解

    目录 1.基本信息 2.语法结构 3.使用案例 3.1 DataFrame使用apply 3.2 Series使用apply 3.3 其他案例 4.总结 参考链接: 1.基本信息 ​ Pandas 的 apply() 方法是用来调用一个函数(Python method),让此函数对数据对象进行批量处理.Pandas 的很多对象都可以使用 apply() 来调用函数,如 Dataframe.Series.分组对象.各种时间序列等. 2.语法结构 ​ apply() 使用时,通常放入一个 lambd

  • Python Pandas聚合函数的应用示例

    目录 Python Pandas聚合函数 应用聚合函数 1) 对整体聚合 2) 对任意某一列聚合 3) 对多列数据聚合 4) 对单列应用多个函数 5) 对不同列应用多个函数 6) 对不同列应用不同函数 总结 Python Pandas聚合函数 在前一节,我们重点介绍了窗口函数.我们知道,窗口函数可以与聚合函数一起使用,聚合函数指的是对一组数据求总和.最大值.最小值以及平均值的操作,本节重点讲解聚合函数的应用. 应用聚合函数 首先让我们创建一个 DataFrame 对象,然后对聚合函数进行应用.

  • Kotlin示例讲解标准函数with与run和apply的使用

    目录 1.with 函数 2.run函数 3.apply函数 1.with 函数 首先先从with函数开始,with函数接受两个参数,第一个参数可以是一个任意类型的对象,第二个参数是一个Lambda表达式.with函数会在Lambda表达式中提供第一个参数对象的上下文,并且使用Lambda表达式中的最后一行代码作为返回值进行返回,代码如下: val with = with(obj) { //这里是 obj 的上下文 "value" //with 函数的返回值 } 那么这个函数有什么用的

  • Pandas中map(),applymap(),apply()函数的使用方法

    目录 指定pandas对象作为NumPy函数的参数 元素的应用 行/列的应用 pandas.DataFrame,pandas.Series方法 Pandas对象方法的函数应用 适用于Series的每个元素:map(),apply() 应用于DataFrame的每个元素:applymap() 应用于DataFrame的每行和每列:apply() 应用于DataFrame的特定行/列元素 将函数应用于pandas对象(pandas.DataFrame,pandas.Series)时,根据所应用的函数

随机推荐