pandas apply多线程实现代码

一、多线程化选择

并行化一个代码有两大选择:multithread 和 multiprocess。

Multithread,多线程,同一个进程(process)可以开启多个线程执行计算。每个线程代表了一个 CPU 核心,这么多线程可以访问同样的内存地址(所谓共享内存),实现了线程之间的通讯,算是最简单的并行模型。

Multiprocess,多进程,则相当于同时开启多个 Python 解释器,每个解释器有自己独有的数据,自然不会有数据冲突。

二、并行化思想

并行化的基本思路是把 dataframe 用 np.array_split 方法切割成多个子 dataframe。再调用 Pool.map 函数并行地执行。注意到顺序执行的 pandas.DataFrame.apply 是如何转化成 Pool.map 然后并行执行的。

Pool 对象是一组并行的进程,开源Pool类

开源Pool类定义

 def Pool(self, processes=None, initializer=None, initargs=(),
       maxtasksperchild=None):
    '''Returns a process pool object'''
    from .pool import Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
          context=self.get_context())

设置进程初始化函数

def init_process(global_vars):
  global a
  a = global_vars

设置进程初始化函数

Pool(processes=8,initializer=init_process,initargs=(a,))

其中,指定产生 8 个进程,每个进程的初始化需运行 init_process函数,其参数为一个 singleton tuple a. 利用 init_process 和 initargs,我们可以方便的设定需要在进程间共享的全局变量(这里是 a)。

with 关键词是 context manager,避免写很繁琐的处理开关进程的逻辑。

 with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
    result_parts = pool.map(apply_f,df_parts)

三、多线程化应用

多线程时间比较和多线程的几种apply应用

import numpy as np
import pandas as pd
import time
from multiprocessing import Pool

def f(row):
  #直接对某列进行操作
  return sum(row)+a

def f1_1(row):
  #对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作
  return row[0]**2

def f1_2(row1):
  #对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作
  return row1**2

def f2_1(row):
  #对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作
  return pd.Series([row[0]**2,row[1]**2],index=['1_1','1_2'])

def f2_2(row1,row2):
  #对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作
  return pd.Series([row1**2,row2**2],index=['2_1','2_2'])

def apply_f(df):
  return df.apply(f,axis=1)

def apply_f1_1(df):
  return df.apply(f1_1,axis=1)

def apply_f1_2(df):
  return df[0].apply(f1_2)

def apply_f2_1(df):
  return df.apply(f2_1,axis=1)

def apply_f2_2(df):
  return df.apply(lambda row :f2_2(row[0],row[1]),axis=1)

def init_process(global_vars):
  global a
  a = global_vars

def time_compare():
  '''直接调用和多线程调用时间对比'''
  a = 2
  np.random.seed(0)
  df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
  print(df.columns)

  t1= time.time()
  result_serial = df.apply(f,axis=1)
  t2 = time.time()
  print("Serial time =",t2-t1)
  print(result_serial.head())

  df_parts=np.array_split(df,20)
  print(len(df_parts),type(df_parts[0]))
  with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
  #with Pool(processes=8) as pool:
    result_parts = pool.map(apply_f,df_parts)
  result_parallel= pd.concat(result_parts)
  t3 = time.time()
  print("Parallel time =",t3-t2)
  print(result_parallel.head())

def apply_fun():
  '''多种apply函数的调用'''
  a = 2
  np.random.seed(0)
  df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
  print(df.columns)
  df_parts=np.array_split(df,20)
  print(len(df_parts),type(df_parts[0]))
  with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
  #with Pool(processes=8) as pool:
    res_part0 = pool.map(apply_f,df_parts)
    res_part1 = pool.map(apply_f1_1,df_parts)
    res_part2 = pool.map(apply_f1_2,df_parts)
    res_part3 = pool.map(apply_f2_1,df_parts)
    res_part4 = pool.map(apply_f2_2,df_parts)

  res_parallel0 = pd.concat(res_part0)
  res_parallel1 = pd.concat(res_part1)
  res_parallel2 = pd.concat(res_part2)
  res_parallel3 = pd.concat(res_part3)
  res_parallel4 = pd.concat(res_part4)

  print("f:\n",res_parallel0.head())
  print("f1:\n",res_parallel1.head())
  print("f2:\n",res_parallel2.head())
  print("f3:\n",res_parallel3.head())
  print("f4:\n",res_parallel4.head())

  df=pd.concat([df,res_parallel0],axis=1)
  df=pd.concat([df,res_parallel1],axis=1)
  df=pd.concat([df,res_parallel2],axis=1)
  df=pd.concat([df,res_parallel3],axis=1)
  df=pd.concat([df,res_parallel4],axis=1)

  print(df.head())

if __name__ == '__main__':
  time_compare()
  apply_fun()

