Python详解复杂CSV文件处理方法

目录
  • 项目简介
  • 项目笔记与心得
    • 1.分批处理与多进程及多线程加速
    • 2.优化算法提高效率
  • 总结

项目简介

鉴于项目保密的需要,不便透露太多项目的信息,因此,简单介绍一下项目存在的难点:

  • 海量数据:项目是对CSV文件中的数据进行处理,而特点是数据量大...真的大!!!拿到的第一个CSV示例文件是110多万行(小CASE),而第二个文件就到了4500万行,等到第三个文件......好吧,一直没见到第三个完整示例文件,因为太大了,据说是第二个示例文件的40多倍,大概二十亿行......
  • 业务逻辑复杂:项目是需要对CSV文件的每一行数据的各种组合可能性进行判断,而判断的业务逻辑较为复杂,如何在解决复杂逻辑的同时保证较高的处理效率是难点之一。

项目笔记与心得

1.分批处理与多进程及多线程加速

  • 因为数据量太大,肯定是要分批对数据进行处理,否则,效率低不谈,大概率也没有足够的内存能够支撑,需要用到chunksize,此外,为了节约内存,以及提高处理效率,可以将文本类的数据存储为“category”格式:
  • 项目整体是计算密集型的任务,因此,需要用到多进程,充分利用CPU的多核性能;
  • 多线程进行读取与写入,其中,写入使用to_csv的增量写入方法,mode参数设置为'a';
  • 多进程与多线程开启一般为死循环,需要在合适的位置,放入结束循环的信号,以便处理完毕后退出多进程或多线程
"""鉴于项目保密需要,以下代码仅为示例"""
import time
import pathlib as pl
import pandas as pd
from threading import Thread
from multiprocessing import Queue, Process, cpu_count
# 导入多线程Thread,多进程的队列Queue,多进程Process,CPU核数cpu_count
# 存放分段读取的数据队列,注:maxsize控制队列的最大数量,避免一次性读取到内存中的数据量太大
data_queue = Queue(maxsize=cpu_count() * 2)
# 存放等待写入磁盘的数据队列
write_queue = Queue()
def read_data(path: pl.Path, data_queue: Queue, size: int = 10000):
    """
    读取数据放入队列的方法
    :return:
    """
    data_obj = pd.read_csv(path, sep=',', header=0, chunksize=size, dtype='category')
    for idx, df in enumerate(data_obj):
        while data_queue.full():  # 如果队列满了,那就等待
            time.sleep(1)
        data_queue.put((idx + 1, df))
    data_queue.put((None, None))  # 放入结束信号
def write_data(out_path: pl.Path, write_queue: Queue):
    """
    将数据增量写入CSV的方法
    :return:
    """
    while True:
        while write_queue.empty():
            time.sleep(1)
        idx, df = write_queue.get()
        if df is None:
            return  # 结束退出
        df.to_csv(out_path, mode='a', header=None, index=False, encoding='ansi')  # 输出CSV
def parse_data(data_queue: Queue, write_queue: Queue):
    """
    从队列中取出数据,并加工的方法
    :return:
    """
    while True:
        while write_queue.empty():
            time.sleep(1)
        idx, df = data_queue.get()
        if df is None:  # 如果是空的结束信号,则结束退出进程,
        # 特别注意结束前把结束信号放回队列,以便其他进程也能接收到结束信号!!!
            data_queue.put((idx, df))
            return
        """处理数据的业务逻辑略过"""
        write_queue.put((idx, df))  # 将处理后的数据放入写队列
# 创建一个读取数据的线程
read_pool = Thread(target=read_data, args=(read_data_queue, *args))
read_pool.start()  # 开启读取线程
# 创建一个增量写入CSV数据的线程
write_pool = Thread(target=write_data, args=(write_data_queue, *args))
write_pool.start()  # 开启写进程
pools = []  # 存放解析进程的队列
for i in range(cpu_count()):  # 循环开启多进程,不确定开多少个进程合适的情况下,那么按CPU的核数开比较合理
    pool = Process(target=parse_data, args=(read_data_queue, write_data_queue, *args))
    pool.start()  # 启动进程
    pools.append(pool)  # 加入队列
for pool in pools:
    pool.join()  # 等待所有解析进程完成
# 所有解析进程完成后,在写队列放入结束写线程的信号
write_data_queue.put((None, None))
write_pool.join()  # 等待写线程结束
print('任务完成')

2.优化算法提高效率

将类对象存入dataframe列

在尝试了n种方案之后,最终使用了将类对象存到dataframe的列中,使用map方法,运行类方法,最后,将运行结果展开到多列中的方式。该方案本项目中取得了最佳的处理效率。

