利用pandas进行大文件计数处理的方法

Pandas读取大文件

要处理的是由探测器读出的脉冲信号,一组数据为两列,一列为时间,一列为脉冲能量,数据量在千万级,为了有一个直接的认识,先使用Pandas读取一些

import pandas as pd
data = pd.read_table('filename.txt', iterator=True)
chunk = data.get_chunk(5) 

而输出是这样的:

Out[4]:
332.977889999979 -0.0164794921875
0 332.97790 -0.022278
1 332.97791 -0.026855
2 332.97792 -0.030518
3 332.97793 -0.045776
4 332.97794 -0.032654

DataFram基本用法

这里,data只是个容器,pandas.io.parsers.TextFileReader。

使用astype可以实现dataframe字段类型转换

输出数据中,每组数据会多处一行,因为get_chunk返回的是pandas.core.frame.DataFrame格式, 而data在读取过程中并没有指定DataFrame的columns,因此在get_chunk过程中,默认将第一组数据作为columns。因此需要在读取过程中指定names即DataFrame的columns。

import pandas as pd
data = pd.read_table('filename.txt', iterator=True, names=['time', 'energe'])
chunk = data.get_chunk(5)
data['energe'] = df['energe'].astype('int')

输出为

Out[6]:

index time energe
0 332.97789 -0.016479
1 332.97790 -0.022278
2 332.97791 -0.026855
3 332.97792 -0.030518
4 332.97793 -0.045776

DataFram存储和索引

这里讲一下DataFrame这个格式,与一般二维数据不同(二维列表等),DataFrame既有行索引又有列索引,因此在建立一个DataFrame数据是

