Python读取文件的四种方式的实例详解

目录
  • 学生数量特别少的情况
  • 停车场空间不够时怎么办?
  • 怎么加快执行效率?
  • 怎么加快处理速度?
  • 结语

故事背景:最近在处理Wikipedia的数据时发现由于数据量过大,之前的文件读取和数据处理方法几乎不可用,或耗时非常久。今天学校安排统一核酸检查,刚好和文件读取的过程非常相似。正好借此机会和大家一起从头梳理一下几种文件读取方法。

故事设定:现在学校要求对所有同学进行核酸采集,每位同学先在宿舍内等候防护人员(以下简称“大白”)叫号,叫到自己时去停车场排队等候大白对自己进行采集,采集完之后的样本由大白统一有序收集并储存。

名词解释:

  • 学生:所有的学生是一个大文件,每个学生是其中的一行数据
  • 宿舍:硬盘
  • 停车场:内存
  • 核酸采集:数据处理
  • 样本:处理后的数据
  • 大白:程序

学生数量特别少的情况

当学生数量特别少时,可以考虑将所有学生统一叫到停车场等候,再依次进行核酸采集。

方法一:简单情况

此时的程序可以模拟为:

import time
from typing import List

def pick_all_students(dorm: str) -> List[str]:
    with open(dorm, "rt", encoding="utf8") as fin:
        students = fin.readlines()
        return students

def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample

def process(dorm: str, sample_storeroom: str) -> None:
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        students = pick_all_students(dorm)
        for student in students:
            sample = pick_sample(student)
            fout.write(f"{sample}\n")
            fout.flush()

if __name__ == "__main__":
    process(
        "student_names.txt",
        "sample_storeroom.txt"
    )

注意,在第19行中,大白一次性把所有同学都叫到了停车场中。这种做法在学生比较少时做起来很快,但是如果学生特别多,停车场装不下怎么办?

停车场空间不够时怎么办?

方法二:边读边处理

一般来说,由于停车场空间有限,我们不会采用一次性把所有学生都叫到停车场中,而是会一个一个地处理,这样可以节约内存空间。

import time
from typing import Iterator

def pick_one_student(dorm: str) -> Iterator[str]:
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            yield student

def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample

def process(dorm: str, sample_storeroom: str) -> None:
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        for student in pick_one_student(dorm):
            sample = pick_sample(student)
            fout.write(f"{sample}\n")
            fout.flush()

if __name__ == "__main__":
    process(
        "student_names.txt",
        "sample_storeroom.txt"
    )

这里pick_one_student函数中的返回值是用yield返回的,一次只会返回一名同学。

不过,这种做法虽然确保了停车场不会满员,但是这种做法在人数特别多的时候就不再适合了。虽然可以保证完成任务,但由于每次只能采集一个同学,程序的执行并不高。特别是当你的CPU有多个核时,会浪费机器性能,出现一核有难,其它围观的现象。

怎么加快执行效率?

大家可能也已经注意到了,刚刚我们的场景中,不论采用哪种方法,都只有一名大白在工作。那我们能不能加派人手,从而提高效率呢?

答案当然是可行的。我们现在先考虑增加两名大白,使得一名大白专注于叫号,安排学生进入停车场,另外一名大白专注于采集核酸,最后一名大白用于存储核酸样本。

方法三

import time
from multiprocessing import Queue, Process
from typing import Iterator

def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]:
    print("pick_student: started")

    picked_num = 0
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            stu_queue.put(student)
            picked_num += 1
            if picked_num % 500 == 0:
                print(f"pick_student: {picked_num}")

    # end signal
    stu_queue.put(None)
    print("pick_student: finished")

def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample

def process(stu_queue: Queue, store_queue: Queue) -> None:
    print("process: started")

    process_num = 0
    while True:
        student = stu_queue.get()
        if student is not None:
            sample = pick_sample(student)
            store_queue.put(sample)
            process_num += 1
            if process_num % 500 == 0:
                print(f"process: {process_num}")
        else:
            break

    # end signal
    store_queue.put(None)
    print("process: finished")

def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
    print("store_sample: started")

    store_num = 0
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        while True:
            sample = store_queue.get()
            if sample is not None:
                fout.write(f"{sample}\n")
                fout.flush()

                store_num += 1
                if store_num % 500 == 0:
                    print(f"store_sample: {store_num}")
            else:
                break

    print("store_sample: finished")