"""鉴于保密需要,以下代码仅为示例"""
class Obj:
    def __init__(self, ser: pd.Series):
        """
        初始化类对象
        :param ser: 传入series
        """
        self.ser = ser  # 行数据
        self.attrs1 = []  # 属性1
        self.attrs2 = []  # 属性2
        self.attrs3 = []  # 属性3
    def __repr__(self):
        """
        自定义输出
        """
        attrs1 = '_'.join([str(a) for a in self.attrs1])
        attrs2 = '_'.join([str(a) for a in self.attrs2])
        attrs3 = '_'.join([str(a) for a in self.attrs3])
        return '_'.join([attrs1, attrs2, attrs3])
    def run(self):
        """运行业务逻辑"""
# 创建obj列,存入类对象
data['obj'] = data.apply(lambda x: Obj(x), axis=1)
# 运行obj列中的类方法获得判断结果
data['obj'] = data['obj'].map(lambda x: x.run())
# 链式调用,1将类对象文本化->2拆分到多列->3删除空列->4转换为category格式
data[['col1', 'col2', 'col3', ...省略]] = data['obj'].map(str).str.split('_', expand=True).dropna(axis=1).astype('category')
# 删除obj列
data.drop(columns='obj', inplace=True)  

减少计算次数以提高运行效率

在整个优化过程中,对运行效率产生最大优化效果的有两项:

  • 一是改变遍历算法,采用直接对整行数据进行综合判断的方法,使原需要遍历22个组合的计算与判断大大减少
  • 二是提前计算特征组合,制作成字典,后续直接查询结果,而不再进行重复计算

使用numpy加速计算

numpy还是数据处理上的神器,使用numpy的方法,比自己实现的方法效率要高非常多,本项目中就用到了:bincount、argsort,argmax、flipud、in1d、all等,即提高了运行效率,又解决了逻辑判断的问题:

"""numpy方法使用示例"""
import numpy as np
# 计算数字的个数组合bincount
np.bincount([9, 2, 13, 12, 9, 10, 11])
# 输出结果:array([0, 0, 1, 0, 0, 0, 0, 0, 0, 2, 1, 1, 1, 1], dtype=int64)
# 取得个数最多的数字argmax
np.argmax(np.bincount([9, 2, 13, 12, 9, 10, 11]))
# 输出结果: 9
# 将数字按照个数优先,其次大小进行排序argsort
np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11]))
# 输出结果:array([ 0,  1,  3,  4,  5,  6,  7,  8,  2, 10, 11, 12, 13,  9], dtype=int64)
# 翻转列表flipud
np.flipud(np.argsort(np.bincount([9, 2, 13, 12, 9, 10, 11])))
# 输出结果: array([ 9, 13, 12, 11, 10,  2,  8,  7,  6,  5,  4,  3,  1,  0], dtype=int64)
# 查找相同值in1d
np.in1d([2, 3, 4], [2, 9, 3])
# 输出结果: array([ True,  True, False]) 注:指2,3True,4False
np.all(np.in1d([2, 3], [2, 9, 3]))
# 输出结果: array([ True,  True])
# 是否全是all
np.all(np.in1d([2, 3, 4], [2, 9, 3]))  # 判断组合1是否包含在组合2中
# 输出结果: False
np.all(np.in1d([2, 3], [2, 9, 3]))
# 输出结果: True

优化前后的效率对比

总结

优化算法是在这个项目上时间花费最多的工作(没有之一)。4月12日接单,10天左右出了第1稿,虽能运行,但回头看存在两个问题:一是有bug需要修正,二是运行效率不高(4500万行数据,执行需要1小时21分钟,如果只是在这个版本上debug需要增加判断条件,效率只会更低);后20多天是在不断的优化算法的同时对bug进行修正,最后版本执行相同数据只需要不足30分钟,效率提高了一倍多。回顾来看,虽然调优花费的时间多,但是每一个尝试不论成功还是失败都是一次宝贵的经验积累。

