Python利用多进程将大量数据放入有限内存的教程

简介

这是一篇有关如何将大量的数据放入有限的内存中的简略教程。

与客户工作时,有时会发现他们的数据库实际上只是一个csv或Excel文件仓库,你只能将就着用,经常需要在不更新他们的数据仓库的情况下完成工作。大部分情况下,如果将这些文件存储在一个简单的数据库框架中或许更好,但时间可能不允许。这种方法对时间、机器硬件和所处环境都有要求。

下面介绍一个很好的例子:假设有一堆表格(没有使用Neo4j、MongoDB或其他类型的数据库,仅仅使用csvs、tsvs等格式存储的表格),如果将所有表格组合在一起,得到的数据帧太大,无法放入内存。所以第一个想法是:将其拆分成不同的部分,逐个存储。这个方案看起来不错,但处理起来很慢。除非我们使用多核处理器。
目标

这里的目标是从所有职位中(大约1万个),找出相关的的职位。将这些职位与政府给的职位代码组合起来。接着将组合的结果与对应的州(行政单位)信息组合起来。然后用通过word2vec生成的属性信息在我们的客户的管道中增强已有的属性。

这个任务要求在短时间内完成,谁也不愿意等待。想象一下,这就像在不使用标准的关系型数据库的情况下进行多个表的连接。
数据

示例脚本

下面的是一个示例脚本,展示了如何使用multiprocessing来在有限的内存空间中加速操作过程。脚本的第一部分是和特定任务相关的,可以自由跳过。请着重关注第二部分,这里侧重的是multiprocessing引擎。

#import the necessary packages
import pandas as pd
import us
import numpy as np
from multiprocessing import Pool,cpu_count,Queue,Manager

# the data in one particular column was number in the form that horrible excel version
# of a number where '12000' is '12,000' with that beautiful useless comma in there.
# did I mention I excel bothers me?
# instead of converting the number right away, we only convert them when we need to
def median_maker(column):
  return np.median([int(x.replace(',','')) for x in column])

# dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist'
# related_title_score_df is the dataframe of information for the title; columns = ['title','score']
### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871
# code_title_df contains columns ['code','title']
# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)

def job_title_location_matcher(title,location):
  try:
    related_title_score_df = dictionary_of_dataframes[title]
    # we limit dataframe1 to only those related_titles that are above
    # a previously established threshold
    related_title_score_df = related_title_score_df[title_score_df['score']>80]

    #we merge the related titles with another table and its codes
    codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)
    codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()

    # merge the two dataframes by the codes
    merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
    #limit the BLS data to the state we want
    all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]

    #calculate some summary statistics for the time we want
    group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)
    row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]
    #convert it all to strings so we can combine them all when writing to file
    row_string = [str(x) for x in row]
    return row_string
  except:
    # if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant
    'do nothing'

这里发生了神奇的事情:

#runs the function and puts the answers in the queue
def worker(row, q):
    ans = job_title_location_matcher(row[0],row[1])
    q.put(ans)

# this writes to the file while there are still things that could be in the queue
# this allows for multiple processes to write to the same file without blocking eachother
def listener(q):
  f = open(filename,'wb')
  while 1:
    m = q.get()
    if m =='kill':
        break
    f.write(','.join(m) + 'n')
    f.flush()
  f.close()

def main():
  #load all your data, then throw out all unnecessary tables/columns
  filename = 'skill_TEST_POOL.txt'

  #sets up the necessary multiprocessing tasks
  manager = Manager()
  q = manager.Queue()
  pool = Pool(cpu_count() + 2)
  watcher = pool.map_async(listener,(q,))

  jobs = []
  #titles_states is a dataframe of millions of job titles and states they were found in
  for i in titles_states.iloc:
    job = pool.map_async(worker, (i, q))
    jobs.append(job)

  for job in jobs:
    job.get()
  q.put('kill')
  pool.close()
  pool.join()

if __name__ == "__main__":
  main()

由于每个数据帧的大小都不同(总共约有100Gb),所以将所有数据都放入内存是不可能的。通过将最终的数据帧逐行写入内存,但从来不在内存中存储完整的数据帧。我们可以完成所有的计算和组合任务。这里的“标准方法”是,我们可以仅仅在“job_title_location_matcher”的末尾编写一个“write_line”方法,但这样每次只会处理一个实例。根据我们需要处理的职位/州的数量,这大概需要2天的时间。而通过multiprocessing,只需2个小时。

虽然读者可能接触不到本教程处理的任务环境,但通过multiprocessing,可以突破许多计算机硬件的限制。本例的工作环境是c3.8xl ubuntu ec2,硬件为32核60Gb内存(虽然这个内存很大,但还是无法一次性放入所有数据)。这里的关键之处是我们在60Gb的内存的机器上有效的处理了约100Gb的数据,同时速度提升了约25倍。通过multiprocessing在多核机器上自动处理大规模的进程,可以有效提高机器的利用率。也许有些读者已经知道了这个方法,但对于其他人,可以通过multiprocessing能带来非常大的收益。顺便说一句,这部分是skill assets in the job-market这篇博文的延续。

