Python并发编程线程消息通信机制详解

目录
  • 1 Event事件
  • 2 Condition
  • 3 Queue队列
  • 4 总结一下

前面我已经向大家介绍了,如何使用创建线程,启动线程。相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了。

可是要知道,在真实的项目中,实际场景可要我们举的例子要复杂的多得多,不同线程的执行可能是有顺序的,或者说他们的执行是有条件的,是要受控制的。如果仅仅依靠前面学的那点浅薄的知识,是远远不够的。

那今天,我们就来探讨一下如何控制线程的触发执行。

要实现对多个线程进行控制,其实本质上就是消息通信机制在起作用,利用这个机制发送指令,告诉线程,什么时候可以执行,什么时候不可以执行,执行什么内容。

经过我的总结,线程中通信方法大致有如下三种:

threading.Event

threading.Condition

queue.Queue

接下来我们来一一探讨下。

1 Event事件

Python提供了非常简单的通信机制 Threading.Event,通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活。

关于Event的使用也超级简单,就三个函数

event = threading.Event()

# 重置event,使得所有该event事件都处于待命状态
event.clear()

# 等待接收event的指令,决定是否阻塞程序执行
event.wait()

# 发送event指令,使所有设置该event事件的线程执行
event.set()

举个例子来看下。

import time
import threading

class MyThread(threading.Thread):
    def __init__(self, name, event):
        super().__init__()
        self.name = name
        self.event = event

    def run(self):
        print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
        # 等待event.set()后,才能往下执行
        self.event.wait()
        print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))

threads = []
event = threading.Event()

# 定义五个线程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]

# 重置event,使得event.wait()起到阻塞作用
event.clear()

# 启动所有线程
[t.start() for t in threads]

print('等待5s...')
time.sleep(5)

print('唤醒所有线程...')
event.set()
Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018

等待5s...

唤醒所有线程...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018

可见在所有线程都启动(start())后,并不会执行完,而是都在self.event.wait()止住了,需要我们通过event.set()来给所有线程发送执行指令才能往下执行。

2 Condition

Condition和Event 是类似的,并没有多大区别。

同样,Condition也只需要掌握几个函数即可。

cond = threading.Condition()

# 类似lock.acquire()
cond.acquire()

# 类似lock.release()
cond.release()

# 等待指定触发,同时会释放对锁的获取,直到被notify才重新占有琐。
cond.wait()

# 发送指定,触发执行
cond.notify()

举个网上一个比较趣的捉迷藏的例子来看看

import threading, time

class Hider(threading.Thread):
    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)  #确保先运行Seeker中的方法
        self.cond.acquire()
        print(self.name + ': 我已经把眼睛蒙上了')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 我找到你了哦 ~_~')
        self.cond.notify()
        self.cond.release()
        print(self.name + ': 我赢了')

class Seeker(threading.Thread):
    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        self.cond.wait()
        print(self.name + ': 我已经藏好了,你快来找我吧')
        self.cond.notify()
        self.cond.wait()
        self.cond.release()
        print(self.name + ': 被你找到了,哎~~~')

cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

通过cond来通信,阻塞自己,并使对方执行。从而,达到有顺序的执行。
看下结果

hider:   我已经把眼睛蒙上了
seeker:  我已经藏好了,你快来找我吧
hider:   我找到你了 ~_~
hider:   我赢了
seeker:  被你找到了,哎~~~

3 Queue队列

最后一个,队列,它是本节的重点,因为它是我们日常开发中最使用频率最高的。

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用put()get() 操作来向队列中发送和获取元素。

同样,对于Queue,我们也只需要掌握几个函数即可。

from queue import Queue
# maxsize默认为0,不受限
# 一旦>0,而消息数又达到限制,q.put()也将阻塞
q = Queue(maxsize=0)

# 默认阻塞程序,等待队列消息,可设置超时时间
q.get(block=True, timeout=None)