DataFrame(data, columns=[‘year', ‘month', ‘day'],
index=[‘one', ‘two', ‘three'])
year month day
0 2010 4 1
1 2011 5 2
2 2012 6 3
3 2013 7 5
4 2014 8 9

而pd.read_table中的names就是指定DataFrame的columns,而index自动设置。 而DataFrame的索引格式有很多

类型 说明 例子
obj[val] 选取单列或者一组列
obj.ix[val] 选取单个行或者一组行
obj.ix[:,val] 选取单个列或列子集
obj.ix[val1, val2] 同时选取行和列
reindex方法 将一个或多个轴匹配到新索引
xs方法 根据标签选取单行或单列,返回一个Series
icol,lrow方法 根据整数位置选取单列或单行,返回一个Series
get_value,set_value 根据行标签列标签选取单个值

exp: In[1]:data[:2]

Out[2]:

year month day
0 2010 4 1
1 2011 5 2

In[2]:data[data[‘month']>5]

Out[2]:

year month day
2 2012 6 3
4 2014 8 9

如果我们直接把data拿来比较的话,相当于data中所有的标量元素

In[3]:data[data<6]=0

Out[3]:

year month day
0 2010 0 0
1 2011 0 0
2 2012 6 0
3 2013 7 0
4 2014 8 9

Pandas运算

series = data.ix[0]
data - series

Out:

year month day
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 4
4 4 4 8

DataFrame与Series之间运算会将Series索引匹配到DataFrame的列,然后沿行一直向下广播

如果令series1 = data[‘year']

data.sub(series1,axis=0) 

则每一列都减去该series1,axis为希望匹配的轴,=0行索引,即匹配列,=1列索引,则按行匹配。

DataFrame的一些函数方法

这个就有很多了,比如排序和排名;求和、平均数以及方差、协方差等数学方法;还有就是唯一值(类似于集合)、值计数和成员资格等方法。

当然还有一些更高级的属性,用的时候再看吧

数据处理

在得到数据样式后我们先一次性读取数据

start = time.time()
data = pd.read_table('Eu155_Na22_K40_MR_0CM_3Min.csv', names=['time', 'energe'])
end = time.time()
data.index
print("The time is %f s" % (end - start))
plus = data['energe']
plus[plus < 0] = 0
The time is 29.403917 s
RangeIndex(start=0, stop=68319232, step=1)

对于一个2G大小,千万级的数据,这个读取速度还是挺快的。之前使用matlab load用时160多s,但是不知道这个是否把数据完全读取了。然后只抽取脉冲信号,将负值归0,因为会出现一定的电子噪声从而产生一定负值。

然后就需要定位脉冲信号中的能峰了,也就是findpeaks

这里用到了scipy.signal中的find_peaks_cwt,具体用法可以参见官方文档

peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)),它返回找到的peaks的位置,输入第一个为数据,第二个为窗函数,也就是在这个宽度的能窗内寻找峰,我是这样理解的。刚开始以为是数据的另一维坐标,结果找了半天没结果。不过事实上这个找的确定也挺慢的。

50w条的数据,找了足足7分钟,我这一个数据3000w条不得找半个多小时,而各种数据有好几十,恩。。这样是不行的,于是想到了并行的方法。这个下篇文章会讲到,也就是把数据按照chunksize读取,然后同时交给(map)几个进程同时寻峰,寻完后返回(reduce)一起计数,计数的同时,子进程再此寻峰。

在处理的时候碰到我自己的破 笔记本由于内存原因不能load这个数据,并且想着每次copy这么大数据好麻烦,就把一个整体数据文件分割成了几个部分,先对方法进行一定的实验,时间快,比较方便。

import pandas as pd

def split_file(filename, size):
 name = filename.split('.')[0]
 data = pd.read_table(filename, chunksize=size, names=['time', 'intension'])
 i = 1
 for piece in data:
 outname = name + str(i) + '.csv'
 piece.to_csv(outname, index=False, names = ['time', 'intension'])
 i += 1

def split_csvfile(filename, size):
 name = filename.split('.')[0]
 data = pd.read_csv(filename, chunksize=size, names=['time', 'intension'])
 i = 1
 for piece in data:
 outname = name + str(i) + '.csv'
 piece = piece['intension']
 piece.to_csv(outname, index=False)
 i += 1

额..使用并行寻峰通过map/reduce的思想来解决提升效率这个想法,很早就实现了,但是,由于效果不是特别理想,所以放那也就忘了,今天整理代码来看了下当时记的些笔记,然后竟然发现有个评论…..我唯一收到的评论竟然是“催稿”=。=。想一想还是把下面的工作记录下来,免得自己后来完全忘记了。

rom scipy import signal
import os
import time
import pandas as pd
import numpy as np
from multiprocessing import Pool
import matplotlib.pylab as plt
from functools import partial

def findpeak(pluse):
 pluse[pluse < 0.05] = 0
 print('Sub process %s.' % os.getpid())
 start = time.time()
 peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10)) # 返回一个列表
 end = time.time()
 print("The time is %f s" % (end - start))
 pks = [pluse[x] for x in peaks]
 return pks

def histcnt(pks, edge=None, channel=None):
 cnt = plt.hist(pks, edge)
 res = pd.DataFrame(cnt[0], index=channel, columns=['cnt'])
 return res

if __name__ == '__main__':
 with Pool(processes=8) as p:
 start = time.time()
 print('Parent process %s.' % os.getpid())
 pluse = pd.read_csv('data/samples.csv', chunksize=50000, names=['time', 'energe'])
 channel = pd.read_csv('data/channels.txt', names=['value'])
 edges = channel * 2
 edges = pd.DataFrame({'value': [0]}).append(edges, ignore_index=True)
 specal = []
 for data in pluse:
 total = p.apply_async(findpeak, (data['energe'],),
   callback=partial(histcnt, edge=edges['value'], channel=channel['value']))
 specal.append(total)
 print('Waiting for all subprocesses done...')
 p.close()
 p.join()
 print('All subprocesses done.')
 spec = sum(specal)
 plt.figure()
 plt.plot(spec['cnt'])
 spec.to_csv('data/spec1.csv', header=False)
 print('every is OK')
 end = time.time()
 print("The time is %f s" % (end - start))

由于对对进程线程的编程不是很了解,其中走了很多弯路,尝试了很多方法也,这个是最终效果相对较好的。

首先,通过 pd.readtable以chunksize=50000分块读取,edges为hist过程中的下统计box。

然后,apply_async为非阻塞调用findpeak,然后将结果返回给回调函数histcnt,但是由于回调函数除了进程返回结果还有额外的参数,因此使用partial,对特定的参数赋予固定的值(edge和channel)并返回了一个全新的可调用对象,这个新的可调用对象仍然需要通过制定那些未被赋值的参数(findpeak返回的值)来调用。这个新的课调用对象将传递给partial()的固定参数结合起来,同一将所有参数传递给原始函数(histcnt)。(至于为啥不在histcnt中确定那两个参数,主要是为了避免一直打开文件。。当然,有更好的办法只是懒得思考=。=),还有个原因就是,apply_async返回的是一个对象,需要通过该对象的get方法才能获取值。。

对于 apply_async官方上是这样解释的

Apply_async((func[, args[, kwds[, callback[, error_callback]]]])),apply()方法的一个变体,返回一个结果对象

如果指定回调,那么它应该是一个可调用的接受一个参数。结果准备好回调时,除非调用失败,在这种情况下,应用error_callback代替。

如果error_callback被指定,那么它应该是一个可调用的接受一个参数。如果目标函数失败,那么error_callback叫做除了实例。

回调应立即完成以来,否则线程处理结果将被封锁。

不使用回调函数的版本如下,即先将所有子进程得到的数据都存入peaks列表中,然后所有进程完毕后在进行统计计数。

import pandas as pd
import time
import scipy.signal as signal
import numpy as np
from multiprocessing import Pool
import os
import matplotlib.pyplot as plt

def findpeak(pluse):
 pluse[pluse < 0] = 0
 pluse[pluse > 100] = 0
 print('Sub process %s.' % os.getpid())
 start = time.time()
 peaks = signal.find_peaks_cwt(pluse, np.arange(1, 10))
 end = time.time()
 print("The time is %f s" % (end - start))
 res = [pluse[x] for x in peaks]
 return res

if __name__ == '__main__':
 with Pool(processes=8) as p:
 start = time.time()
 print('Parent process %s.' % os.getpid())
 pluse = pd.read_csv('data/sample.csv', chunksize=200000, names=['time', 'energe'])
 pks = []
 for data in pluse:
 pks.append(p.apply_async(findpeak, (data['energe'],)))
 print('Waiting for all subprocesses done...')
 p.close()
 p.join()
 print('All subprocesses done.')
 peaks = []
 for i, ele in enumerate(pks):
 peaks.extend(ele.get())
 peaks = pd.DataFrame(peaks, columns=['energe'])
 peaks.to_csv('peaks.csv', index=False, header=False, chunksize=50000)
 channel = pd.read_csv('data/channels.txt', names=['value'])
 channel *= 2
 channel = pd.DataFrame({'value': [0]}).append(channel, ignore_index=True)
 plt.figure()
 spec = plt.hist(peaks['energe'], channel['value'])
 # out.plot.hist(bins=1024)
 # print(out)
 # cnt = peaks.value_counts(bins=1024)
 # cnt.to_csv('data/cnt.csv', index=False, header=False)
 print('every is OK')
 end = time.time()
 print("The time is %f s" % (end - start))

以上这篇利用pandas进行大文件计数处理的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 解决Python pandas df 写入excel 出现的问题

    学习Python数据分析挖掘实战一书时,在数据预处理阶段,有一节要使用拉格朗日插值法对缺失值补充,代码如下: #-*- coding:utf-8 -*- import pandas as pd import matplotlib.pyplot as plt from scipy.interpolate import lagrange#导入拉格朗日插值函数 inputfile="catering_sale.xls" outputfile="H:\python\file\pyth

  • pandas每次多Sheet写入文件的方法

    pandas每次多Sheet写入文件,只能一次性存入,不然每次会重写文件,最后只保留最后一次的写入. # !usr/bin env python # -*- coding: utf-8 -*- import pandas as pd price_path = 'ASHAREEODPRICE.csv' df_price = pd.read_csv(price_path) for i in xrange(4): sh = 'Sheet{}'.format(i+1) file_path = 'qimo

  • 用pandas按列合并两个文件的实例

    直接上图,图文并茂,相信你很快就知道要干什么. A文件: B文件: 可以发现,A文件中"汉字井号"这一列和B文件中"WELL"这一列的属性相同,以这一列为主键,把B文件中"TIME"这一列数据添加到A文件中,如果B文件缺少某些行,则空着,最后A文件的行数不变,效果如下: 代码如下: # -*- coding: utf-8 -*- """ Created on Wed Nov 29 16:02:05 2017 @aut

  • Python之pandas读写文件乱码的解决方法

    python读写文件有时候会出现   'XXX'编码不能打开XXX什么的,用记事本打开要读取的文件,另存为UTF-8编码,然后再用py去读应该可以了.如果还不行,那么尝试使用文件原有的编码方式读取,参考之前的文章 在pandas中读写csv时候通过制定encoding可以有效防止excel打开或者写入中文乱码 data.to_csv(f_out,index=False,encoding='gb2312') 以上这篇Python之pandas读写文件乱码的解决方法就是小编分享给大家的全部内容了,希

  • 利用pandas进行大文件计数处理的方法

    Pandas读取大文件 要处理的是由探测器读出的脉冲信号,一组数据为两列,一列为时间,一列为脉冲能量,数据量在千万级,为了有一个直接的认识,先使用Pandas读取一些 import pandas as pd data = pd.read_table('filename.txt', iterator=True) chunk = data.get_chunk(5) 而输出是这样的: Out[4]: 332.977889999979 -0.0164794921875 0 332.97790 -0.02

  • 利用numpy和pandas处理csv文件中的时间方法

    环境:numpy,pandas,python3 在机器学习和深度学习的过程中,对于处理预测,回归问题,有时候变量是时间,需要进行合适的转换处理后才能进行学习分析,关于时间的变量如下所示,利用pandas和numpy对csv文件中时间进行处理. date (UTC) Price 01/01/2015 0:00 48.1 01/01/2015 1:00 47.33 01/01/2015 2:00 42.27 #coding:utf-8 import datetime import pandas as

  • python 利用pandas将arff文件转csv文件的方法

    直接贴代码啦: #coding=utf-8 import pandas as pd def arff_to_csv(fpath): #读取arff数据 if fpath.find('.arff') <0: print('the file is nott .arff file') return f = open(fpath) lines = f.readlines() content = [] for l in lines: content.append(l) datas = [] for c i

  • python利用pandas将excel文件转换为txt文件的方法

    python将数据换为txt的方法有很多,可以用xlrd库实现.本人比较懒,不想按太多用的少的插件,利用已有库pandas将excel文件转换为txt文件. 直接上代码: ''' function:将excel文件转换为text author:Nstock date:2018/3/1 ''' import pandas as pd import re import codecs #将excel转化为txt文件 def exceltotxt(excel_dir, txt_dir): with co

  • 通过Pandas读取大文件的实例

    当数据文件过大时,由于计算机内存有限,需要对大文件进行分块读取: import pandas as pd f = open('E:/学习相关/Python/数据样例/用户侧数据/test数据.csv') reader = pd.read_csv(f, sep=',', iterator=True) loop = True chunkSize = 100000 chunks = [] while loop: try: chunk = reader.get_chunk(chunkSize) chun

  • 使用pandas读取csv文件的指定列方法

    根据教程实现了读取csv文件前面的几行数据,一下就想到了是不是可以实现前面几列的数据.经过多番尝试总算试出来了一种方法. 之所以想实现读取前面的几列是因为我手头的一个csv文件恰好有后面几列没有可用数据,但是却一直存在着.原来的数据如下: GreydeMac-mini:chapter06 greyzhang$ cat data.csv 1,name_01,coment_01,,,, 2,name_02,coment_02,,,, 3,name_03,coment_03,,,, 4,name_04

  • Python实现模拟分割大文件及多线程处理的方法

    本文实例讲述了Python实现模拟分割大文件及多线程处理的方法.分享给大家供大家参考,具体如下: #!/usr/bin/env python #--*-- coding:utf-8 --*-- from random import randint from time import ctime from time import sleep import queue import threading class MyTask(object): """具体的任务类"&qu

  • php readfile下载大文件失败的解决方法

    本文实例讲述了php readfile下载大文件失败的解决方法.分享给大家供大家参考,具体如下: 大文件有200多M,只下载了200K就提示下载完成,且不报错. 原因是PHP内存有限制,需要改为按块下载,就是把大文件切块后逐块下载. if (file_exists($file)) { if (FALSE!== ($handler = fopen($file, 'r'))) { header('Content-Description: File Transfer'); header('Conten

  • Python使用Pandas对csv文件进行数据处理的方法

    今天接到一个新的任务,要对一个140多M的csv文件进行数据处理,总共有170多万行,尝试了导入本地的MySQL数据库进行查询,结果用Navicat导入直接卡死....估计是XAMPP套装里面全默认配置的MySQL性能不给力,又尝试用R搞一下吧结果发现光加载csv文件就要3分钟左右的时间,相当不给力啊,翻了翻万能的知乎发现了Python下的一个神器包:Pandas(熊猫们?),加载这个140多M的csv文件两秒钟就搞定,后面的分类汇总等操作也都是秒开,太牛逼了!记录一下这次数据处理的过程: 使用

  • 在python中利用GDAL对tif文件进行读写的方法

    利用GDAL库对tif影像进行读取 示例代码默认波段为[B.G.R.NIR的顺序,且为四个波段] import gdal def readTif(fileName): dataset = gdal.Open(fileName) if dataset == None: print(fileName+"文件无法打开") return im_width = dataset.RasterXSize #栅格矩阵的列数 im_height = dataset.RasterYSize #栅格矩阵的行

随机推荐