python使用threading.Condition交替打印两个字符

Python中使用threading.Condition交替打印两个字符的程序。

这个程序涉及到两个线程的的协调问题,两个线程为了能够相互协调运行,必须持有一个共同的状态,通过这个状态来维护两个线程的执行,通过使用threading.Condition对象就能够完成两个线程之间的这种协调工作。

threading.Condition默认情况下会通过持有一个ReentrantLock来协调线程之间的工作,所谓可重入锁,是只一个可以由一个线程递归获取的锁,此锁对象会维护当前锁的所有者(线程)和当前所有者递归获取锁的次数(本文在逻辑上和可重入锁没有任何关系,完全可以用一个普通锁替代)。

Python文档中给出的描述是:它是一个与某个锁相联系的变量。同时它实现了上下文管理协议。其对象中除了acquire和release方法之外,其它方法的调用的前提是,当前线程必须是这个锁的所有者。

通过代码和其中的注释,能够非常明白地弄清楚Condition的原理是怎样的:

import threading
import time
import functools

def worker(cond, name):
 """worker running in different thread"""
 with cond: # 通过__enter__方法,获取cond对象中的锁,默认是一个ReentrantLock对象
  print('...{}-{}-{}'.format(name, threading.current_thread().getName(), cond._is_owned()))
  cond.wait() # 创建一个新的锁NEWLOCK,调用acquire将NEWLOCK获取,然后将NEWLOCK放入等待列表中,\
  # 释放cond._lock锁(_release_save),最后再次调用acquire让NEWLOCK阻塞
 print('wait returned in {}'.format(name))

if __name__ == '__main__':
 condition = threading.Condition()
 t1 = threading.Thread(target=functools.partial(worker, condition, 't1'))
 t2 = threading.Thread(target=functools.partial(worker, condition, 't2'))

 t2.start() # 启动线程2
 t1.start() # 启动线程1

 time.sleep(2)
 with condition:
  condition.notify(1) # 按照FIFO顺序(wait调用顺序),释放一个锁,并将其从等待列表中删除

 time.sleep(2)

 with condition:
  condition.notify(1) # 按照FIFO顺序(wait调用顺序),释放另一个锁,并将其从等待队列中删除

 t1.join() # 主线程等待子线程结束
 t2.join() # 主线程等待子线程结束

 print('All done')

其输出为:

...t2-Thread-2-True
...t1-Thread-1-True
wait returned in t2
wait returned in t1
All done

其中wait方法要求获取到threading.Condition对象中的锁(如果没有提供,默认使用一个可重入锁),然后自己创建一个新的普通锁(NEWLOCK),并获取这个NEWLOCK;之后调用_release_save方法释放threading.Condition对象中的锁,让其它线程能够获取到;最后再次调用NEWLOCK上的acquire方法,由于在创建时已经acquire过,所以此线程会阻塞在此。而wait想要继续执行,必须等待其它线程将产生阻塞的这个NEWLOCK给release掉,当然,这就是notify方法的责任了。

notify方法接收一个数字n,从等待列表中取出相应数量的等待对象(让wait方法阻塞的锁对象),调用其release方法,让对应的wait方法能够返回。而notify_all方法仅仅就是将n设置为等待列表的总长度而已。

在理解了threading.Condition对象中wait和notify的工作原理之后,我们就可以利用它们来实现两个线程交替打印字符的功能了:

import threading
import functools
import time

def print_a(state):
 while True:
  if state.closed:
   print('Close a')
   return
  print('A')
  time.sleep(2)
  state.set_current_is_a(True)
  state.wait_for_b()

def print_b(state):
 while True:
  if state.closed:
   print('Close b')
   return
  state.wait_for_a()
  print('B')
  time.sleep(2)
  state.set_current_is_a(False)