到此这篇关于Python详解复杂CSV文件处理方法的文章就介绍到这了,更多相关Python CSV文件处理内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python CSV 文件解析和生成方法示例

    目录 思路 Python with CSV CSV 格式 使用 Python 的 csv 库 其他 思路 简单的生成和读取 CSV CSV 文件格式 使用 csv 库 Python with CSV 先有个很朴素的生成和解析的方法. 生成: data = [ [1, 2, 3], [4, 5, 6], [7, 8, 9], ] for cow in data: print(','.join(map(str, cow))) 生成结果: 1,2,34,5,67,8,9 解析 s = ""&

  • Python全面解析json数据并保存为csv文件

    目录 解析json数据并保存为csv文件 完整代码 将json任意行文件转为csv文件并保存 将json格式的前3000条数据存入csv 解析json数据并保存为csv文件 首先导入两个包: import json import pandas as pd 打开json 文件并读取: with open('2.json', encoding='utf-8') as f:     line = f.readline()     d = json.loads(line)     f.close() 读

  • Python Pandas处理CSV文件的常用技巧分享

    目录 读取Pandas文件 统计列值出现的次数 筛选特定列值 遍历数据行 绘制直方图(柱状图) Pandas处理CSV文件,分为以下几步: 读取Pandas文件 统计列值出现的次数 筛选特定列值 遍历数据行 绘制直方图(柱状图) 读取Pandas文件 df = pd.read_csv(file_path, encoding='GB2312') print(df.info()) 注意:Pandas的读取格式默认是UTF-8,在中文CSV中会报错: UnicodeDecodeError: 'utf-

  • Python数据读写之Python读写CSV文件

    目录 1. 读取CSV文件 csv.reader() 2. 写入CSV文件 1. 读取CSV文件 csv.reader() 该方法的作用相当于就是通过 ',' 分割csv格式的数据,并将分割好的每行数据存入列表中,并且还去除了每行最后分割产生的数据尾部的空格.换行符.制表符等等. import csv with open('data.csv',mode='r',encoding='utf-8-sig',newline='') as File: # 使用csv.reader()将文件中的每行数据读

  • Python中CSV文件(逗号分割)实战操作指南

    目录 一.csv文件介绍 1.csv文件简介 2.为什么要使用csv文件 二.csv文件查看 1.测试文件创建 2.查看csv文件(列表) 3.查看csv文件(字典) 4.写入文件(列表) 5.写入文件(字典) 总结 一.csv文件介绍 1.csv文件简介 逗号分隔值(Comma-Separated Values,CSV,有时也称为字符分隔值,因为分隔字符也可以不是逗号),其文件以纯文本形式存储表格数据(数字和文本).纯文本意味着该文件是一个字符序列,不含必须像二进制数字那样被解读的数据.CSV

  • python保存字典数据到csv文件的完整代码

    导入包 import csv 创建或打开文件,设置文件形式 f = open('xixi.csv', mode='a',encoding='utf-8',newline='') #xixi为文件名称 设置输入数据的格式,设置'A','B','C','D','E', 'F'为列名,根据自己的需要设置自己的列名 csv_writer= csv.DictWriter(f,fieldnames=['A','B','C','D','E', 'F']) 将列名输入 csv_writer.writeheade

  • 在python中读取和写入CSV文件详情

    目录 前言 1.导入CSV库 2.对CSV文件进行读写 2.1 用列表形式写入CSV文件 2.2 用列表形式读取CSV文件 2.3 用字典形式写入csv文件 2.4 用字典形式读取csv文件 结语 前言 CSV(Comma-Separated Values)即逗号分隔值,一种以逗号分隔按行存储的文本文件,所有的值都表现为字符串类型(注意:数字为字符串类型).如果CSV中有中文,应以utf-8编码读写. 1.导入CSV库 python中对csv文件有自带的库可以使用,当我们要对csv文件进行读写的

  • 十分钟教会你用Python处理CSV文件

    目录 前言 Python库:csv 读取csv文件 使用csv.reader读取数据 使用csv.DictReader读取数据 写入csv文件 使用csv.writer写入数据 使用csv.DictWriter写入数据 总结 前言 在前几年,如果你和嵌入式开发人员推荐Python,大概会是这样一种场景: A:”诶,老王,你看Python开发这么方便,以后会不会用到嵌入式设备?“ B:“别做梦了,那玩意儿速度贼慢,肯定满足不了性能要求…” 但近几年,随着半导体行业的迅猛发展,嵌入式处理器的性能有了

  • 利用python合并csv文件的方式实例

    目录 1.用concat方法合并csv 2.glob模块批量合并csv 补充:Python处理(加载.合并)多个csv文件 总结 1.用concat方法合并csv 将两个相同的csv文件进行数据合并,通过pandas的read_csv和to_csv来完成,即采用concat方法: #加载第三方库 import pandas as pd import numpy as np #读取文件 df1 = pd.read_csv("文件-1.csv") df2 = pd.read_csv(&qu

  • Python详解复杂CSV文件处理方法

    目录 项目简介 项目笔记与心得 1.分批处理与多进程及多线程加速 2.优化算法提高效率 总结 项目简介 鉴于项目保密的需要,不便透露太多项目的信息,因此,简单介绍一下项目存在的难点: 海量数据:项目是对CSV文件中的数据进行处理,而特点是数据量大...真的大!!!拿到的第一个CSV示例文件是110多万行(小CASE),而第二个文件就到了4500万行,等到第三个文件......好吧,一直没见到第三个完整示例文件,因为太大了,据说是第二个示例文件的40多倍,大概二十亿行...... 业务逻辑复杂:项

  • python批量解压zip文件的方法

    这是一个用python写解压大量zip脚本的说明,本人新手一个,希望能对各位有所启发. 首先要注意的,在运行自己的脚本之前一定先备份或者复制出一些样本进行测试,不然出错会很麻烦: 之后我用到的是解压zip文件的扩展包zipfile,可以直接pip安装或者在IDE里安装,需要特别注意的是这个包的文件名解码方式需要我们去修改,先去查看源文件,直接搜索"cp437"(一个编码方式),找到后全部替换为"gbk",即可解决中文显示问题. 代码: import os impor

  • python 使用pandas读取csv文件的方法

    目录 pandas读取csv文件的操作 1. 读取csv文件 在这里记录一下,python使用pandas读取文件的方法用到pandas库的read_csv函数 # -*- coding: utf-8 -*- """ Created on Mon Jan 24 16:48:32 2022 @author: zxy """ # 导入包 import numpy as np import pandas as pd import matplotlib.

  • Python Pandas批量读取csv文件到dataframe的方法

    PYTHON Pandas批量读取csv文件到DATAFRAME 首先使用glob.glob获得文件路径.然后定义一个列表,读取文件后再使用concat合并读取到的数据. #读取数据 import pandas as pd import numpy as np import glob,os path=r'e:\tj\month\fx1806' file=glob.glob(os.path.join(path, "zq*.xls")) print(file) dl= [] for f i

  • 对Python 多线程统计所有csv文件的行数方法详解

    如下所示: #统计某文件夹下的所有csv文件的行数(多线程) import threading import csv import os class MyThreadLine(threading.Thread): #用于统计csv文件的行数的线程类 def __init__(self,path): threading.Thread.__init__(self) #父类初始化 self.path=path #路径 self.line=-1 #统计行数 def run(self): reader =

  • 详解Python读取和写入操作CSV文件的方法

    目录 什么是 CSV 文件? 内置 CSV 库解析 CSV 文件 读取 CSV 文件csv 将 CSV 文件读入字典csv 可选的 Python CSV reader参数 使用 csv 写入文件 从字典中写入 CSV 文件csv 使用 pandas 库解析 CSV 文件 pandas 读取 CSV 文件 pandas 写入 CSV 文件 最流行的数据交换格式之一是 CSV 格式.是需要通过键盘和控制台以外的方式将信息输入和输出的程序,通过文本文件交换信息是在程序之间共享信息的常用方法. 这里带和

  • Python Pandas读写txt和csv文件的方法详解

    目录 一.文本文件 1. read_csv() 2. to_csv() 一.文本文件 文本文件,主要包括csv和txt两种等,相应接口为read_csv()和to_csv(),分别用于读写数据 1. read_csv() 格式代码: pandas.read_csv(filepath_or_buffer, sep=', ', delimiter=None, header='infer', names=None, index_col=None, usecols=None, squeeze=False

  • C#实现读写CSV文件的方法详解

    目录 CSV文件标准 文件示例 RFC 4180 简化标准 读写CSV文件 使用CsvHelper 使用自定义方法 总结 项目中经常遇到CSV文件的读写需求,其中的难点主要是CSV文件的解析.本文会介绍CsvHelper.TextFieldParser.正则表达式三种解析CSV文件的方法,顺带也会介绍一下CSV文件的写方法. CSV文件标准 在介绍CSV文件的读写方法前,我们需要了解一下CSV文件的格式. 文件示例 一个简单的CSV文件: Test1,Test2,Test3,Test4,Test

  • Java实现将类数据逐行写入CSV文件的方法详解

    目录 1. 需求和思路 2. 现有方法 3. 代码 4. 参考 1. 需求和思路 最近要用java制作一个数据集,每一行是一个样本,格式是csv.用了一下java类的相关概念,把csv文件里的每一行,即每一个样本视为一个类. 2. 现有方法 目前已有的csv包如opencsv,可以支持字符串,也可以支持javabean(即java类).相关教程如下 Java OpenCSV|极客教程 由于墙的原因,我maven老是下载不到opencsv的jar包,没办法我只能手写个平民版的 3. 代码 自定义的

  • Python pandas 列转行操作详解(类似hive中explode方法)

    最近在工作上用到Python的pandas库来处理excel文件,遇到列转行的问题.找了一番资料后成功了,记录一下. 1. 如果需要爆炸的只有一列: df=pd.DataFrame({'A':[1,2],'B':[[1,2],[1,2]]}) df Out[1]: A B 0 1 [1, 2] 1 2 [1, 2] 如果要爆炸B这一列,可以直接用explode方法(前提是你的pandas的版本要高于或等于0.25) df.explode('B') A B 0 1 1 1 1 2 2 2 1 3

随机推荐