Python高效处理大文件的方法详解

目录
  • 开始
  • 处理文本
  • 串行处理
  • 多进程处理
  • 并行处理
  • 并行批量处理
  • 将文件分割成批
  • 运行并行批处理
  • tqdm 并发
  • 结论

为了进行并行处理,我们将任务划分为子单元。它增加了程序处理的作业数量,减少了整体处理时间。

例如,如果你正在处理一个大的CSV文件,你想修改一个单列。我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值。这些进程是基于你的处理器内核的数量。

在这篇文章中,我们将学习如何使用multiprocessing、joblib和tqdm Python包减少大文件的处理时间。这是一个简单的教程,可以适用于任何文件、数据库、图像、视频和音频。

开始

我们将使用来自 Kaggle的 US Accidents (2016 - 2021) 数据集,它包括280万条记录和47个列。

我们将导入multiprocessing、joblib和tqdm用于并行处理,pandas用于数据导入,re、nltk和string用于文本处理。

# Parallel Computing
import multiprocessing as mp
from joblib import Parallel, delayed
from tqdm.notebook import tqdm
# Data Ingestion
import pandas as pd
# Text Processing
import re
from nltk.corpus import stopwords
import string

在我们开始之前,让我们通过加倍cpu_count()来设置n_workers。正如你所看到的,我们有8个workers。

n_workers = 2 * mp.cpu_count()
print(f"{n_workers} workers are available")
>>> 8 workers are available

下一步,我们将使用pandas read_csv函数读取大型CSV文件。然后打印出dataframe的形状、列的名称和处理时间。

%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)
print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n")

输出:

Shape:(2845342, 47)
Column Names:
Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')
CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

处理文本

clean_text是一个用于处理文本的简单函数。我们将使用nltk.copus获得英语停止词,并使用它来过滤掉文本行中的停止词。之后,我们将删除句子中的特殊字符和多余的空格。它将成为确定串行、并行和批处理的处理时间的基准函数。

def clean_text(text):
 # Remove stop words
 stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word
not in stops])
 # Remove Special Characters
 text = text.translate(str.maketrans('', '', string.punctuation))
 # removing the extra spaces
 text = re.sub(' +',' ', text)
 return text

串行处理

对于串行处理,我们可以使用pandas的.apply()函数,但是如果你想看到进度条,你需要为pandas激活tqdm,然后使用.progress_apply()函数。

我们将处理280万条记录,并将结果保存回 “Description” 列中。

%%time
tqdm.pandas()
df['Description'] = df['Description'].progress_apply(clean_text)

输出

高端处理器串行处理280万行花了9分5秒。

100%          2845342/2845342 [09:05<00:00, 5724.25it/s]
CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

多进程处理

有多种方法可以对文件进行并行处理,我们将了解所有这些方法。multiprocessing是一个内置的python包,通常用于并行处理大型文件。

我们将创建一个有8个workers的多处理池,并使用map函数来启动进程。为了显示进度条,我们将使用tqdm。

map函数由两部分组成。第一个部分需要函数,第二个部分需要一个参数或参数列表。

%%time
p = mp.Pool(n_workers)
df['Description'] = p.map(clean_text,tqdm(df['Description']))

输出

我们的处理时间几乎提高了3倍。处理时间从9分5秒下降到3分51秒。

100%          2845342/2845342 [02:58<00:00, 135646.12it/s]
CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

并行处理

我们现在将学习另一个Python包来执行并行处理。在本节中,我们将使用joblib的Parallel和delayed来复制map函数。

  • Parallel需要两个参数:n_job = 8和backend = multiprocessing。
  • 然后,我们将在delayed函数中加入clean_text。
  • 创建一个循环,每次输入一个值。

下面的过程是相当通用的,你可以根据你的需要修改你的函数和数组。我曾用它来处理成千上万的音频和视频文件,没有任何问题。

建议:使用 "try: "和 "except: "添加异常处理。

def text_parallel_clean(array):
 result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
 delayed(clean_text)
  (text)
 for text in tqdm(array)
 )
 return result