if __name__ == '__main__':
 class State(object):
  """state used to coordinate multiple(two here) threads"""
  def __init__(self):
   self.condition = threading.Condition()
   self.current_is_a = False
   self.closed = False

  def wait_for_a(self):
   with self.condition:
    while not self.current_is_a:
     self.condition.wait()

  def wait_for_b(self):
   with self.condition:
    while self.current_is_a:
     self.condition.wait()

  def set_current_is_a(self, flag):
   self.current_is_a = flag
   with self.condition:
    self.condition.notify_all()

 state = State()
 t1 = threading.Thread(target=functools.partial(print_a, state))
 t2 = threading.Thread(target=functools.partial(print_b, state))

 try:
  t1.start()
  t2.start()

  t1.join()
  t2.join()
 except KeyboardInterrupt:
  state.closed = True
  print('Closed')

可以看到有两种类型的任务,一个用于打印字符A,一个用于打印字符B,我们的实现种让A先于B打印,所以在print_a中,先打印A,再设置当前字符状态并释放等待列表中的所有锁(set_current_is_a),如果没有这一步,current_is_a将一直是False,wait_for_b能够返回,而wait_for_a却永远不会返回,最终效果就是每隔两秒就打印一个字符A,而B永远不会打印。另一个副作用是如果wait_for_a永远不会返回,那print_b所在线程的关闭逻辑也就无法执行,最终会成为僵尸线程(这里的关闭逻辑只用作示例,生产环境需要更加完善的关闭机制)。

考虑另一种情况,print_a种将set_current_is_a和wait_for_b交换一下位置会怎么样。从观察到的输出我们看到,程序首先输出了一个字符A,以后,每隔2秒钟,就会同时输出A和B,而不是交替输出。原因在于,由于current_is_a还是False,我们先调用的wait_for_b其会立即返回,之后调用set_current_is_a,将current_is_a设置为True,并释放所有的阻塞wait的锁(notify_all),这个过程中没有阻塞,print_a紧接着进入了下一个打印循环;与此同时,print_b中的wait_for_a也返回了,进入到B的打印循环,故最终我们看到A和B总是一起打印。

可见对于threading.Condition的使用需要多加小心,要注意逻辑上的严谨性。

附一个队列版本:

import threading
import functools
import time
from queue import Queue

def print_a(q_a, q_b):
 while True:
  char_a = q_a.get()
  if char_a == 'closed':
   return
  print(char_a)
  time.sleep(2)
  q_b.put('B')

def print_b(q_a, q_b):
 while True:
  char_b = q_b.get()
  if char_b == 'closed':
   return
  print(char_b)
  time.sleep(2)
  q_a.put('A')

if __name__ == '__main__':
 q_a = Queue()
 q_b = Queue()

 t1 = threading.Thread(target=functools.partial(print_a, q_a, q_b))
 t2 = threading.Thread(target=functools.partial(print_b, q_a, q_b))

 try:
  t1.start()
  t2.start()

  q_a.put('A')

  t1.join()
  t2.join()
 except KeyboardInterrupt:
  q_a.put('closed')
  q_b.put('closed')

 print('Done')

队列版本逻辑更清晰,更不容易出错,实际应用中应该选用队列。

附一个协程版本(Python 3.5+):

import time
import asyncio

async def print_a():
 while True:
  print('a')
  time.sleep(2) # simulate the CPU block time
  await asyncio.sleep(0) # release control to event loop

async def print_b():
 while True:
  print('b')
  time.sleep(2) # simulate the CPU block time
  await asyncio.sleep(0) # release control to event loop

async def main():
 await asyncio.wait([print_a(), print_b()])

if __name__ == '__main__':
 loop = asyncio.get_event_loop()
 loop.run_until_complete(main())

协程的运行需要依附于一个事件循环(select/poll/epoll/kqueue),通过async def将一个函数定义为协程,通过await主动让渡控制权,通过相互让渡控制权完成交替打印字符。整个程序运行于一个线程中,这样就没有线程间协调的工作,仅仅是控制权的让渡逻辑。对于IO密集型操作,而没有明显的CPU阻塞(计算复杂,以致出现明显的延时,比如复杂加解密算法)的情况下非常合适。

附一个Java版本:

PrintMain类,用于管理和协调打印A和打印B的两个线程:

