python 多进程和协程配合使用写入数据

一、需求分析

有一批key已经写入到3个txt文件中,每一个txt文件有30万行记录。
现在需要读取这些txt文件,判断key是否在数据仓库中。(redis或者mysql)

为空的记录,需要写入到日志文件中!

任务分工

1. 使用多进程技术,每一个进程读取一个txt文件

2. 使用协程技术,批量读取txt文件记录。比如一次性读取 2000条记录

注意:打开文件操作,最好在一个进程中,重复打开文件,会造成系统资源浪费!

二、完整代码

#!/usr/bin/env python3
# coding: utf-8
"""
多线程和协程配合使用示例
"""

import os
import time
from gevent import monkey;monkey.patch_all()
from gevent.pool import Pool
from functools import partial
from multiprocessing import Process

COROUTINE_NUMBER = 2000 # 协程池数量
pool = Pool(COROUTINE_NUMBER) # 使用协程池

# 模拟数据仓库,测试数据
data_dict = {"1":"x1","3":"x3","5":"x5","7":"x7","9":"x9"}

class TestProgram(object): # 测试程序
 def __init__(self):
  self.BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # 项目根目录

 def write_log(self,number, content, colour='white', skip=False):
  """
  写入日志文件
  :param content: 写入内容
  :param colour: 颜色
  :param skip: 是否跳过打印时间
  :return:
  """
  # 颜色代码
  colour_dict = {
   'red': 31, # 红色
   'green': 32, # 绿色
   'yellow': 33, # 黄色
   'blue': 34, # 蓝色
   'purple_red': 35, # 紫红色
   'bluish_blue': 36, # 浅蓝色
   'white': 37, # 白色
  }
  choice = colour_dict.get(colour) # 选择颜色

  path = os.path.join(self.BASE_DIR, "output_%s.log" % number) # 日志文件
  with open(path, mode='a+', encoding='utf-8') as f:
   if skip is False: # 不跳过打印时间时
    content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content

   info = "\033[1;{};1m{}\033[0m".format(choice, content)
   print(info)
   f.write(content + "\n")

 def has_null(self, key, number):
  """
  输出key
  :param key: 键值
  :param number: 文件标记
  :return: bool
  """
  key = key.strip()
  if not data_dict.get(key):
   self.write_log(number,"错误,{} 记录为空".format(key),"red")
   return False

  print(key)
  return True

 def read_file(self, number):
  """
  读取文件
  :param number: 文件标记
  :return:
  """
  file_name = os.path.join(self.BASE_DIR, "data", "%s.txt" % number)
  # print(file_name)
  self.write_log(number, "开始读取文件 {}".format(file_name),"green")
  with open(file_name, encoding='utf-8') as f:
   # 使用协程池,执行任务。语法: pool.map(func,iterator)
   # partial使用偏函数传递参数
   # 注意:has_null第一个参数,必须是迭代器遍历的值
   pool.map(partial(self.has_null, number=number), f)

  self.write_log(number, "结束文件读取 {} 完成".format(file_name),"green")
  return True

 def run(self, number):
  """
  读取指定的文件,判断每一个key是否为空
  :param number:
  :return:
  """
  startime = time.time() # 开始时间

  # 清空日志
  path = os.path.join(self.BASE_DIR, "output_%s.log" % number) # 日志文件
  with open(path, mode='w') as f:
   pass

  self.read_file(number)

  endtime = time.time()
  take_time = endtime - startime

  if take_time < 1: # 判断不足1秒时
   take_time = 1 # 设置为1秒
  # 计算花费时间
  m, s = divmod(take_time, 60)
  h, m = divmod(m, 60)

  self.write_log(number, "%s.txt 花费时间 %02d:%02d:%02d" % (number,h, m, s),"green")

 def main(self):
  """
  使用多线程执行程序
  :return:
  """
  # 文件标记列表
  file_list = ["7001", "7002", "7003"]

  p_lst = [] # 线程列表
  for i in file_list:
   # self.run(i)
   p = Process(target=self.run, args=(i,)) # 子进程调用函数
   p.start() # 启动子进程
   p_lst.append(p) # 将所有进程写入列表中

  for p in p_lst: p.join() # 检测p是否结束,如果没有结束就阻塞直到结束,否则不阻塞

TestProgram().main() # 启动主程序,它会开启3个进程。

执行输出