在text_parallel_clean()中添加“Description”列。

%%time
df['Description'] = text_parallel_clean(df['Description'])

输出

我们的函数比多进程处理Pool多花了13秒。即使如此,并行处理也比串行处理快4分59秒。

100%          2845342/2845342 [04:03<00:00, 10514.98it/s]
CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

并行批量处理

有一个更好的方法来处理大文件,就是把它们分成若干批,然后并行处理。让我们从创建一个批处理函数开始,该函数将在单一批次的值上运行clean_function。

批量处理函数

def proc_batch(batch):
 return [
 clean_text(text)
 for text in batch
 ]

将文件分割成批

下面的函数将根据workers的数量把文件分成多个批次。在我们的例子中,我们得到8个批次。

def batch_file(array,n_workers):
 file_len = len(array)
 batch_size = round(file_len / n_workers)
 batches = [
 array[ix:ix+batch_size]
 for ix in tqdm(range(0, file_len, batch_size))
 ]
 return batches
batches = batch_file(df['Description'],n_workers)
>>> 100% 8/8 [00:00<00:00, 280.01it/s]

运行并行批处理

最后,我们将使用Parallel和delayed来处理批次。

%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
 delayed(proc_batch)
  (batch)
 for batch in tqdm(batches)
 )
df['Description'] = [j for i in batch_output for j in i]

输出

我们已经改善了处理时间。这种技术在处理复杂数据和训练深度学习模型方面非常有名。

100%          8/8 [00:00<00:00, 2.19it/s]
CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm 并发

tqdm将多处理带到了一个新的水平。它简单而强大。

process_map需要:

  • 函数名称
  • Dataframe 列名
  • max_workers
  • chucksize与批次大小类似。我们将用workers的数量来计算批处理的大小,或者你可以根据你的喜好来添加这个数字。
%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)
df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

输出

通过一行代码,我们得到了最好的结果:

100%          2845342/2845342 [03:48<00:00, 1426320.93it/s]
CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

结论

我们需要找到一个平衡点,它可以是串行处理,并行处理,或批处理。如果你正在处理一个较小的、不太复杂的数据集,并行处理可能会适得其反。

在这个教程中,我们已经了解了各种处理大文件的Python包,它们允许我们对数据函数进行并行处理。

如果你只处理一个表格数据集,并且想提高处理性能,那么建议你尝试Dask、datatable和RAPIDS。

