go语言同步教程之条件变量

Go的标准库中有一个类型叫条件变量:sync.Cond。这种类型与互斥锁和读写锁不同,它不是开箱即用的,它需要与互斥锁组合使用:

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
 return &Cond{L: l}
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
 Lock()
 Unlock()
}

通过使用 NewCond 函数可以返回 *sync.Cond 类型的结果, *sync.Cond 我们主要操作其三个方法,分别是:

wait():等待通知

Signal():单播通知

Broadcast():广播通知

具体的函数说明如下:

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
//  c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
 c.checker.check()
 runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
 c.checker.check()
 runtime_notifyListNotifyAll(&c.notify)
}

条件变量sync.Cond本质上是一些正在等待某个条件的线程的同步机制。

sync.Cond 主要实现一个条件变量,假如 goroutine A 执行前需要等待另外的goroutine B 的通知,那边处于等待的goroutine A 会保存在一个通知列表,也就是说需要某种变量状态的goroutine A 将会等待/Wait在那里,当某个时刻状态改变时负责通知的goroutine B 通过对条件变量通知的方式(Broadcast,Signal)来通知处于等待条件变量的goroutine A, 这样便可首先一种“消息通知”的同步机制。

以go的http处理为例,在Go的源码中http模块server部分源码中所示,当需要处理一个新的连接的时候,若连接conn是实现自*tls.Conn的情况下,会进行相关的客户端与服务端的“握手”处理Handshake(), 入口代码如下:

if tlsConn, ok := c.rwc.(*tls.Conn); ok {
  if d := c.server.ReadTimeout; d != 0 {
   c.rwc.SetReadDeadline(time.Now().Add(d))
  }
  if d := c.server.WriteTimeout; d != 0 {
   c.rwc.SetWriteDeadline(time.Now().Add(d))
  }
  if err := tlsConn.Handshake(); err != nil {
   c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
   return
  }
  c.tlsState = new(tls.ConnectionState)
  *c.tlsState = tlsConn.ConnectionState()
  if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
   if fn := c.server.TLSNextProto[proto]; fn != nil {
    h := initNPNRequest{tlsConn, serverHandler{c.server}}
    fn(c.server, tlsConn, h)
   }
   return
  }
 }

其中的Handshake函数代码通过使用条件变量的方式来处理新连接握手调用的同步问题:

