Python多线程中线程数量如何控制

前言

前段时间学习了python的多线程爬虫,当时爬取一个图片网站,开启多线程后,并没有限制线程的数量,也就是说,如果下载1000张图片,会一次性开启1000个子线程同时进行下载

现在希望控制线程数量:例如每次只下载5张,当下载完成后再下载另外5张,直至全部完成

查了一些资料,发现在python中,threading 模块有提供 Semaphore类 和 BoundedSemaphore类来限制线程数

官网给出例子如下:

信号量通常用于保护容量有限的资源,例如数据库服务器。在资源大小固定的任何情况下,都应使用有界信号量。在产生任何工作线程之前,您的主线程将初始化信号量:

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

产生后,工作线程在需要连接到服务器时会调用信号量的获取和释放方法:

改造之前的多线程爬虫

首先贴出原来的代码

# -*- coding:utf-8 -*-
import requests
from requests.exceptions import RequestException
import os, time
import re
from lxml import etree
import threading

lock = threading.Lock()
def get_html(url):
    """
    定义一个方法,用于获取一个url页面的响应内容
    :param url: 要访问的url
    :return: 响应内容
    """
    response = requests.get(url, timeout=10)
    # print(response.status_code)
    try:
        if response.status_code == 200:

            # print(response.text)
            return response.text
        else:
             return None
    except RequestException:
        print("请求失败")
        # return None

def parse_html(html_text):
    """
    定义一个方法,用于解析页面内容,提取图片url
    :param html_text:
    :return:一个页面的图片url集合
    """
    html = etree.HTML(html_text)

    if len(html) > 0:
        img_src = html.xpath("//img[@class='photothumb lazy']/@data-original")  # 元素提取方法
        # print(img_src)
        return img_src

    else:
        print("解析页面元素失败")

def get_image_pages(url):
    """
    获取所查询图片结果的所有页码
    :param url: 查询图片url
    :return: 总页码数
    """

    html_text = get_html(url)  # 获取搜索url响应内容
    # print(html_text)
    if html_text is not None:
        html = etree.HTML(html_text)  # 生成XPath解析对象
        last_page = html.xpath("//div[@class='pages']//a[last()]/@href")  # 提取最后一页所在href链接
        print(last_page)
        if last_page:
            max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group()  # 使用正则表达式提取链接中的页码数字
            print(max_page)
            print(type(max_page))
            return int(max_page)  # 将字符串页码转为整数并返回
        else:
            print("暂无数据")
            return None
    else:
        print("查询结果失败")

def get_all_image_url(page_number):
    """
    获取所有图片的下载url
    :param page_number: 爬取页码
    :return: 所有图片url的集合
    """

    base_url = 'https://imgbin.com/free-png/naruto/'
    image_urls = []

    x = 1  # 定义一个标识,用于给每个图片url编号,从1递增
    for i in range(1, page_number):
        url = base_url + str(i)  # 根据页码遍历请求url
        try:
            html = get_html(url)  # 解析每个页面的内容
            if html:
                data = parse_html(html)  # 提取页面中的图片url
                # print(data)
                # time.sleep(3)
                if data:
                    for j in data:
                        image_urls.append({
                            'name': x,
                            'value': j
                        })
                        x += 1  # 每提取一个图片url,标识x增加1
        except RequestException as f:
            print("遇到错误:", f)
            continue
    # print(image_urls)
    return image_urls

def get_image_content(url):
    """请求图片url,返回二进制内容"""
    # print("正在下载", url)
    try:
        r = requests.get(url, timeout=15)
        if r.status_code == 200:
            return r.content
        return None
    except RequestException:
        return None

def main(url, image_name):
    """
    主函数:实现下载图片功能
    :param url: 图片url
    :param image_name: 图片名称
    :return:
    """
    semaphore.acquire()  # 加锁,限制线程数
    print('当前子线程: {}'.format(threading.current_thread().name))
    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
    try:
        file_path = '{0}/{1}.jpg'.format(save_path, image_name)
        if not os.path.exists(file_path):  # 判断是否存在文件,不存在则爬取
            with open(file_path, 'wb') as f:
                f.write(get_image_content(url))
                f.close()

                print('第{}个文件保存成功'.format(image_name))

        else:
            print("第{}个文件已存在".format(image_name))

        semaphore.release()  # 解锁imgbin-多线程-重写run方法.py

    except FileNotFoundError as f:
        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
        print("报错:", f)
        raise

    except TypeError as e:
        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
        print("报错:", e)