# 发送消息:默认会阻塞程序至队列中有空闲位置放入数据
q.put(item, block=True, timeout=None)

# 等待所有的消息都被消费完
q.join()

# 通知队列任务处理已经完成,当所有任务都处理完成时,join() 阻塞将会解除
q.task_done()

以下三个方法,知道就好,一般不需要使用

# 查询当前队列的消息个数
q.qsize()

# 队列消息是否都被消费完,返回 True/False
q.empty()

# 检测队列里消息是否已满
q.full()

函数会比之前的多一些,同时也从另一方面说明了其功能更加丰富。

我来举个老师点名的例子。

# coding=utf-8
# /usr/bin/env python

'''
Author: wangbm
Email: wongbingming@163.com
Wechat: mrbensonwon
Blog: python-online.cn
公众号:Python编程时光
date: 2020/9/20 下午7:30
desc:
'''
__author__ = 'wangbm'
from queue import Queue
from threading import Thread
import time
class Student:
    def __init__(self, name):
        self.name = name

    def speak(self):
        print("{}:到!".format(self.name))

class Teacher:
    def __init__(self, queue):
        super().__init__()
        self.queue=queue

    def call(self, student_name):
        if student_name == "exit":
            print("点名结束,开始上课..")
        else:
            print("老师:{}来了没?".format(student_name))
            # 发送消息,要点谁的名
        self.queue.put(student_name)

class CallManager(Thread):
    def __init__(self, queue):
        super().__init__()
        self.students = {}
        self.queue = queue

    def put(self, student):
        self.students.setdefault(student.name, student)

    def run(self):
        while True:
            # 阻塞程序,时刻监听老师,接收消息
            student_name = queue.get()
            if student_name == "exit":
                break
            elif student_name in self.students:
                self.students[student_name].speak()
            else:
                print("老师,咱班,没有 {} 这个人".format(student_name))

queue = Queue()
teacher = Teacher(queue=queue)
s1 = Student(name="小明")
s2 = Student(name="小亮")
cm = CallManager(queue)
cm.put(s1)
cm.put(s2)
cm.start()
print('开始点名~')
teacher.call('小明')
time.sleep(1)
teacher.call('小亮')
time.sleep(1)
teacher.call("exit")

运行结果如下

开始点名~
老师:小明来了没?
小明:到!
老师:小亮来了没?
小亮:到!
点名结束,开始上课..

其实 queue 还有一个很重要的方法,Queue.task_done()

如果不明白它的原理,我们在写程序,就很有可能卡死。

当我们使用 Queue.get() 从队列取出数据后,这个数据有没有被正常消费,是很重要的。

如果数据没有被正常消费,那么Queue会认为这个任务还在执行中,此时你使用 Queue.join() 会一直阻塞,即使此时你的队列里已经没有消息了。

那么如何解决这种一直阻塞的问题呢?

就是在我们正常消费完数据后,记得调用一下 Queue.task_done(),说明队列这个任务已经结束了。

当队列内部的任务计数器归于零时,调用 Queue.join() 就不会再阻塞了。

要理解这个过程,请参考 http://python.iswbm.com/en/latest/c02/c02_06.html 里自定义线程池的的例子。

4 总结一下

学习了以上三种通信方法,我们很容易就能发现EventCondition 是threading模块原生提供的模块,原理简单,功能单一,它能发送 TrueFalse 的指令,所以只能适用于某些简单的场景中。

Queue则是比较高级的模块,它可能发送任何类型的消息,包括字符串、字典等。其内部实现其实也引用了Condition模块(譬如putget函数的阻塞),正是其对Condition进行了功能扩展,所以功能更加丰富,更能满足实际应用。