以上就是python 多进程和协程配合使用写入数据的详细内容,更多关于python 多进程和协程的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python多进程multiprocessing、进程池用法实例分析

    本文实例讲述了Python多进程multiprocessing.进程池用法.分享给大家供大家参考,具体如下: 内容相关: multiprocessing: 进程的创建与运行 进程常用相关函数 进程池: 为什么要有进程池 进程池的创建与运行:串行.并行 回调函数 多进程multiprocessing: python中的多进程需要使用multiprocessing模块 多进程的创建与运行: 1.进程的创建:进程对象=multiprocessing.Process(target=函数名,args=(参

  • python 使用事件对象asyncio.Event来同步协程的操作

    事件对象asyncio.Event是基于threading.Event来实现的. 事件可以一个信号触发多个协程同步工作, 例子如下: import asyncio import functools def set_event(event): print('setting event in callback') event.set() async def coro1(event): print('coro1 waiting for event') await event.wait() print(

  • python学习笔记之多进程

    我们现代的操作系统,都是支持"多任务"的操作系统,对于操作系统来说,一个任务就是一个进程(process).比如打开一个浏览器就是启动一个浏览器进程. 如果我们将计算器的核心CPU比喻为一座工厂,那么进程就像工厂里的车间,它代表CPU所能处理的单个任务.任一时刻,CPU总是运行一个进程,其他进程处于非运行状态. 看到这大家可能会有一些疑问了,其他进程处于非运行状态?可是我用浏览器访问网页的时候,音乐播放器明明也在运行啊. 实际上是操作系统轮流让各个任务交替执行,任务1执行0.01秒,切

  • Python手动或自动协程操作方法解析

    1.手动协程操作: # pip install gevent from greenlet import greenlet def test(): print('He ') gr2.switch() # 切换到test2 print('a ') gr2.switch() def test2(): print('is ') gr1.switch() print('student.') gr1 = greenlet(test) # 创建一个协程 gr2 = greenlet(test2) gr1.sw

  • python并发编程之多进程、多线程、异步和协程详解

    最近学习python并发,于是对多进程.多线程.异步和协程做了个总结. 一.多线程 多线程就是允许一个进程内存在多个控制权,以便让多个函数同时处于激活状态,从而让多个函数的操作同时运行.即使是单CPU的计算机,也可以通过不停地在不同线程的指令间切换,从而造成多线程同时运行的效果. 多线程相当于一个并发(concunrrency)系统.并发系统一般同时执行多个任务.如果多个任务可以共享资源,特别是同时写入某个变量的时候,就需要解决同步的问题,比如多线程火车售票系统:两个指令,一个指令检查票是否卖完

  • python3爬虫中异步协程的用法

    1. 前言 在执行一些 IO 密集型任务的时候,程序常常会因为等待 IO 而阻塞.比如在网络爬虫中,如果我们使用 requests 库来进行请求的话,如果网站响应速度过慢,程序一直在等待网站响应,最后导致其爬取效率是非常非常低的. 为了解决这类问题,本文就来探讨一下 Python 中异步协程来加速的方法,此种方法对于 IO 密集型任务非常有效.如将其应用到网络爬虫中,爬取效率甚至可以成百倍地提升. 注:本文协程使用 async/await 来实现,需要 Python 3.5 及以上版本. 2.

  • Python多进程编程常用方法解析

    python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU资源,在python中大部分情况需要使用多进程.python提供了非常好用的多进程包Multiprocessing,只需要定义一个函数,python会完成其它所有事情.借助这个包,可以轻松完成从单进程到并发执行的转换.multiprocessing支持子进程.通信和共享数据.执行不同形式的同步,提供了Process.Queue.Pipe.LocK等组件 一.Process 语法:Process([group[,target

  • 深入浅析python中的多进程、多线程、协程

    进程与线程的历史 我们都知道计算机是由硬件和软件组成的.硬件中的CPU是计算机的核心,它承担计算机的所有任务. 操作系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配.任务的调度. 程序是运行在系统上的具有某种功能的软件,比如说浏览器,音乐播放器等. 每次执行程序的时候,都会完成一定的功能,比如说浏览器帮我们打开网页,为了保证其独立性,就需要一个专门的管理和控制执行程序的数据结构--进程控制块. 进程就是一个程序在一个数据集上的一次动态执行过程. 进程一般由程序.数据集.进程控

  • 浅谈Python协程

    协程 协程,又称微线程,纤程.英文名Coroutine.一句话说明什么是线程:协程是一种用户态的轻量级线程. 协程拥有自己的寄存器上下文和栈.协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈.因此: 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置. 协程的好处: 无需线程上下文切换的开销 无需原子操作锁定及同步的开销 "原子操作(atomic o

  • python多进程 主进程和子进程间共享和不共享全局变量实例

    Python 多进程默认不能共享全局变量 主进程与子进程是并发执行的,进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值). 如果要共享全局变量需要用(multiprocessing.Value("d",10.0),数值)(multiprocessing.Array("i",[1,2,3,4,5]),数组)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(ran

随机推荐