class MyThread(threading.Thread):
    """继承Thread类重写run方法创建新进程"""
    def __init__(self, func, args):
        """

        :param func: run方法中要调用的函数名
        :param args: func函数所需的参数
        """
        threading.Thread.__init__(self)
        self.func = func
        self.args = args

    def run(self):
        print('当前子线程: {}'.format(threading.current_thread().name))
        self.func(self.args[0], self.args[1])
        # 调用func函数
        # 因为这里的func函数其实是上述的main()函数,它需要2个参数;args传入的是个参数元组,拆解开来传入

if __name__ == '__main__':
    start = time.time()
    print('这是主线程:{}'.format(threading.current_thread().name))

    urls = get_all_image_url(5)  # 获取所有图片url列表
    thread_list = []  # 定义一个列表,向里面追加线程
    semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法
    for t in urls:
        # print(i)

        m = MyThread(main, (t["value"], t["name"]))  # 调用MyThread类,得到一个实例

        thread_list.append(m)

    for m in thread_list:

        m.start()  # 调用start()方法,开始执行

    for m in thread_list:
        m.join()  # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出

    end = time.time()
    print(end-start)
    # get_image_pages("https://imgbin.com/free-png/Naruto")

将代码进行改造

1、下面的第8、9行表示调用 threading 的 BoundedSemaphore类,初始化信号量为5,把结果赋给变量 pool_sema

if __name__ == '__main__':
    start = time.time()
    print('这是主线程:{}'.format(threading.current_thread().name))

    urls = get_all_image_url(5)  # 获取所有图片url列表
    thread_list = []  # 定义一个列表,向里面追加线程
    # 更多Python相关视频、资料加群778463939免费获取
    max_connections = 5  # 定义最大线程数
    pool_sema = threading.BoundedSemaphore(max_connections) # 或使用Semaphore方法
    for t in urls:
        # print(i)

        m = MyThread(main, (t["value"], t["name"]))  # 调用MyThread类,得到一个实例

        thread_list.append(m)

    for m in thread_list:

        m.start()  # 调用start()方法,开始执行

    for m in thread_list:
        m.join()  # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出

    end = time.time()
    print(end-start)

2、修改main()函数

(1)方法一:通过with语句实现,第9行添加 with pool_sema

使用 with 语句来获得一个锁、条件变量或信号量,相当于调用 acquire();离开 with 块后,会自动调用 release()

def main(url, image_name):
    """
    主函数:实现下载图片功能
    :param url: 图片url
    :param image_name: 图片名称
    :return:
    """

    with pool_sema:
        print('当前子线程: {}'.format(threading.current_thread().name))
        save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
        try:
            file_path = '{0}/{1}.jpg'.format(save_path, image_name)
            if not os.path.exists(file_path):  # 判断是否存在文件,不存在则爬取
                with open(file_path, 'wb') as f:
                    f.write(get_image_content(url))
                    f.close()

                    print('第{}个文件保存成功'.format(image_name))

            else:
                print("第{}个文件已存在".format(image_name))

        except FileNotFoundError as f:
            print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
            print("报错:", f)
            raise

        except TypeError as e:
            print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
            print("报错:", e)

(2)方法二:直接使用 acquire()和 release()

下面的第8行调用 acquire(),第24行调用release()

def main(url, image_name):
    """
    主函数:实现下载图片功能
    :param url: 图片url
    :param image_name: 图片名称
    :return:
    """
    pool_sema.acquire()  # 加锁,限制线程数
    # with pool_sema:
    print('当前子线程: {}'.format(threading.current_thread().name))
    save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
    try:
        file_path = '{0}/{1}.jpg'.format(save_path, image_name)
        if not os.path.exists(file_path):  # 判断是否存在文件,不存在则爬取
            with open(file_path, 'wb') as f:
                f.write(get_image_content(url))
                f.close()

                print('第{}个文件保存成功'.format(image_name))

        else:
            print("第{}个文件已存在".format(image_name))

        pool_sema.release()  # 解锁imgbin-多线程-重写run方法.py

    except FileNotFoundError as f:
        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
        print("报错:", f)
        raise

    except TypeError as e:
        print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
        print("报错:", e)

最终效果是一样的,每次启用5个线程,完成后再启动下一批