参考网址

https://blog.fangzhou.me/posts/20170702-python-parallelism/

https://docs.python.org/3.7/library/multiprocessing.html

到此这篇关于pandas apply多线程实现代码的文章就介绍到这了,更多相关pandas apply多线程内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 对pandas中apply函数的用法详解

    最近在使用apply函数,总结一下用法. apply函数可以对DataFrame对象进行操作,既可以作用于一行或者一列的元素,也可以作用于单个元素. 例:列元素 行元素 列 行 以上这篇对pandas中apply函数的用法详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们. 您可能感兴趣的文章: 浅谈Pandas中map, applymap and apply的区别

  • pandas中apply和transform方法的性能比较及区别介绍

    1. apply与transform 首先讲一下apply() 与transform()的相同点与不同点 相同点: 都能针对dataframe完成特征的计算,并且常常与groupby()方法一起使用. 不同点: apply()里面可以跟自定义的函数,包括简单的求和函数以及复杂的特征间的差值函数等(注:apply不能直接使用agg()方法 / transform()中的python内置函数,例如sum.max.min.'count'等方法) transform() 里面不能跟自定义的特征交互函数,

  • pandas使用apply多列生成一列数据的实例

    如下所示: import pandas as pd def my_min(a, b): return min(abs(a),abs(b)) s = pd.Series([10.0247,10.0470, 10.0647,10.0761,15.0800,10.0761,10.0647,10.0470,10.0247,10.0,9.9753,9.9530,9.9353,9.9239,18.92,9.9239,9.9353,9.9530,9.9753,10.0]) df = pd.DataFrame(

  • pandas 使用apply同时处理两列数据的方法

    多的不说,看了代码就懂了! df = pd.DataFrame ({'a' : np.random.randn(6), 'b' : ['foo', 'bar'] * 3, 'c' : np.random.randn(6)}) def my_test(a, b): return a + b df['Value'] = df.apply(lambda row: my_test(row['a'], row['c']), axis=1) print df 以上这篇pandas 使用apply同时处理两列

  • Pandas对DataFrame单列/多列进行运算(map, apply, transform, agg)

    1.单列运算 在Pandas中,DataFrame的一列就是一个Series, 可以通过map来对一列进行操作: df['col2'] = df['col1'].map(lambda x: x**2) 其中lambda函数中的x代表当前元素.可以使用另外的函数来代替lambda函数,例如: define square(x): return (x ** 2) df['col2'] = df['col1'].map(square) 2.多列运算 apply()会将待处理的对象拆分成多个片段,然后对各

  • pandas apply 函数 实现多进程的示例讲解

    前言: 在进行数据处理的时候,我们经常会用到 pandas .但是 pandas 本身好像并没有提供多进程的机制.本文将介绍如何来自己实现 pandas (apply 函数)的多进程执行.其中,我们主要借助 joblib库,这个库为python 提供了一个非常简洁方便的多进程实现方法. 所以,本文将按照下面的安排展开,前面可能比较啰嗦,若只是想知道怎么用可直接看第三部分: - 首先简单介绍 pandas 中的分组聚合操作 groupby. - 然后简单介绍 joblib 的使用方法. - 最后,

  • Pandas的Apply函数具体使用

    Pandas最好用的函数 Pandas是Python语言中非常好用的一种数据结构包,包含了许多有用的数据操作方法.而且很多算法相关的库函数的输入数据结构都要求是pandas数据,或者有该数据的接口. 仔细看pandas的API说明文档,就会发现有好多有用的函数,比如非常常用的文件的读写函数就包括如下函数: Format Type Data Description Reader Writer text CSV read_csv to_csv text JSON read_json to_json

  • 详谈pandas中agg函数和apply函数的区别

    在利用python进行数据分析 这本书中其实没有明确表明这两个函数的却别,而是说apply更一般化. 其实在这本书的第九章'数组及运算和转换'点到了两者的一点点区别:agg是用来聚合运算的,所谓的聚合当然是合成的成分比较大些,这一节开头就点到了:聚合只不过是分组运算的其中一种而已.它是数据转换的一个特例,也就是说,它接受能够将一维数组简化为标量值的函数. 当然这两个函数都是作用在groupby对象上的,也就是分完组的对象上的,分完组之后针对某一组,如果值是一维数组,在利用完特定的函数之后,能做到

  • 浅谈Pandas中map, applymap and apply的区别

    1.apply() 当想让方程作用在一维的向量上时,可以使用apply来完成,如下所示 In [116]: frame = DataFrame(np.random.randn(4, 3), columns=list('bde'), index=['Utah', 'Ohio', 'Texas', 'Oregon']) In [117]: frame Out[117]: b d e Utah -0.029638 1.081563 1.280300 Ohio 0.647747 0.831136 -1.

  • pandas apply多线程实现代码

    一.多线程化选择 并行化一个代码有两大选择:multithread 和 multiprocess. Multithread,多线程,同一个进程(process)可以开启多个线程执行计算.每个线程代表了一个 CPU 核心,这么多线程可以访问同样的内存地址(所谓共享内存),实现了线程之间的通讯,算是最简单的并行模型. Multiprocess,多进程,则相当于同时开启多个 Python 解释器,每个解释器有自己独有的数据,自然不会有数据冲突. 二.并行化思想 并行化的基本思路是把 dataframe

  • pandas apply使用多列计算生成新的列实现示例

    在python数据分析中,有时需要根据多列数据生成中间结果,pandas给我们带来了很多方便,通常简短的代码可以实现一些高级功能,灵活掌握一些技巧可以事倍功半 pandas的apply方法用于对指定列的每个元素进行相同的操作,下面生成一个dataFrame用于演示: import pandas as pd a=range(5) b=range(5,10) c=range(10,15) data=pd.DataFrame([a,b,c]).T data.columns=["a",&quo

  • 详解pandas apply 并行处理的几种方法

    1. pandarallel (pip install ) 对于一个带有Pandas DataFrame df的简单用例和一个应用func的函数,只需用parallel_apply替换经典的apply. from pandarallel import pandarallel # Initialization pandarallel.initialize() # Standard pandas apply df.apply(func) # Parallel apply df.parallel_ap

  • 解析pandas apply() 函数用法(推荐)

    目录 Series.apply() apply 函数接收带有参数的函数 DataFrame.apply() apply() 计算日期相减示例 参考 理解 pandas 的函数,要对函数式编程有一定的概念和理解.函数式编程,包括函数式编程思维,当然是一个很复杂的话题,但对今天介绍的 apply() 函数,只需要理解:函数作为一个对象,能作为参数传递给其它函数,也能作为函数的返回值. 函数作为对象能带来代码风格的巨大改变.举一个例子,有一个类型为 list 的变量,包含 从 1 到 10 的数据,需

  • ASP.NET:一段比较经典的多线程学习代码

    一段比较经典的多线程学习代码. 1.用到了多线程的同步问题. 2.用到了多线程的顺序问题. 如果有兴趣的请仔细阅读下面的代码.注意其中代码段的顺序,思考一下,这些代码的顺序能否互相调换,为什么?这应该对学习很有帮助的.为了演示,让所有的线程都Sleep了一段时间. using System.Net;using System;using System.IO;using System.Text;using System.Threading;using System.Diagnostics; name

  • java多线程中断代码详解

    一.java中终止线程主要有三种方法: ①线程正常退出,即run()方法执行完毕了 ②使用Thread类中的stop()(已过期不推荐使用)方法强行终止线程. ③使用中断机制 t.stop()调用时,终止线程,会导致该线程所持有的锁被强制释放,从而被其他线程所持有,因此有可能导致与预期结果不一致.下面使用中断信号量中断非阻塞状态的线程中: public class TestStopThread { public static void main(String[] args) throws Int

  • Java多线程同步器代码详解

    同步器 为每种特定的同步问题提供了解决方案,同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作.最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier 和Exchanger Semaphore Semaphore[信号标:旗语],通过计数器控制对共享资源的访问. 测试类: package concurrent; import concurrent.thread.SemaphoreThread; import java.util.concurrent.

  • python多线程实现代码(模拟银行服务操作流程)

    1.模拟银行服务完成程序代码 目前,在以银行营业大厅为代表的窗口行业中大量使用排队(叫号)系统,该系统完全模拟了人群排队全过程,通过取票进队.排队等待.叫号服务等功能,代替了人们站队的辛苦. 排队叫号软件的具体操作流程为: 顾客取服务序号 当顾客抵达服务大厅时,前往放置在入口处旁的取号机,并按一下其上的相应服务按钮,取号机会自动打印出一张服务单.单上显示服务号及该服务号前面正在等待服务的人数. 服务员工呼叫顾客 服务员工只需按一下其柜台上呼叫器的相应按钮,则顾客的服务号就会按顺序的显示在显示屏上

  • 在IntelliJ IDEA中多线程并发代码的调试方法详解

    通常来说,多线程的并发及条件断点的debug是很难完成的,或许本篇文章会给你提供一个友好的调试方法.让你在多线程开发过程中的调试更加的有的放矢. 我们将通过一个例子来学习.在这里,我编写了一个多线程程序来计算此数学问题:100! + 100000!.即:100的阶乘 + 100000的阶乘. 数学不好的同学看这里,100 阶乘就是:1 * 2 * 3 * -- * 100 = ? ,简写为100! import java.math.BigInteger; public class MathPro

随机推荐