以上就是Python并发编程线程消息通信机制详解的详细内容,更多关于Python并发线程消息通信机制的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解Python并发编程之创建多线程的几种方法

    大家好,并发编程 今天开始进入第二篇. 今天的内容会比较基础,主要是为了让新手也能无障碍地阅读,所以还是要再巩固下基础.学完了基础,你们也就能很顺畅地跟着我的思路理解以后的文章. 本文目录 学会使用函数创建多线程 学会使用类创建多线程 多线程:必学函数讲解 经过总结,Python创建多线程主要有如下两种方法: 函数 类 接下来,我们就来揭开多线程的神秘面纱. . 学会使用函数创建多线程 在Python3中,Python提供了一个内置模块 threading.Thread,可以很方便地让我们创建多

  • 详解Python中的多线程编程

    一.简介 多线程编程技术可以实现代码并行性,优化处理能力,同时功能的更小划分可以使代码的可重用性更好.Python中threading和Queue模块可以用来实现多线程编程. 二.详解 1.线程和进程        进程(有时被称为重量级进程)是程序的一次执行.每个进程都有自己的地址空间.内存.数据栈以及其它记录其运行轨迹的辅助数据.操作系统管理在其上运行的所有进程,并为这些进程公平地分配时间.进程也可以通过fork和spawn操作来完成其它的任务,不过各个进程有自己的内存空间.数据栈等,所以只

  • python多线程编程方式分析示例详解

    在Python多线程中如何创建一个线程对象如果你要创建一个线程对象,很简单,只要你的类继承threading.Thread,然后在__init__里首先调用threading.Thread的__init__方法即可 复制代码 代码如下: import threading  class mythread(threading.Thread):  def __init__(self, threadname):  threading.Thread.__init__(self, name = thread

  • Python多线程编程(八):使用Event实现线程间通信

    使用threading.Event可以实现线程间相互通信,之前的Python:使用threading模块实现多线程编程七[使用Condition实现复杂同步]我们已经初步实现了线程间通信的基本功能,但是更为通用的一种做法是使用threading.Event对象.使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False.一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set

  • Python并发编程线程消息通信机制详解

    目录 1 Event事件 2 Condition 3 Queue队列 4 总结一下 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了. 可是要知道,在真实的项目中,实际场景可要我们举的例子要复杂的多得多,不同线程的执行可能是有顺序的,或者说他们的执行是有条件的,是要受控制的.如果仅仅依靠前面学的那点浅薄的知识,是远远不够的. 那今天,我们就来探讨一下如何控制线程的触发执行. 要实现对多个线程进行控制,其实

  • Java多线程之并发编程的基石CAS机制详解

    目录 一.CAS机制简介 1.1.悲观锁和乐观锁更新数据方式 1.2.什么是CAS机制 1.3.CAS与sychronized比较 1.4.Java中都有哪些地方应用到了CAS机制呢? 1.5.CAS 实现自旋锁 1.6.CAS机制优缺点 1>ABA问题 2>可能会消耗较高的CPU 3>不能保证代码块的原子性 二.Java提供的CAS操作类--Unsafe类 2.1.Unsafe类简介 2.2.Unsafe类的使用 三.CAS使用场景 3.1.使用一个变量统计网站的访问量 3.2.现在我

  • java并发编程专题(三)----详解线程的同步

    有兴趣的朋友可以回顾一下前两篇 java并发编程专题(一)----线程基础知识 java并发编程专题(二)----如何创建并运行java线程 在现实开发中,我们或多或少的都经历过这样的情景:某一个变量被多个用户并发式的访问并修改,如何保证该变量在并发过程中对每一个用户的正确性呢?今天我们来聊聊线程同步的概念. 一般来说,程序并行化是为了获得更高的执行效率,但前提是,高效率不能以牺牲正确性为代价.如果程序并行化后, 连基本的执行结果的正确性都无法保证, 那么并行程序本身也就没有任何意义了.因此,

  • java并发编程专题(五)----详解(JUC)ReentrantLock

    上一节我们了解了Lock接口的一些简单的说明,知道Lock锁的常用形式,那么这节我们正式开始进入JUC锁(java.util.concurrent包下的锁,简称JUC锁).下面我们来看一下Lock最常用的实现类ReentrantLock. 1.ReentrantLock简介 由单词意思我们可以知道这是可重入的意思.那么可重入对于锁而言到底意味着什么呢?简单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放.这模仿了 sy

  • Java并发编程(CyclicBarrier)实例详解

    Java并发编程(CyclicBarrier)实例详解 前言: 使用JAVA编写并发程序的时候,我们需要仔细去思考一下并发流程的控制,如何让各个线程之间协作完成某项工作.有时候,我们启动N个线程去做一件事情,只有当这N个线程都达到某一个临界点的时候,我们才能继续下面的工作,就是说如果这N个线程中的某一个线程先到达预先定义好的临界点,它必须等待其他N-1线程也到达这个临界点,接下来的工作才能继续,只要这N个线程中有1个线程没有到达所谓的临界点,其他线程就算抢先到达了临界点,也只能等待,只有所有这N

  • Java并发编程之阻塞队列深入详解

    目录 1. 什么是阻塞队列 2. 阻塞队列的代码使用 3. 生产者消费者模型 (1)应用一:解耦合 (2)应用二:削峰填谷 (3)相关代码 4.阻塞队列和生产者消费者模型功能的实现 1. 什么是阻塞队列 阻塞队列是一种特殊的队列,和数据结构中普通的队列一样,也遵守先进先出的原则同时,阻塞队列是一种能保证线程安全的数据结构,并且具有以下两种特性:当队列满的时候,继续向队列中插入元素就会让队列阻塞,直到有其他线程从队列中取走元素:当队列为空的时候,继续出队列也会让队列阻塞,直到有其他线程往队列中插入

  • java并发编程StampedLock高性能读写锁详解

    目录 一.读写锁 二.悲观读锁 三.乐观读 一.读写锁 在我的<java并发编程>上一篇文章中为大家介绍了<ReentrantLock读写锁>,ReentrantReadWriteLock可以保证最多同时有一个线程在写数据,或者可以同时有多个线程读数据,但读写不能同时进行. 比如你正在做的是日志,有一个线程正在做写操作,但是在写日志的时候你可能需要把日志集中转移到集中管理日志服务,但是此时读线程不能读数据(因为无法获取读锁).面对这个需求,ReentrantReadWriteLoc

  • python分布式爬虫中消息队列知识点详解

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的资源,当然这个是一定的次序的,不然数据获取就会出现重复.就下来我们就python分布式爬虫中的消息队列进行详细解释,小伙伴们可以进一步了解一下. 实现分布式爬取的关键是消息队列,这个问题以消费端为视角更容易理解.你的爬虫程序部署到很多台机器上,那么他们怎么知道自己要爬什么呢?总要有一个地方存储了他们

  • Python字符串的创建和驻留机制详解

    目录 字符串 字符串驻留机制 字符串驻留机制优缺点 字符串 字符串在Python中是基本数据类型,是一个不可变的字符序列. 字符串驻留机制 仅保存一份相同且不可变字符串的方法,不同的值被存放在字符串的驻留池中,Python的驻留机制对相同的字符串只保留一份拷贝,后续创建相同字符串时,不会开辟新空间,而是把该字符串的地址赋给新创建的变量. 驻留机制的几种情况(交互模式windows+r,cmd) 1.字符串的长度为0或1时 2.符合标识符的字符串 3.字符串只在编译时进行驻留,而非运行时 b在运行

  • Android AIDL——进程通信机制详解

    Android  AIDL, Android进程机制通信机制,这里就整理下AIDL 的知识,帮助大家学习理解此部分知识! 什么是 AIDL AIDL 全称 Android Interface Definition Language,即 安卓接口描述语言.听起来很深奥,其实它的本质就是生成进程间通信接口的辅助工具.它的存在形式是一种 .aidl 文件,开发者需要做的就是在该文件中定义进程间通信的接口,编译的时候 IDE 就会根据我们的 .aidl 接口文件生成可供项目使用的 .java 文件,这和

随机推荐