python3线程池ThreadPoolExecutor处理csv文件数据

目录
  • 背景
  • 知识点
  • 拓展
  • 流程
  • 实现代码
  • 解释

背景

由于不同乙方对服务商业务接口字段理解不一致,导致线上上千万数据量数据存在问题,为了修复数据,通过 Python 脚本进行修改

知识点

Python3、线程池、pymysql、CSV 文件操作、requests

拓展

当我们程序在使用到线程、进程或协程的时候,以下三个知识点可以先做个基本认知

CPU 密集型、IO 密集型、GIL 全局解释器锁

pip3 install requests

pip3 install pymysql

流程

实现代码

# -*- coding:utf-8 -*-
# @FileName:grade_update.py
# @Desc    :在一台超级计算机上运行过的牛逼Python代码
import time
from concurrent.futures import ThreadPoolExecutor,FIRST_COMPLETED,wait
import requests
import pymysql
from projectPath import path
gradeId = [4303, 4304, 1000926, 1000927]
def writ_mysql():
    """
    数据库连接
    """
    return pymysql.connect(host="localhost",
                         port=3306,
                         user="admin",
                         password="admin",
                         database="test"
                         )
def oprationdb(grade_id, member_id):
  """
  操作数据库
  """
    db = writ_mysql()
    try:
        cursor = db.cursor()
        sql = f"UPDATE `t_m_member_grade` SET `current_grade_id`={grade_id}, `modified` =now() WHERE `member_id`={member_id};"
        cursor.execute(sql)
        db.commit()
        print(f"提交的SQL->{sql}")
    except pymysql.Error as e:
        db.rollback()
        print("DB数据库异常:", e)
    db.close()
    return True
def interface(rows, thead):
  """
  调用第三方接口
  """
    print(f"处理数据行数--->{thead}----数据--->{rows}")
    try:
        url = "http://xxxx/api/xxx-data/Tmall/bindQuery"
        body = {
            "nickname": str(rows[0]),
            "seller_name": "test",
            "mobile": "111"
        }
        heade={"Content-Type": "application/x-www-form-urlencoded"}
        res = requests.post(url=url, data=body,headers=heade)
        result = res.json()
        if result["data"]["status"] in [1, 2]:
            grade = result["data"]["member"]["level"]
            grade_id = gradeId[grade]
            oprationdb(grade_id=grade_id, member_id=rows[1])
            return True
        return True
    except Exception as e:
        print(f"调用异常:{e}")
def read_csv():
    import csv
    # db = writ_mysql()
    #线程数
    MAX_WORKERS=5
    with ThreadPoolExecutor(MAX_WORKERS) as pool:
        with open(path + '/file/result2_colu.csv', 'r', newline='', encoding='utf-8') as f:
            #set() 函数创建无序不重复元素集
            seq_notdone = set()
            seq_done = set()
            # 使用csv的reader()方法,创建一个reader对象
            reader = csv.reader(f)
            n = 0
            for row in reader:
                n += 1
                # 遍历reader对象的每一行
                try:
                    seq_notdone.add(pool.submit(interface, rows=row, thead=n))
                    if len(seq_notdone) >= MAX_WORKERS:
                        #FIRST_COMPLETED文档说明 -- Return when any future finishes or is cancelled.
                        done, seq_notdone = wait(seq_notdone,return_when=FIRST_COMPLETED)
                        seq_done.update(done)
                except Exception as e:
                    print(f"解析结果出错:{e}")
    # db.close()
    return "完成"
if __name__ == '__main__':
    read_csv()

解释

引入线程池库

from concurrent.futures import ThreadPoolExecutor,FIRST_COMPLETED,wait

pool.submit(interface, rows=row, thead=n)

提交任务,interface 调用的函数,rows、thead 为 interface() 函数的入参

任务持续提交,线程池通过 MAX_WORKERS 定义的线程数持续消费

说明像这种 I/O 密集型的操作脚本适合使用多线程,如果是 CPU 密集型建议使用进行,根据机器核数进行配置

以上就是python3线程池ThreadPoolExecutor处理csv文件数据的详细内容,更多关于python3 ThreadPoolExecutor处理csv的资料请关注我们其它相关文章!

(0)