func (c *Conn) Handshake() error {
 c.handshakeMutex.Lock()
 defer c.handshakeMutex.Unlock()

 for {
  if err := c.handshakeErr; err != nil {
   return err
  }
  if c.handshakeComplete {
   return nil
  }
  if c.handshakeCond == nil {
   break
  }

  c.handshakeCond.Wait()
 }

 c.handshakeCond = sync.NewCond(&c.handshakeMutex)
 c.handshakeMutex.Unlock()

 c.in.Lock()
 defer c.in.Unlock()

 c.handshakeMutex.Lock()

 if c.handshakeErr != nil || c.handshakeComplete {
  panic("handshake should not have been able to complete after handshakeCond was set")
 }

 if c.isClient {
  c.handshakeErr = c.clientHandshake()
 } else {
  c.handshakeErr = c.serverHandshake()
 }
 if c.handshakeErr == nil {
  c.handshakes++
 } else {
  c.flush()
 }

 if c.handshakeErr == nil && !c.handshakeComplete {
  panic("handshake should have had a result.")
 }

 c.handshakeCond.Broadcast()
 c.handshakeCond = nil

 return c.hand

我们也可以再通过一个例子熟悉sync.Cond的使用:

我们尝试实现一个读写同步的例子,需求是:我们有数个读取器和数个写入器,读取器必须依赖写入器对缓存区进行数据写入后,才可从缓存区中对数据进行读出。我们思考下,要实现类似的功能,除了使用channel,还能如何做?

写入器每次完成写入数据后,它都需要某种通知机制广播给处于阻塞状态的读取器,告诉它们可以对数据进行访问,这其实跟sync.Cond 的 广播机制是不是很像? 有了这个广播机制,我们可以通过sync.Cond来实现这个例子了:

package main

import (
 "bytes"
 "fmt"
 "io"
 "sync"
 "time"
)

type MyDataBucket struct {
 br  *bytes.Buffer
 gmutex *sync.RWMutex
 rcond *sync.Cond //读操作需要用到的条件变量
}

func NewDataBucket() *MyDataBucket {
 buf := make([]byte, 0)
 db := &MyDataBucket{
  br:  bytes.NewBuffer(buf),
  gmutex: new(sync.RWMutex),
 }
 db.rcond = sync.NewCond(db.gmutex.RLocker())
 return db
}

func (db *MyDataBucket) Read(i int) {
 db.gmutex.RLock()
 defer db.gmutex.RUnlock()
 var data []byte
 var d byte
 var err error
 for {
  //读取一个字节
  if d, err = db.br.ReadByte(); err != nil {
   if err == io.EOF {
    if string(data) != "" {
     fmt.Printf("reader-%d: %s\n", i, data)
    }
    db.rcond.Wait()
    data = data[:0]
    continue
   }
  }
  data = append(data, d)
 }
}

func (db *MyDataBucket) Put(d []byte) (int, error) {
 db.gmutex.Lock()
 defer db.gmutex.Unlock()
 //写入一个数据块
 n, err := db.br.Write(d)
 db.rcond.Broadcast()
 return n, err
}

func main() {
 db := NewDataBucket()

 go db.Read(1)

 go db.Read(2)

 for i := 0; i < 10; i++ {
  go func(i int) {
   d := fmt.Sprintf("data-%d", i)
   db.Put([]byte(d))
  }(i)
  time.Sleep(100 * time.Millisecond)
 }
}

当使用sync.Cond的时候有两点移动要注意的:

  • 一定要在调用cond.Wait方法前,锁定与之关联的读写锁
  • 一定不要忘记在cond.Wait后,若数据已经处理完毕,在返回前要对与之关联的读写锁进行解锁。

如下面 Wait() 的源码所示,Cond.Wait会自动释放锁等待信号的到来,当信号到来后,第一个获取到信号的Wait将继续往下执行并从新上锁

func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

如果不释放锁, 其它收到信号的gouroutine将阻塞无法继续执行。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)

    前言 同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据) 同步执行类RunnerAsync 支持返回超时检测,系统中断检测 错误常量定义 //超时错误 var ErrTimeout = errors.New("received timeout") //操作系统系统中断错误 var ErrInterrupt = errors.New("received interrupt") 实现代码如下 package ta

  • go语言同步教程之条件变量

    Go的标准库中有一个类型叫条件变量:sync.Cond.这种类型与互斥锁和读写锁不同,它不是开箱即用的,它需要与互斥锁组合使用: // NewCond returns a new Cond with Locker l. func NewCond(l Locker) *Cond { return &Cond{L: l} } // A Locker represents an object that can be locked and unlocked. type Locker interface

  • 易语言基础教程之条件选择结构

    语言结构(顺序结构.条件选择结构和循环结构)是面对过程化编程语言的核心,然而由于这三种编程结构很符合人类思维,因此在面向对象编程的易语言中也得到了充分的支持和应用.可以说,任何功能的实现,都离不开这三种编程结构.下面就一起来学习一下语言结构之条件选择结构的具体使用方法. 方法/步骤 首先我们先来了解一下易语言中条件选择结构的两种形式.为了说明问题,小编制作了如图所示的对比图. 据对比图,我们可以得到两条规律: 1."如果"条件语句只有条件判断为真时执行紧接着的下一条语句. 2.&quo

  • 浅谈互斥锁为什么还要和条件变量配合使用

    mutex体现的是一种竞争,我离开了,通知你进来. cond体现的是一种协作,我准备好了,通知你开始吧. 互斥锁一个明显的缺点是它只有两种状态:锁定和非锁定.而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起配合使用.使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化.一旦其他的某个线程改变了条件变量,他将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程.这些线程将重新锁定互斥锁并重新测试条件是否满足.

  • 详解C++11中的线程锁和条件变量

    线程 std::thread类, 位于<thread>头文件,实现了线程操作.std::thread可以和普通函数和 lambda 表达式搭配使用.它还允许向线程的执行函数传递任意多参数. #include <thread> void func() { // do some work } int main() { std::thread t(func); t.join(); return 0; } 上面的例子中,t是一个线程实例,函数func()在该线程运行.调用join()函数是

  • C++多线程中的锁和条件变量使用教程

    在做多线程编程时,有两个场景我们都会遇到: 多线程访问共享资源,需要用到锁: 多线程间的状态同步,这个可用的机制很多,条件变量是广泛使用的一种. 今天我用一个简单的例子来给大家介绍下锁和条件变量的使用. 代码使用C++11 示例代码 #include <iostream> #include <mutex> #include <thread> #include <condition_variable> std::mutex g_mutex; // 用到的全局锁

  • Go语言入门教程之基础语法快速入门

    Go语言是一个开源的,为创建简单的,快速的,可靠的软件而设计的语言. Go语言实(示)例教程,通过过实例加注释的方式来介绍Go语言的用法. Hello World 第一个程序会输出"hello world"消息.源代码如下: 复制代码 代码如下: package main import "fmt" func main() {     fmt.Println("hello world") } //通过go run来运行Go程序 $ go run h

  • python条件变量之生产者与消费者操作实例分析

    本文实例讲述了python条件变量之生产者与消费者操作.分享给大家供大家参考,具体如下: 互斥锁是最简单的线程同步机制,面对复杂线程同步问题,Python还提供了Condition对象.Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法.线程首先acquire一个条件变量,然后判断一些条件.如果条件不满足则wait:如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到

  • C语言进阶教程之循环语句缺陷详析

    目录 前言 1 循环语句的三要素 2 使用不同循环语句实现六种排列组合 2.1 第一种排列(ABC) 2.2 第二种排列(ACB) 2.3 第三种排列(BCA) 2.4 第四种排列(CBA) 2.5 第五种排列(BAC) 2.6 第六种排列(CAB) 3 什么时候用for循环语句 4 什么时候用while循环语句 5 什么时候用do-while循环语句 6 其他情况 7 总结 前言 你是否也有过下面的体会? 为什么刚开始学习C语言时很喜欢用for循环语句,但逐渐发现有经验的工程师都在用while

  • C语言进阶教程之字符串&内存函数

    目录 前言: 一.求字符串长度 strlen strlen函数的模拟实现 二.长度不受限制的字符串函数 strcpy strcpy函数的模拟实现 strcat strcat函数的模拟实现 strcmp strcmp函数的模拟实现 三.长度受限制的字符串函数 strncpy strncpy函数的模拟实现 strncat strncat函数的模拟实现 strncmp strncmp函数的模拟实现 四.字符串查找 strstr strstr函数的模拟实现 strtok strtok函数的模拟实现 五.

  • C语言进阶教程之字符函数&字符串函数

    目录 1.strlen 1.1.三种模拟实现 2.长度不受限制的字符串函数 2.1.strcpy 2.1.1.模拟实现 2.2.strcat 2.2.1.模拟实现 2.3.strcmp 2.3.1.模拟实现 3.长度受限制的字符串函数 3.1.strncpy 3.1.1.模拟实现 3.2.strncat 3.2.1.模拟实现 3.3.strncmp 3.3.1.模拟实现 4.字符串查找 4.1.strstr 4.1.1.模拟实现 4.2.strtok 5.错误信息报告 5.1.strerror

随机推荐