if __name__ == "__main__":
    dorm = "student_names.txt"
    sample_storeroom = "sample_storeroom.txt"

    stu_queue = Queue()
    store_queue = Queue()

    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
    store_p.start()
    process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
    process_p.start()
    read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True)
    read_p.start()

    store_p.join()

这份代码中,我们引入了多进程的思路,将每个大白看作一个进程,并使用了队列Queue作为进程间通信的媒介。stu_queue表示学生叫号进停车场的队列,store_queue表示已经采集过的待存储核酸样本的队列。

此外,为了控制进程的停止,我们在pick_student和 process函数的最后都向各自队列中添加了None作为结束标志符。

假设有1w名学生(student_names.txt文件有1w行),经过测试后发现上述方法的时间如下:

  • 方法一:1m40.716s
  • 方法二:1m40.717s
  • 方法三:1m41.097s

咦?不是做了分工吗?怎么速度还变慢了?经笔者观察,这是因为叫号的大白速度太快了(文件读取速度快)通常是TA已经齐活了,另外俩人还在吭哧吭哧干活呢,体现不出来分工的优势。如果这个时候我们对法二和法三的叫号做延时操作,每个学生叫号之后停滞10ms再叫下一位学生,则方法三的处理时间几乎不变,而方法二的时间则会延长至3m21.345s。

怎么加快处理速度?

上面提到,大白采核酸的时间较长,往往上一个人的核酸还没采完,下一个人就已经在后面等着了。我们能不能提高核酸采集这个动作(数据处理)的速度呢?其实一名大白执行一次核酸采集的时间我们几乎无法再缩短了,但是我们可以通过增加人手的方式,来达到这个目的。就像去银行办业务,如果开放的窗口越多,那么每个人等待的时间就会越短。这里我们也采取类似的策略,增加核酸采集的窗口。

import time
from multiprocessing import Queue, Process, cpu_count
from typing import Iterator

def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]:
    print("pick_student: started")

    picked_num = 0
    with open(dorm, "rt", encoding="utf8") as fin:
        for student in fin:
            stu_queue.put(student)
            picked_num += 1
            if picked_num % 500 == 0:
                print(f"pick_student: {picked_num}")

    # end signal
    for _ in range(num_workers):
        stu_queue.put(None)

    print("pick_student: finished")

def pick_sample(student: str) -> str:
    time.sleep(0.01)
    sample = f"{student.strip()}'s sample"
    return sample

def process(stu_queue: Queue, store_queue: Queue) -> None:
    print("process: started")

    process_num = 0
    while True:
        student = stu_queue.get()
        if student is not None:
            sample = pick_sample(student)
            store_queue.put(sample)
            process_num += 1
            if process_num % 500 == 0:
                print(f"process: {process_num}")
        else:
            break

    print("process: finished")

def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
    print("store_sample: started")

    store_num = 0
    with open(sample_storeroom, "wt", encoding="utf8") as fout:
        while True:
            sample = store_queue.get()
            if sample is not None:
                fout.write(f"{sample}\n")
                fout.flush()

                store_num += 1
                if store_num % 500 == 0:
                    print(f"store_sample: {store_num}")
            else:
                break

    print("store_sample: finished")

if __name__ == "__main__":
    dorm = "student_names.txt"
    sample_storeroom = "sample_storeroom.txt"
    num_process = max(1, cpu_count() - 1)

    maxsize = 10 * num_process
    stu_queue = Queue(maxsize=maxsize)
    store_queue = Queue(maxsize=maxsize)

    store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
    store_p.start()
    process_workers = []
    for _ in range(num_process):
        process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
        process_p.start()
        process_workers.append(process_p)
    read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True)
    read_p.start()

    for worker in process_workers:
        worker.join()

    # end signal
    store_queue.put(None)
    store_p.join()

总耗时 0m4.160s !我们来具体看看其中的细节部分:

首先我们将CPU核数 - 3作为采核酸的大白数量。这里减3是为其它工作进程保留了一些资源,你也可以根据自己的具体情况做调整

这次我们在 Queue中增加了 maxsize参数,这个参数是限制队列的最大长度,这个参数通常与你的实际内存情况有关。如果数据特别多时要考虑做些调整。这里我采用10倍的工作进程数目作为队列的长度

注意这里pick_student函数中要为每个后续的工作进程都添加一个结束标志,因此最后会有个for循环

我们把之前放在process函数中的结束标志提取出来,放在了最外侧,使得所有工作进程均结束之后再关闭最后的store_p进程

结语