package com.cuttyfox.tests.self.version1;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PrintMain {
 private boolean currentIsA = false;

 public synchronized void waitingForPrintingA() throws InterruptedException {
  while (this.currentIsA == false) {
   wait();
  }
 }

 public synchronized void waitingForPrintingB() throws InterruptedException {
  while (this.currentIsA == true) {
   wait();
  }
 }

 public synchronized void setCurrentIsA(boolean flag) {
  this.currentIsA = flag;
  notifyAll();
 }

 public static void main(String[] args) throws Exception {
  PrintMain state = new PrintMain();
  ExecutorService executorService = Executors.newCachedThreadPool();
  executorService.execute(new PrintB(state));
  executorService.execute(new PrintA(state));
  executorService.shutdown();
  executorService.awaitTermination(10, TimeUnit.SECONDS);
  System.out.println("Done");
  System.exit(0);
 }
}

打印A的线程(首先打印A):

package com.cuttyfox.tests.self.version1;

import java.util.concurrent.TimeUnit;

public class PrintA implements Runnable{
 private PrintMain state;

 public PrintA(PrintMain state) {
  this.state = state;
 }

 public void run() {
  try {
   while (!Thread.interrupted()){
    System.out.println("Print A");
    TimeUnit.SECONDS.sleep(1);
    this.state.setCurrentIsA(true);
    this.state.waitingForPrintingB();
   }
  } catch (InterruptedException e) {
   System.out.println("Exit through Interrupting.");
  }

 }
}

打印B的线程:

package com.cuttyfox.tests.self.version1;

import java.util.concurrent.TimeUnit;

public class PrintB implements Runnable{
 private PrintMain state;

 public PrintB(PrintMain state) {
  this.state = state;
 }

 public void run() {
  try{
   while (!Thread.interrupted()) {
    this.state.waitingForPrintingA();
    System.out.println("Print B");
    TimeUnit.SECONDS.sleep(1);
    this.state.setCurrentIsA(false);
   }
  } catch (InterruptedException e) {
   System.out.println("Exit through Interrupting.");
  }

 }
}

Java对象本身有对象锁,故这里没有像Python中那样需要显式通过创建一个Condition对象来得到一把锁。

使用Python实现交替打印abcdef的过程:

import threading
import time
import functools
from collections import deque

 LETTERS = [chr(code) for code in range(97, 97+6)]
 LENGTH = len(LETTERS)

 class State(object):
  def __init__(self):
   self.condition = threading.Condition()
   self.index_value = 0

  def set_next_index(self, index):
   with self.condition:
    self.index_value = index
    self.condition.notify_all()

  def wait_for(self, index_value):
   with self.condition:
    while not self.index_value == index_value:
     self.condition.wait()

 def print_letter(state: State, wait_ident: int):
  print('Got: {}!'.format(wait_ident))
  while True:
   state.wait_for(wait_ident)
   time.sleep(2)
   print(LETTERS[state.index_value])
   print('PRINT: {} AND SET NEXT: {}'.format(state.index_value,
              (state.index_value + 1) % LENGTH
              ))
   state.set_next_index((state.index_value + 1) % LENGTH)

 state = State()
 d = deque()
 d.extend(range(LENGTH))
 d.rotate(1)
 print(d)

 threads = []
 for wait_ident in d:
  t = threading.Thread(target=functools.partial(print_letter, state, wait_ident))
  threads.append(t)

 for thread in threads:
  thread.start()

 for thread in threads:
  thread.join()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python 打印中文字符的三种方法

    方法一: 现在用 notepad++,在 UTF-8 格式下编写以下语句: #coding=utf-8 print"打印中文字符" 方法二: 用encode和decode 如: import os.path import xlrd,sys Filename='/home/tom/Desktop/1234.xls' if not os.path.isfile(Filename): raise NameError,"%s is not a valid filename"

  • python使用正则表达式匹配字符串开头并打印示例

    本文实例讲述了python使用正则表达式匹配字符串开头并打印的方法.分享给大家供大家参考,具体如下: import re s="name=z1hangshan username=fff url=www.baidu.com password=ddd256" s2="username=fff name=z1hangshan url=www.baidu.com password=ddd256" #p=re.compile(r'((?:\s)name=(\S)+)') p=

  • python使用threading.Condition交替打印两个字符

    Python中使用threading.Condition交替打印两个字符的程序. 这个程序涉及到两个线程的的协调问题,两个线程为了能够相互协调运行,必须持有一个共同的状态,通过这个状态来维护两个线程的执行,通过使用threading.Condition对象就能够完成两个线程之间的这种协调工作. threading.Condition默认情况下会通过持有一个ReentrantLock来协调线程之间的工作,所谓可重入锁,是只一个可以由一个线程递归获取的锁,此锁对象会维护当前锁的所有者(线程)和当前所

  • java实现多线程交替打印两个数

    本文实例为大家分享了java实现多线程交替打印两个数的具体代码,供大家参考,具体内容如下 方法1.使用wait和notify package com.thread; public class T01 { public static void main(String[] args) { char[] char1 = "AAAAAA".toCharArray(); char[] char2 = "BBBBBB".toCharArray(); Object object

  • java实现两个线程交替打印的实例代码

    使用ReentrantLock实现两个线程交替打印 实现字母在前数字在后 package com.study.pattern; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public c

  • Java 用两个线程交替打印数字和字母

    前一段时间听马士兵老师讲课,讲到某公司的一个面试,两个线程,其中一个线程输出ABC,另一个线程输出123,如何控制两个线程交叉输出1A2B3C,由于本人多线程掌握的一直不是很好,所以听完这道题,个人感觉收获良多,这是一个学习笔记.这道题有多种解法,不过有些属于纯炫技,所以只记录常见的三种解法.首先看第一种 1. park 和 unpark package cn.bridgeli.demo;   import com.google.common.collect.Lists;   import ja

  • go语言实现两个协程交替打印

    目录 方法一:使用两个channel 方法二 :使用一个channel 方法一:使用两个channel 这里channel CA 必须要有缓冲区,否则最后会报错 fatal error: all goroutines are asleep - deadlock! 这是因为无缓冲的通道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段.因为最后交替运行完后没有协程可以接收CA通道中的数据,所以会一直阻塞发生死锁 package main import (     "fmt&q

  • C++详解如何实现两个线程交替打印

    C++线程库,点击此处查看文档 首先简单搭一个框架,让两个线程先尝试实现交替打印. //实现两个线程交替打印 #include <iostream> #include <thread> using namespace std; int main(void) { int n = 100; int i = 0; //创建两个线程 thread t1([&n, &i](){ while (i < n) { cout << i << "

  • Python线程协作threading.Condition实现过程解析

    领会下面这个示例吧,其实跟java中wait/nofity是一样一样的道理 import threading # 条件变量,用于复杂的线程间同步锁 """ 需求: 男:小姐姐,你好呀! 女:哼,想泡老娘不成? 男:对呀,想泡你 女:滚蛋,门都没有! 男:切,长这么丑, 还这么吊... 女:关你鸟事! """ class Boy(threading.Thread): def __init__(self, name, condition): supe

  • 举例详解Python中threading模块的几个常用方法

    threading.Thread Thread 是threading模块中最重要的类之一,可以使用它来创建线程.有两种方式来创建线程:一种是通过继承Thread类,重写它的run方法:另一种是创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入.下面分别举例说明.先来看看通过继承threading.Thread类来创建线程的例子: #coding=gbk import threading, time, random count = 0 cl

  • python 多线程threading程序详情

    CPython implementation detail: 在 CPython 中,由于存在全局解释器锁, 同一时刻只有一个线程可以执行 Python 代码(虽然某些性能导向的库可能会去除此限制). 如果你想让你的应用更好地利用多核心计算机的计算资源,推荐你使用multiprocessing或concurrent.futures.ProcessPoolExecutor但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型. 再来引入一个概念: 并行(parallelis

  • python中threading超线程用法实例分析

    本文实例讲述了python中threading超线程用法.分享给大家供大家参考.具体分析如下: threading基于Java的线程模型设计.锁(Lock)和条件变量(Condition)在Java中是对象的基本行为(每一个对象都自带了锁和条件变量),而在Python中则是独立的对象.Python Thread提供了Java Thread的行为的子集:没有优先级.线程组,线程也不能被停止.暂停.恢复.中断.Java Thread中的部分被Python实现了的静态方法在threading中以模块方

随机推荐