(0)

相关推荐

  • 用Python中的__slots__缓存资源以节省内存开销的方法

    我们曾经提到,Oyster.com的Python web服务器怎样利用一个巨大的Python dicts(hash table),缓存大量的静态资源.我们最近在Image类中,用仅仅一行__slots__代码,让每个6G内存占用的服务进程(共4个),省出超过2G来. 这是其中一个服务器在部署代码前后的截图: 我们alloc了大约一百万个类似如下class的实例:   class Image(object):     def __init__(self, id, caption, url):   

  • Python StringIO模块实现在内存缓冲区中读写数据

    模块是用类编写的,只有一个StringIO类,所以它的可用方法都在类中. 此类中的大部分函数都与对文件的操作方法类似. 例: 复制代码 代码如下: #coding=gbk   import StringIO, cStringIO, sys   s = StringIO.StringIO("JGood is a handsome boy") s.write("JGood is a handsome boy \r\n") s.write('okkkk中国') s.see

  • 10种检测Python程序运行时间、CPU和内存占用的方法

    在运行复杂的Python程序时,执行时间会很长,这时也许想提高程序的执行效率.但该怎么做呢? 首先,要有个工具能够检测代码中的瓶颈,例如,找到哪一部分执行时间比较长.接着,就针对这一部分进行优化. 同时,还需要控制内存和CPU的使用,这样可以在另一方面优化代码. 因此,在这篇文章中我将介绍7个不同的Python工具,来检查代码中函数的执行时间以及内存和CPU的使用. 1. 使用装饰器来衡量函数执行时间 有一个简单方法,那就是定义一个装饰器来测量函数的执行时间,并输出结果: import time

  • python内存管理分析

    本文较为详细的分析了python内存管理机制.分享给大家供大家参考.具体分析如下: 内存管理,对于Python这样的动态语言,是至关重要的一部分,它在很大程度上甚至决定了Python的执行效率,因为在Python的运行中,会创建和销毁大量的对象,这些都涉及到内存的管理. 小块空间的内存池 在Python中,许多时候申请的内存都是小块的内存,这些小块内存在申请后,很快又会被释放,由于这些内存的申请并不是为了创建对象,所以并没有对象一级的内存池机制. Python内存池全景 这就意味着Python在

  • Python深入学习之内存管理

    语言的内存管理是语言设计的一个重要方面.它是决定语言性能的重要因素.无论是C语言的手工管理,还是Java的垃圾回收,都成为语言最重要的特征.这里以Python语言为例子,说明一门动态类型的.面向对象的语言的内存管理方式.  对象的内存使用 赋值语句是语言最常见的功能了.但即使是最简单的赋值语句,也可以很有内涵.Python的赋值语句就很值得研究. a = 1 整数1为一个对象.而a是一个引用.利用赋值语句,引用a指向对象1.Python是动态类型的语言(参考动态类型),对象与引用分离.Pytho

  • Python使用稀疏矩阵节省内存实例

    推荐系统中经常需要处理类似user_id, item_id, rating这样的数据,其实就是数学里面的稀疏矩阵,scipy中提供了sparse模块来解决这个问题,但scipy.sparse有很多问题不太合用: 1.不能很好的同时支持data[i, ...].data[..., j].data[i, j]快速切片: 2.由于数据保存在内存中,不能很好的支持海量数据处理. 要支持data[i, ...].data[..., j]的快速切片,需要i或者j的数据集中存储:同时,为了保存海量的数据,也需

  • 有关wxpython pyqt内存占用问题分析

    一直觉得wxpython占用内存比较多,在工作中写的一些小程序应用,一对比其它的小程序,发现内存相差确实有点大. 测试了下QT框架 复制代码 代码如下: import sys,timefrom PyQt4 import QtCore, QtGui#import wxif __name__ == "__main__":while True:time.sleep(1) 只载入了框架,内存占用就有明显差别.载入wx的时候一般在20M左右,我写的几个应用也差不多是这么多,所以占用内存多的主要是

  • Python利用多进程将大量数据放入有限内存的教程

    简介 这是一篇有关如何将大量的数据放入有限的内存中的简略教程. 与客户工作时,有时会发现他们的数据库实际上只是一个csv或Excel文件仓库,你只能将就着用,经常需要在不更新他们的数据仓库的情况下完成工作.大部分情况下,如果将这些文件存储在一个简单的数据库框架中或许更好,但时间可能不允许.这种方法对时间.机器硬件和所处环境都有要求. 下面介绍一个很好的例子:假设有一堆表格(没有使用Neo4j.MongoDB或其他类型的数据库,仅仅使用csvs.tsvs等格式存储的表格),如果将所有表格组合在一起

  • python利用requests库模拟post请求时json的使用教程

    我们都见识过requests库在静态网页的爬取上展现的威力,我们日常见得最多的为get和post请求,他们最大的区别在于安全性上: 1.GET是通过URL方式请求,可以直接看到,明文传输. 2.POST是通过请求header请求,可以开发者工具或者抓包可以看到,同样也是明文的. 3.GET请求会保存在浏览器历史纪录中,还可能会保存在Web的日志中. 两者用法上也有显著差异(援引自知乎): 1.GET用于从服务器端获取数据,包括静态资源(HTML|JS|CSS|Image等等).动态数据展示(列表

  • Python实现 多进程导入CSV数据到 MySQL

    前段时间帮同事处理了一个把 CSV 数据导入到 MySQL 的需求.两个很大的 CSV 文件, 分别有 3GB.2100 万条记录和 7GB.3500 万条记录.对于这个量级的数据,用简单的单进程/单线程导入 会耗时很久,最终用了多进程的方式来实现.具体过程不赘述,记录一下几个要点: 批量插入而不是逐条插入 为了加快插入速度,先不要建索引 生产者和消费者模型,主进程读文件,多个 worker 进程执行插入 注意控制 worker 的数量,避免对 MySQL 造成太大的压力 注意处理脏数据导致的异

  • Python利用pandas处理Excel数据的应用详解

    最近迷上了高效处理数据的pandas,其实这个是用来做数据分析的,如果你是做大数据分析和测试的,那么这个是非常的有用的!!但是其实我们平时在做自动化测试的时候,如果涉及到数据的读取和存储,那么而利用pandas就会非常高效,基本上3行代码可以搞定你20行代码的操作!该教程仅仅限于结合柠檬班的全栈自动化测试课程来讲解下pandas在项目中的应用,这仅仅只是冰山一角,希望大家可以踊跃的去尝试和探索! 一.安装环境: 1:pandas依赖处理Excel的xlrd模块,所以我们需要提前安装这个,安装命令

  • Ajax获取到数据放入echarts里不显示的原因分析及解决办法

    在做一个需要用到echarts地图的项目的时候,成功通过ajax获取到了后台提供的数据,并生成了想要的JSON串.但是,放到echarts option.series[0].data里,获取不到数据.在生成的地图上无法看到你从后台获取到的值.翻遍百度和必应,给出的答案五花八门,仍旧未解决问题,最后还是一个同事大牛给解决的,在此分享给大家.希望对大家有帮助,,,, 废话不多说,直接上码: $(function () { var data = []; function setOption(data)

  • python 把文件中的每一行以数组的元素放入数组中的方法

    有时候需要把文件中的数据放入到数组中,这里提供了一种方法,可以根据文件结尾的标记进行数据拆分,然后再把拆分的文件放入数组中 # -*-coding: utf-8 -*- f = open("username.txt","w") f.write("Lycoridiata\n") f.write("wulei\n") f.write("leilei\n") f.write("Xingyu\n"

  • python pandas中DataFrame类型数据操作函数的方法

    python数据分析工具pandas中DataFrame和Series作为主要的数据结构. 本文主要是介绍如何对DataFrame数据进行操作并结合一个实例测试操作函数. 1)查看DataFrame数据及属性 df_obj = DataFrame() #创建DataFrame对象 df_obj.dtypes #查看各行的数据格式 df_obj['列名'].astype(int)#转换某列的数据类型 df_obj.head() #查看前几行的数据,默认前5行 df_obj.tail() #查看后几

  • 利用springmvc处理模型数据

    springmvc处理模型数据 很多情况下页面上需要很多数据,单单返回页面是不行的,那么springmvc如何将数据返回到该页面呢 springmvc提供了四种方式来输出模型数据 ModelAndView: 处理返回值为ModelAndView时,可以将该对象中添加数据模型 Map及Model:入参为Model.ModelMap或Map时,处理方法返回时,Map中的数据会自动添加到模型中 @SessionAttributes: 将模型中的某个属性暂存到HttpSession中,以便多个请求之间共

  • Python向MySQL批量插数据的实例讲解

    背景:最近测试web项目需要多条测试数据,sql中嫌要写多条,就看了看python如何向MySQL批量插数据(pymysql库) 1.向MySQL批量插数据 import pymysql #import datetime #day = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')#参数值插入时间 db = pymysql.connect(host='服务器IP', user='账号', passwd='密码', port=端口号) c

  • python读取csv文件并把文件放入一个list中的实例讲解

    如下所示: #coding=utf8 ''' 读取CSV文件,把csv文件放在一份list中. ''' import csv class readCSV(object): def __init__(self,path="Demo.csv"): #创建一个属性用来保存要操作CSV的文件 self.path=path try: #打开一个csv文件,并赋予读的权限 self.csvHand=open(self.path,"r") #调用csv的reader函数读取csv

随机推荐