相关推荐

  • 解决python ThreadPoolExecutor 线程池中的异常捕获问题

    问题 最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回. 先说重点 这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeou

  • python线程池 ThreadPoolExecutor 的用法示例

    前言 从Python3.2开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor (线程池)和ProcessPoolExecutor (进程池)两个类. 相比 threading 等模块,该模块通过 submit 返回的是一个 future 对象,它是一个未来可期的对象,通过它可以获悉线程的状态主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值: 主线程可以获取某一个线程(或者任务的)的状态,以及返

  • 对Python 多线程统计所有csv文件的行数方法详解

    如下所示: #统计某文件夹下的所有csv文件的行数(多线程) import threading import csv import os class MyThreadLine(threading.Thread): #用于统计csv文件的行数的线程类 def __init__(self,path): threading.Thread.__init__(self) #父类初始化 self.path=path #路径 self.line=-1 #统计行数 def run(self): reader =

  • Python线程池模块ThreadPoolExecutor用法分析

    本文实例讲述了Python线程池模块ThreadPoolExecutor用法.分享给大家供大家参考,具体如下: python3内置的有Threadingpool和ThreadPoolExecutor模块,两个都可以做线程池,当然ThreadPoolExecutor会更好用一些,而且也有ProcessPoolExecutor进程池模块,使用方法基本一致. 首先导入模块 from concurrent.futures import ThreadPoolExecutor 使用方法很简单,最常用的可能就

  • Python3操作读写CSV文件使用包过程解析

    CSV(Comma-Separated Values)即逗号分隔值,一种以逗号分隔按行存储的文本文件,所有的值都表现为字符串类型(注意:数字为字符串类型). 如果CSV中有中文,应以utf-8编码读写,如果要支持Excel查看,应是要用utf-8 with bom格式及utf-8-sig Python3操作CSV文件使用自带的csv包 reader=csv.reader(f, delimiter=','):用来读取数据,reader为生成器,每次读取一行,每行数据为列表格式,可以通过delimi

  • python爬虫之线程池和进程池功能与用法详解

    本文实例讲述了python爬虫之线程池和进程池功能与用法.分享给大家供大家参考,具体如下: 一.需求 最近准备爬取某电商网站的数据,先不考虑代理.分布式,先说效率问题(当然你要是请求的太快就会被封掉,亲测,400个请求过去,服务器直接拒绝连接,心碎),步入正题.一般情况下小白的我们第一个想到的是for循环,这个可是单线程啊.那我们考虑for循环直接开他个5个线程,问题来了,如果有一个url请求还没有回来,后面的就干等,这么用多线程等于没用,到处贴创可贴. 二.性能考虑 确定要用多线程或者多进程了

  • python3线程池ThreadPoolExecutor处理csv文件数据

    目录 背景 知识点 拓展 库 流程 实现代码 解释 背景 由于不同乙方对服务商业务接口字段理解不一致,导致线上上千万数据量数据存在问题,为了修复数据,通过 Python 脚本进行修改 知识点 Python3.线程池.pymysql.CSV 文件操作.requests 拓展 当我们程序在使用到线程.进程或协程的时候,以下三个知识点可以先做个基本认知 CPU 密集型.IO 密集型.GIL 全局解释器锁 库 pip3 install requests pip3 install pymysql 流程 实

  • Java线程池 ThreadPoolExecutor 详解

    目录 一 为什么要使用线程池 二 线程池原理详解 2.1 线程池核心组成 2.2 Execute 原理 三 线程池的使用 3.1 创建线程池 3.1.1 自定义线程池 3.1.2 功能线程池 3.1.3 功能线程池存在的问题 3.2 向线程池提交任务 3.3 关闭线程池 3.4 自定义线程池需要考虑因素 一 为什么要使用线程池 对于操作系统而言,创建一个线程的代价是十分昂贵的, 需要给它分配内存.列入调度,同时在线程切换时要执行内存换页,清空 CPU 缓存,切换回来时还要重新从内存中读取信息,破

  • Android之线程池ThreadPoolExecutor的简介

    Android中的线程池ThreadPoolExecutor解决了单线程下载数据的效率慢和线程阻塞的的问题,它的应用也是优化实现的方式.所以它的重要性不言而喻,但是它的复杂性也大,理解上可能会有问题,不过作为安卓工程师,了解这个也是必然的. ThreadPoolExecutor有几个构造函数,最多参数的构造函数最常用,下面会详细介绍各个参数的含义及其几个参数之间的关系: <span style="font-size:18px;">ThreadPoolExecutor(cor

  • java线程池实现批量下载文件

    本文实例为大家分享了java线程池实现批量下载文件的具体代码,供大家参考,具体内容如下 1 创建线程池 package com.cheng.webb.thread; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.Thr

  • 线程池ThreadPoolExecutor使用简介与方法实例

    一.简介 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) corePoolSize: 线程池维护线程的最少数量 maximumPool

  • SpringBoot使用异步线程池实现生产环境批量数据推送

    目录 前言 编写线程池配置类 编写异步服务 异步批量上报数据 总结 前言 SpringBoot使用异步线程池: 1.编写线程池配置类,自定义一个线程池: 2.定义一个异步服务: 3.使用@Async注解指向定义的线程池: 这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为.但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用

  • Java 线程池ThreadPoolExecutor源码解析

    目录 引导语 1.整体架构图 1.1.类结构 1.2.类注释 1.3.ThreadPoolExecutor重要属性 2.线程池的任务提交 3.线程执行完任务之后都在干啥 4.总结 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可以充分利用机器资源,增加请求的处理速度,本章节我们就和大家一起来学习线程池. 本章的顺序,先说源码,弄懂原理,接着看一看面试题,最后看看实际工作中是如何运用线程池的. 1.整体架构图 我们画了线程池的整体图,如下: 本小节主要就按照这个图来进行 Thre

  • Java并发包线程池ThreadPoolExecutor的实现

    线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能.在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁都是需要开销的.线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程.二是线程池提供了一种资源限制和管理手段,比如可以限制线程的个数,动态新增线程等.每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等. 我们首先来看一下类图 Excecutor是一个工具类,里面提供了许多静

  • 简单聊一聊Java线程池ThreadPoolExecutor

    目录 简介 参数说明 如何创建线程池 拒绝策略 总结 简介 ThreadPoolExecutor是一个实现ExecutorService接口的线程池,ExecutorService是主要用来处理多线程任务的一个接口,通常比较简单是用法是由Executors工厂类去创建. 线程池主要解决了两个不同的问题: 在执行大量异步任务时,为了能够提高性能,通常会减少每个任务的调用开销. 提供了一系列多线程任务的管理方法,便于多任务执行时合理分配资源以及一些异常情况的处理.每个ThreadPoolExecut

随机推荐