总结来说,如果你的数据集特别小,用法一;通常情况下用法二;数据集特别大时用法四。

以上就是Python读取文件的四种方式的实例详解的详细内容,更多关于Python读取文件的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python学习之文件的读取详解

    目录 文件读取的模式 文件对象的读取方法 使用 read() 函数一次性读取文件全部内容 使用 readlines() 函数 读取文件内容 使用 readline() 函数 逐行读取文件内容 mode().name().closed() 函数演示 文件读取小实战 with open() 函数 利用with open() 函数读取文件的小实战 上一章节 我们学习了如何利用 open() 函数创建一个文件,以及如何在文件内写入内容:今天我们就来了解一下如何将文件中的内容读取出去来的方法. 文件读取的

  • python数据分析之文件读取详解

    目录 前言: 一·Numpy库中操作文件 二·Pandas库中操作文件 三·补充 总结 前言: 如果你使用的是Anaconda中的Jupyter,则不需要下载Pands和Numpy库:如果你使用的是pycharm或其他集成环境,则需要Pands和Numpy库 一·Numpy库中操作文件 1.操作csv文件 import numpy as np a=np.random.randint(0,10,size=(3,4)) np.savetext("score.csv",a,deliminte

  • Python读取文件夹下的所有文件实例代码

    Python读取文件夹下的所有文件 os.listdir(path)是得到在path路径下所以文件的名称列表. open(path)是打开某个文件. iter是python的迭代器. 所以读取某文件夹下的所有文件如下: import os path = "D:/Python34/news" #文件夹目录 files= os.listdir(path) #得到文件夹下的所有文件名称 s = [] for file in files: #遍历文件夹 if not os.path.isdir

  • python3读取文件指定行的三种方法

    行遍历实现 在python中如果要将一个文件完全加载到内存中,通过file.readlines()即可,但是在文件占用较高时,我们是无法完整的将文件加载到内存中的,这时候就需要用到python的file.readline()进行迭代式的逐行读取: filename = 'hello.txt' with open(filename, 'r') as file: line = file.readline() counts = 1 while line: if counts >= 50000000:

  • Python数据分析基础之文件的读取

    目录 一·Numpy库中操作文件 1.操作csv文件 2.在pycharm中操作csv文件 3.其他情况(.npy类型文件) 二·Pandas库中操作文件 1.操作csv文件 2.从剪贴板上复制数据 3.读取excel或xlsx文件 三·补充 1.常用 2.pandas中读取文件的函数 总结 前言:如果你使用的是Anaconda中的Jupyter,则不需要下载Pands和Numpy库:如果你使用的是pycharm或其他集成环境,则需要Pands和Numpy库 一·Numpy库中操作文件 1.操作

  • 实例讲解python读取各种文件的方法

    目录 1.yaml文件 2.CSV文件 3.ini文件 总结 1.yaml文件 # house.yaml-------------------------------------------------------------------------- # 1."数据结构"可以用类似大纲的"缩排"方式呈现 # 2.连续的项目通过减号"-"来表示,也可以用逗号来分割 # 3.key/value对用冒号":"来分隔 # 4.数组用

  • Python读取文件的四种方式的实例详解

    目录 学生数量特别少的情况 停车场空间不够时怎么办? 怎么加快执行效率? 怎么加快处理速度? 结语 故事背景:最近在处理Wikipedia的数据时发现由于数据量过大,之前的文件读取和数据处理方法几乎不可用,或耗时非常久.今天学校安排统一核酸检查,刚好和文件读取的过程非常相似.正好借此机会和大家一起从头梳理一下几种文件读取方法. 故事设定:现在学校要求对所有同学进行核酸采集,每位同学先在宿舍内等候防护人员(以下简称“大白”)叫号,叫到自己时去停车场排队等候大白对自己进行采集,采集完之后的样本由大白

  • C++读取文件的四种方式总结

    C++可以根据不同的目的来选取文件的读取方式,目前为止学习了C++中的四种文件读取方式. C++文件读取的一般步骤: 1.包含头文件 #include<fstream> 2.创建流对象:ifstream ifs(这里的ifs是自己起的流对象名字) 3.打开文件:file.open("文件路径","打开方式"),打开文件后并判断文件是否打开成功,ifs.is_open()是用于判断文件是否打开的语句 4.进行文件读取操作 5.关闭文件 ifs.close(

  • XML解析四种方式代码示例详解

    XML是一种通用的数据交换格式,它的平台无关性.语言无关性.系统无关性.给数据集成与交互带来了极大的方便.XML在不同的语言环境中解析方式都是一样的,只不过实现的语法不同而已. XML的解析方式分为四种:1.DOM解析:2.SAX解析:3.JDOM解析:4.DOM4J解析.其中前两种属于基础方法,是官方提供的平台无关的解析方式:后两种属于扩展方法,它们是在基础的方法上扩展出来的,只适用于java平台. 针对以下XML文件,会对四种方式进行详细描述: <?xml version="1.0&q

  • 利用Python读取文件的四种不同方法比对

    前言 大家都知道Python 读文件的方式多种多样,但是当需要读取一个大文件的时候,不同的读取方式会有不一样的效果.下面就来看看详细的介绍吧. 场景 逐行读取一个 2.9G 的大文件 CPU i7 6820HQ RAM 32G 方法 对每一行的读取进行一次分割字符串操作 以下方法都使用 with-as 方法打开文件. with 语句适用于对资源进行访问的场合,确保不管使用过程中是否发生异常都会执行必要的"清理"操作,释放资源,比如文件使用后自动关闭.线程中锁的自动获取和释放等. 方法一

  • Python定义二叉树及4种遍历方法实例详解

    本文实例讲述了Python定义二叉树及4种遍历方法.分享给大家供大家参考,具体如下: Python & BinaryTree 1. BinaryTree (二叉树) 二叉树是有限个元素的集合,该集合或者为空.或者有一个称为根节点(root)的元素及两个互不相交的.分别被称为左子树和右子树的二叉树组成. 二叉树的每个结点至多只有二棵子树(不存在度大于2的结点),二叉树的子树有左右之分,次序不能颠倒. 二叉树的第i层至多有2^{i-1}个结点 深度为k的二叉树至多有2^k-1个结点: 对任何一棵二叉

  • java自带的四种线程池实例详解

    目录 java预定义的哪四种线程池? 四种线程池有什么区别? 线程池有哪几个重要参数? 如何自定义线程池 总结 java预定义的哪四种线程池? newSingleThreadExexcutor:单线程数的线程池(核心线程数=最大线程数=1) newFixedThreadPool:固定线程数的线程池(核心线程数=最大线程数=自定义) newCacheThreadPool:可缓存的线程池(核心线程数=0,最大线程数=Integer.MAX_VALUE) newScheduledThreadPool:

  • Python urls.py的三种配置写法实例详解

    urls.py的配置写法一般有三种方式. 1. 第一种是导入视图的方式,就是 The Django Book 里面样例的写法: from blog.views import index url(r'^nowamagic/', index)  2. 第二种方法是视图处理方法,看代码就知道是怎么回事了. url(r'^nowamagic/', 'test.views.index') 3. 第三种是把模型与视图写在前缀里. urlpatterns = patterns('blog.views', ur

  • Python 列表(List) 的三种遍历方法实例 详解

    Python 遍历 最近学习python这门语言,感觉到其对自己的工作效率有很大的提升,下面废话不多说,直接贴代码 #!/usr/bin/env python # -*- coding: utf-8 -*- if __name__ == '__main__': list = ['html', 'js', 'css', 'python'] # 方法1 print '遍历列表方法1:' for i in list: print ("序号:%s 值:%s" % (list.index(i)

  • python 调用js的四种方式

    1. 前言 日常 Web 端爬虫过程中,经常会遇到参数被加密的场景,因此,我们需要分析网页源代码 通过调式,一层层剥离出关键的 JS 代码,使用 Python 去执行这段代码,得出参数加密前后的 Python 实现 本文将聊聊利用 Python 调用 JS 的4种方式 2. 准备 以一段简单的 JS 脚本为例,将代码写入到文件中 //norm.js //计算两个数的和 function add(num1, num2) {     return num1 + num2; } 其中,定义了一个方法,

  • linux服务器之间传输文件的四种方式

    本文为大家分享了linux服务器之间传输文件的四种方式,供大家参考,具体内容如下 1. scp [优点]简单方便,安全可靠:支持限速参数  [缺点]不支持排除目录 [用法] scp就是secure copy,是用来进行远程文件拷贝的.数据传输使用 ssh,并且和ssh 使用相同的认证方式,提供相同的安全保证 . 命令格式: scp [参数] <源地址(用户名@IP地址或主机名)>:<文件路径> <目的地址(用户名 @IP 地址或主机名)>:<文件路径> 举例

随机推荐