到此这篇关于Python多线程中线程数量如何控制的文章就介绍到这了,更多相关Python 线程数量控制内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Python控制线程和函数超时处理

    目录 前言 一.Eventlet 二.func-timeout 1.安装 2.使用 前言 在我们使用线程池控制线程运行时,或者是写爬虫时不停发送request获取地址,都需要我们对运行的线程加以控制.有这样一种场景,当你写入一条网站请求时,该线程一直请求并没有回应导致线程造成堵塞,浪费珍贵的线程资源.当你写入一条错误的sql查询语句时,该查询语句一直运行需要花费大量时间,导致其他查询语句阻塞.遇到如上述情况必须对线程加以控制,才能使得程序脚本稳定性更高. 控制线程运行时间方法有多种: 这里介绍两

  • python简易远程控制单线程版

    本文实例为大家分享了python简易远程控制的具体代码,供大家参考,具体内容如下 1. 技术:管道通信,流文件处理,socket基础 2. Tips: 默认IP:127.0.0.1 默认端口:7676 3. 代码样例: 服务端: #!/usr/bin/env python # encoding: utf-8 import socket import sys from os import * reload(sys) sys.setdefaultencoding("utf-8") def

  • python自定义线程池控制线程数量的示例

    1.自定义线程池 import threading import Queue import time queue = Queue.Queue() def put_data_in_queue(): for i in xrange(10): queue.put(i) class MyThread(threading.Thread): def run(self): while not queue.empty(): sleep_times = queue.get() time.sleep(sleep_t

  • python子线程退出及线程退出控制的代码

    下面通过代码给大家介绍python子线程退出问题,具体内容如下所示: def thread_func(): while True: #do something #do something #do something t=threading.Thread(target = thread_func) t.start() # main thread do something # main thread do something # main thread do something 跑起来是没有问题的,

  • python多线程同步之文件读写控制

    本文实例为大家分享了python多线程同步之文件读写控制的具体代码,供大家参考,具体内容如下 1.实现文件读写的文件ltz_schedule_times.py #! /usr/bin/env python #coding=utf-8 import os def ReadTimes(): res = [] if os.path.exists('schedule_times.txt'): fp = open('schedule_times.txt', 'r') else: os.system('to

  • Python控制多进程与多线程并发数总结

    一.前言 本来写了脚本用于暴力破解密码,可是1秒钟尝试一个密码2220000个密码我的天,想用多线程可是只会一个for全开,难道开2220000个线程吗?只好学习控制线程数了,官方文档不好看,觉得结构不够清晰,网上找很多文章也都不很清晰,只有for全开线程,没有控制线程数的具体说明,最终终于根据多篇文章和官方文档算是搞明白基础的多线程怎么实现法了,怕长时间不用又忘记,找着麻烦就贴这了,跟我一样新手也可以参照参照. 先说进程和线程的区别: 地址空间:进程内的一个执行单元;进程至少有一个线程;它们共

  • Python多线程同步---文件读写控制方法

    1.实现文件读写的文件ltz_schedule_times.py #! /usr/bin/env python #coding=utf-8 import os def ReadTimes(): res = [] if os.path.exists('schedule_times.txt'): fp = open('schedule_times.txt', 'r') else: os.system('touch schedule_times.txt') fp = open('schedule_ti

  • python多线程semaphore实现线程数控制的示例

    前面写过一篇关于python多线程的实现的文章, 但是效果不是最佳的,写法也不是很好.通过网上学习,也了解到了semaphore这个东西. 百度给的解释:Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确.合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量. 一个有趣的例子:假设停车场只有三个车位,一开始三个车位都是空的.这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在

  • Python多线程的退出控制实现

    日常前言 最近接 到一个抢票的爬虫外包,那个网站及其之捞,访问购票地址竟然还要排队,在购票高峰临时升一下服务器配置不行吗-没办法,甲方爸爸的要求还得做啊,其中一个障碍便是目标网站的后端限制了访问频次,俗话说:"上有政策,下有对策." 立刻想到了多线程 + 多代理的方式进行访问. 但此时问题便来了,多代理还好说,再写个爬虫爬一堆下来就好,多线程可就麻烦多了,多线程一旦发出去了,基本等同于失控的状态,你无法去结束或者是重启一个线程,最多只能是获取线程的信息,没有实际的控制权,而且Pytho

  • python基于event实现线程间通信控制

    这篇文章主要介绍了python基于event实现线程间通信控制,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 import threading,time class Boss(threading.Thread): def run(self): print("We must work today!") event.isSet() or event.set() time.sleep(5) print("You can go ho

  • python如何控制进程或者线程的个数

    背景 日常开发中,难免遇到并发场景,而并发场景难免需要做流量控制,即需要对并发的进程或者线程的总量进行控制. 今天简单总结两种常用的控制线程个数的方法. 方法一:进程池/线程池 如下例demo所示, 创建了一个大小是4的进程池,然后创建5个进程,并启动 from multiprocessing import Pool import os, time, random def long_time_task(name): print('Run task %s (%s)...' % (name, os.

随机推荐