到此这篇关于Python高效处理大文件的方法详解的文章就介绍到这了,更多相关Python处理大文件内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python文件处理与垃圾回收机制详情

    目录 01.文件操作 1.1.文件操作流程 1.2.文件的操作模式 1.3.操作文件的方法 1.4.主动移动文件内指针移动 1.5文件的修改 1.6垃圾回收机制 01.文件操作 文件是操作系统提供给用户/应用程序操作硬盘的一个虚拟的概念/接口 用户/应用程序可以通过文件将数据永久保存在硬盘中 用户/应用程序直接操作的是文件,对文件进行的所有的操作,都是在向操作系统发送系统调用,然后再由操作系统将其转成具体的硬盘操作 1.1.文件操作流程 打开文件: 打开文件,由应用系统向操作系统发起系统调用op

  • Python垃圾回收是怎么实现的

    目录 什么是垃圾回收 Python中的垃圾回收机制 引用计数 循环引用 标记清除解除循环引用 分代回收 总结 什么是垃圾回收 垃圾回收(GC) 大家应该多多少少都了解过,什么是垃圾回收呢?垃圾回收GC的全拼是 Garbage Collection,在维基百科的定义是:在计算机科学中,垃圾回收(英语:Garbage Collection,缩写为GC)是一种自动的内存管理机制.当一个电脑上的动态内存不再需要时,就应该予以释放,以让出内存,这种内存资源管理,称为垃圾回收.我们都知道在C/C++里用户需

  • python处理xml文件操作详解

    目录 1.python 操作xml的方式介绍 2.ElementTree模块 3.解析xml格式字符串并获取根节点 4.读取节点内容,getroot() 5.通标标签名直接获取标签(find,findall) 6.全文搜索标签名(类似xpath路径查找标签) 7.修改节点 8.删除节点 9.构建文件 方式1 (Element) 方式2 (makeelement) 方式3 1.python 操作xml的方式介绍 查看全部包含“三种⽅法: ⼀是xml.dom. * 模块,它是W3CDOMAPI的实现

  • Python 垃圾回收机制详解

    目录 1. 引用计数 2. 标记-清除 3. 分代回收 4. 其他 4.1 JNI(Java Native Interface) 总结 Python 的GC模块主要运用了引用计数来跟踪和回收垃圾:通过"标记-清除"解决容器对象可能产生的循环引用问题:通过分代回收以空间换时间进一步提高垃圾回收的效率. 也即采用"引用计数"为主(实时性,一旦没有引用,内存就直接释放了),"标记-清除"与"分代收集"两种机制为辅的策略.      

  • 分析python垃圾回收机制原理

    目录 引用计数 引用计数案例 执行结果: 导致引用计数 +1 的情况 导致引用计数-1 的情况 循环引用导致内存泄露 执行结果 分代回收 垃圾回收 gc 模块 常用函数: 引用计数 Python 语言默认采用的垃圾收集机制是『引用计数法 Reference Counting』,该算法最早 George E. Collins 在 1960 的时候首次提出,50 年后的今天,该算法依然被很多编程语言使用. 引用计数法的原理是:每个对象维护一个ob_ref字段,用来记录该对象当前被引用的次数,每当新的

  • python处理json文件的四个常用函数

    目录 一,json.load()和json.dump只要用于读写json数据 二,json.loads和json.dumps主要用于字符串和字典之间的类型转换 三,练习 1编写一个json格式的文件 2编写python方法 一,json.load()和json.dump只要用于读写json数据 1json.load() 从文件中读取json字符串 with open('data.json','r',encoding='utf-8') as f print(json.load(f)) 2json.

  • python PyVCF文件处理VCF文件格式实例详解

    目录 引言 PyVCF库的安装 PyVCF库的导入 PyVCF库详细介绍 使用实例: _Record对象------位点信息的储存形式 Reader对象------处理vcf文件,构建结构化信息 综合使用: 引言 vcf文件的全称是variant call file,即突变识别文件,它是基因组工作流程中产生的一种文件,保存的是基因组上的突变信息.通过对vcf文件进行分析,可以得到个体的变异信息.嗯,总之,这是很重要的文件,所以怎么处理它也显得十分重要.它的文件信息如下: 文件的开头是一堆以“##

  • python语言开发垃圾回收机制原理教程

    目录 一.什么是垃圾回收机制 二.为什么要有垃圾回收机制 三.垃圾回收机制的原理 1.引用计数 直接引用 间接引用 2.栈区 / 堆区 3.总结 四.标记清除 1.循环引用问题(也叫交叉引用) 2.循环引用导致的结果 3.解决方法 : 清除-标记 五.分代回收 1.效率问题 2.解决方法 : 分代回收 分代 回收 总结 一.什么是垃圾回收机制 垃圾回收机制(简称GC), 解释器自带的一种机制 它是一种动态存储管理技术,自动释放不再被程序引用的对象所占用的内存空间 二.为什么要有垃圾回收机制 程序

  • Python高效处理大文件的方法详解

    目录 开始 处理文本 串行处理 多进程处理 并行处理 并行批量处理 将文件分割成批 运行并行批处理 tqdm 并发 结论 为了进行并行处理,我们将任务划分为子单元.它增加了程序处理的作业数量,减少了整体处理时间. 例如,如果你正在处理一个大的CSV文件,你想修改一个单列.我们将把数据以数组的形式输入函数,它将根据可用的进程数量,一次并行处理多个值.这些进程是基于你的处理器内核的数量. 在这篇文章中,我们将学习如何使用multiprocessing.joblib和tqdm Python包减少大文件

  • Python自动操作Excel文件的方法详解

    目录 工具 读取Excel文件内容 写入Excel文件内容 Excel文件样式调整 设置表头的位置 设置单元格的宽高 总结 工具 python3.7 Pycharm Excel xlwt&xlrd 读取Excel文件内容 当前文件夹下有一个名为“股票数据.xlsx”的Excel文件,可以按照下列代码方式来操作它. import xlrd # 使用xlrd模块的open_workbook函数打开指定Excel文件并获得Book对象(工作簿) wb = xlrd.open_workbook('股票数

  • python通过http下载文件的方法详解

    1.通过requests.get方法 r = requests.get("http://200.20.3.20:8080/job/Compile/job/aaa/496/artifact/bbb.iso") with open(os.path.join(os.path.dirname(os.path.abspath("__file__")),"bbb.iso"),"wb") as f: f.write(r.content) 2

  • Python 工具类实现大文件断点续传功能详解

    依赖 os.sys.requests 工具代码 废话不多说,上代码. #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Sat Oct 23 13:54:39 2021 @author: huyi """ import os import sys import requests def download(url, file_path): # 重试计数 count = 0 #

  • Python实现在Excel中绘制可视化大屏的方法详解

    目录 数据清洗 绘制图表 生成可视化大屏 大家新年好哇,今天小编来给大家分享如何在Excel文档当中来绘制可视化图表,并且制作一个可视化大屏,非常的容易,这里我们会用到openpyxl模块,那么首先第一步便是调用该模块来读取Excel文件,代码如下 # 读取Excel文档并且指定工作表的名称 file_name = 'Bike_Sales_Playground.xlsx' df = pd.read_excel(file_name,sheet_name='bike_buyers') 当然为了保险起

  • python简单读取大文件的方法

    本文实例讲述了python简单读取大文件的方法.分享给大家供大家参考,具体如下: Python读取大文件(GB级别)采用的办法很简单: with open(...) as f: for line in f: <do something with line> 例如: with open(filepath,'r') as infile: for line in infile: print line 一切都交给python解释器处理,读取效率很高,且占用资源少. stackoverflow参考链接:

  • 使用python进行拆分大文件的方法

    python按指定行数把大文件进行拆分 如图大文件有7000多万行,大小为16G 需要拆分成多个200万行的小文件 代码如下: # -*- coding:utf-8 -*- from datetime import datetime def Main(): source_dir = '/data/u_lx_data/zhangqm/sh/yanjie/liuxuesheng/jz_yuanshi_list0206.txt' target_dir = '/data/u_lx_data/zhangq

  • Python Pandas读写txt和csv文件的方法详解

    目录 一.文本文件 1. read_csv() 2. to_csv() 一.文本文件 文本文件,主要包括csv和txt两种等,相应接口为read_csv()和to_csv(),分别用于读写数据 1. read_csv() 格式代码: pandas.read_csv(filepath_or_buffer, sep=', ', delimiter=None, header='infer', names=None, index_col=None, usecols=None, squeeze=False

  • 对Python的多进程锁的使用方法详解

    很多时候,我们需要在多个进程中同时写一个文件,如果不加锁机制,就会导致写文件错乱 这个时候,我们可以使用multiprocessing.Lock() 我一开始是这样使用的: import multiprocessing lock = multiprocessing.Lock() class MatchProcess(multiprocessing.Process): def __init__(self, threadId, mfile, lock): multiprocessing.Proces

  • Python实现提取音乐频谱的方法详解

    目录 前言 1.准备 2.频谱展示 前言 你有没有经常好奇一些音乐软件的频谱特效是怎么做的,为什么做的这么好看?有没有想试试自己提取音乐频谱并可视化展现出来?今天,咱就结合上次的音乐剪辑操作: 3行Python代码实现剪辑音乐 来简单粗暴地可视化下面这首歌曲的频谱! 1.准备 开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,可以访问这篇文章:超详细Python安装指南 进行安装. Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(c

随机推荐