Golang信号量设计实现示例详解

目录
  • 开篇
  • 信号量
  • semaphore 扩展库实现
    • Acquire
    • Release
    • TryAcquire
  • 总结

开篇

在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema:

type Mutex struct {
    state int32
    sema  uint32
}
  • state 代表互斥锁的状态,比如是否被锁定;
  • sema 表示信号量,协程阻塞会等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程。

这个 sema 就是 semaphore 信号量的意思。Golang 协程之间的抢锁,实际上争抢给Locked赋值的权利,能给 Locked 置为1,就说明抢锁成功。抢不到就阻塞等待 sema 信号量,一旦持有锁的协程解锁,那么等待的协程会依次被唤醒。

有意思的是,虽然 semaphore 在锁的实现中起到了至关重要的作用,Golang 对信号量的实现却是隐藏在 runtime 中,并没有包含到标准库里来,在 src 源码中我们可以看到底层依赖的信号量相关函数。

// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
  • runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻将 s 减去 1【原子操作】;
  • runtime_Semrelease:将 s 增加 1,然后通知一个阻塞在 runtime_Semacquire 的 goroutine【原子操作】。

两个原子操作,一个 acquire,一个 release,其实就代表了对资源的获取和释放。Mutex 作为 sync 包的核心,支撑了 RWMutex,channel,singleflight 等多个并发控制的能力,而对信号量的管理又是 Mutex 的基础。

虽然源码看不到,但 Golang 其实在扩展库 golang.org/x/sync/semaphore 也提供了一套信号量的实现,我们可以由此来参考一下,理解 semaphore 的实现思路。

信号量

在看源码之前,我们先理清楚【信号量】设计背后的场景和原理。

信号量的概念是荷兰计算机科学家 Edsger Dijkstra 在 1963 年左右提出来的,广泛应用在不同的操作系统中。在系统中,会给每一个进程一个信号量,代表每个进程目前的状态。未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

在 Mutex 依赖的信号量机制中我们可以看到,这里本质就是依赖 sema 一个 uint32 的变量 + 原子操作来实现并发控制能力。当 goroutine 完成对信号量等待时,该变量 -1,当 goroutine 完成对信号量的释放时,该变量 +1。

如果一个新的 goroutine 发现信号量不大于 0,说明资源暂时没有,就得阻塞等待。直到信号量 > 0,此时的语义是有新的资源,该goroutine就会结束等待,完成对信号量的 -1 并返回。注意我们上面有提到,runtime 支持的两个方法都是原子性的,不用担心两个同时在等待的 goroutine 同时抢占同一份资源。

典型的信号量场景是【图书馆借书】。假设学校图书馆某热门书籍现在只有 100 本存货,但是上万学生都想借阅,怎么办?

直接买一万本书是非常简单粗暴的解法,但资源有限,这不是长久之计。

常见的解决方案很简单:学生们先登记,一个一个来。我们先给 100 个同学发出,剩下的你们继续等,等到什么时候借书的同学看完了,把书还回来了,就给排队等待的同学们发放。同时,为了避免超发,每发一个,都需要在维护的记录里将【余量】减去 1,每还回来一个,就把【余量】加上 1。

runtime_Semacquire 就是排队等待借书,runtime_Semrelease 就是看完了把书归还给图书馆。

另外需要注意,虽然我们上面举例的增加/减小的粒度都是 1,但这本质上只是一种场景,事实上就算是图书馆借书,也完全有可能出现一个人同时借了两本一模一样的书。所以,信号量的设计需要支持 N 个资源的获取和释放。

所以,我们对于 acquire 和 release 两种操作的语义如下:

  • release: 将信号量增加 n【保证原子性】;
  • acquire: 若信号量 < n,阻塞等待,直到信号量 >= n,此时将信号量的值减去 n【保证原子性】。

semaphore 扩展库实现

这里我们结合golang.org/x/sync/semaphore 源码来看看怎样设计出来我们上面提到的信号量结构。

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
	size    int64       // 最大资源数
	cur     int64       // 当前已被使用的资源
	mu      sync.Mutex
	waiters list.List   // 等待队列
}

有意思的是,虽然包名是 semaphore,但是扩展库里真正给【信号量结构体】定义的名称是 Weighted。从上面的定义我们可以看到,传入初始资源个数 n(对应 size),就可以生成一个 Weighted 信号量结构。

Weighted 提供了三个方法来实现对信号量机制的支持:

  • Acquire

对应上面我们提到的 acquire 语义,注意我们提到过,抽象的来讲,acquire 成功与否其实不太看返回值,而是只要获取不了就一直阻塞,如果返回了,就意味着获取到了。

但在 Golang 实现当中,我们肯定不希望,如果发生了异常 case,导致一直阻塞在这里。所以你可以看到 Acquire 的入参里有个 context.Context,借用 context 的上下文控制能力,你可以对此进行 cancel, 可以设置 timeout 等待超时,就能对 acquire 行为进行更多约束。

所以,acquire 之后我们仍然需要检查返回值 error,如果为 nil,代表正常获取了资源。否则可能是 context 已经不合法了。

  • Release

跟上面提到的 release 语义完全一致,传入你要释放的资源数 n,保证原子性地增加信号量。

  • TryAcquire

这里其实跟 sync 包中的各类 TryXXX 函数定位很像。并发的机制中大都包含 fast path 和 slow path,比如首个 goroutine 先来 acquire,那么一定是能拿到的,后续再来请求的 goroutine 由于慢了一步,只能走 slow path 进行等待,自旋等操作。sync 包中绝大部分精华,都在于 slow path 的处理。fast path 大多是一个基于 atomic 包的原子操作,比如 CAS 就可以解决。

TryAcquire 跟 Acquire 的区别在于,虽然也是要资源,但是不等待。有了我就获取,就减信号量,返回 trye。但是如果目前还没有,我不会阻塞在这里,而是直接返回 false。

下面我们逐个方法看看,Weighted 是怎样实现的。

Acquire

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock()
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}
	if n > s.size {
		// Don't make other Acquire calls block on one that's doomed to fail.
		s.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w)
	s.mu.Unlock()
	select {
	case <-ctx.Done():
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after we were canceled.  Rather than trying to
			// fix up the queue, just pretend we didn't notice the cancelation.
			err = nil
		default:
			isFront := s.waiters.Front() == elem
			s.waiters.Remove(elem)
			// If we're at the front and there're extra tokens left, notify other waiters.
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err
	case <-ready:
		return nil
	}
}

在阅读之前回忆一下上面 Weighted 结构的定义,注意 Weighted 并没有维护一个变量用来表示【当前剩余的资源】,这一点是通过 size(初始化的时候设置,表示总资源数)减去 cur(当前已被使用的资源),二者作差得到的。

我们来拆解一下上面这段代码:

第一步:这是常规意义上的 fast path

s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
}
  • 先上锁,保证并发安全;
  • 校验如果 size - cur >= n,代表剩余的资源是足够,同时 waiters 这个等待队列为空,代表没有别的协程在等待;
  • 此时就没什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 个资源,然后解锁返回,很直接。

第二步:针对特定场景做提前剪枝

if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
}

如果请求的资源数量,甚至都大于资源总数量了,说明这个协程心里没数。。。。就算我现在把所有初始化的资源都拿回来,也喂不饱你呀!!!那能怎么办,我就不烦劳后面流程处理了,直接等你的 context 什么时候 Done,给你返回 context 的错误就行了,同时先解个锁,别耽误别的 goroutine 拿资源。

第三步:资源是够的,只是现在没有,那就把当前goroutine加到排队的队伍里

ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()

这里 ready 结构是个空结构体的 channel,仅仅是为了实现协程间通信,通知什么时候资源 ready,建立一个属于这个 goroutine 的 waiter,然后塞到 Weighted 结构的等待队列 waiters 里。

搞定了以后直接解锁,因为你已经来排队了,手续处理完成,以后的路有别的通知机制保证,就没必要在这儿拿着锁阻塞新来的 goroutine 了,人家也得排队。

第四步:排队等待

select {
    case <-ctx.Done():
            err := ctx.Err()
            s.mu.Lock()
            select {
            case <-ready:
                    // Acquired the semaphore after we were canceled.  Rather than trying to
                    // fix up the queue, just pretend we didn't notice the cancelation.
                    err = nil
            default:
                    isFront := s.waiters.Front() == elem
                    s.waiters.Remove(elem)
                    // If we're at the front and there're extra tokens left, notify other waiters.
                    if isFront && s.size > s.cur {
                            s.notifyWaiters()
                    }
            }
            s.mu.Unlock()
            return err
    case <-ready:
            return nil
    }

一个 select 语句,只看两种情况:1. 这个 goroutine 的 context 超时了;2. 拿到了资源,皆大欢喜。

重点在于 ctx.Done 分支里 default 的处理。我们可以看到,如果是超时了,此时还没拿到资源,首先会把当前 goroutine 从 waiters 等待队列里移除(合情合理,你既然因为自己的原因做不了主,没法继续等待了,就别耽误别人事了)。

然后接着判断,若这个 goroutine 同时也是排在最前的 goroutine,而且恰好现在有资源了,就赶紧通知队里的 goroutine 们,伙计们,现在有资源了,赶紧来拿。我们来看看这个 notifyWaiters 干了什么:

func (s *Weighted) notifyWaiters() {
	for {
		next := s.waiters.Front()
		if next == nil {
			break // No more waiters blocked.
		}
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough tokens for the next waiter.  We could keep going (to try to
			// find a waiter with a smaller request), but under load that could cause
			// starvation for large requests; instead, we leave all remaining waiters
			// blocked.
			//
			// Consider a semaphore used as a read-write lock, with N tokens, N
			// readers, and one writer.  Each reader can Acquire(1) to obtain a read
			// lock.  The writer can Acquire(N) to obtain a write lock, excluding all
			// of the readers.  If we allow the readers to jump ahead in the queue,
			// the writer will starve — there is always one token available for every
			// reader.
			break
		}
		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}

其实很简单,遍历 waiters 这个等待队列,拿到排队最前的 waiter,判断资源够不够,如果够了,增加 cur 变量,资源给你,然后把你从等待队列里移出去,再 close ready 那个goroutine 就行,算是通知一下。

重点部分在于,如果资源不够怎么办?

想象一下现在的处境,Weighted 这个 semaphore 的确有资源,而目前要处理的这个 goroutine 的的确确就是排队最靠前的,而且人家也没狮子大开口,要比你总 size 还大的资源。但是,但是,好巧不巧,现在你要的数量,比我手上有的少。。。。

很无奈,那怎么办呢?

无非两种解法:

  • 我先不管你,反正你要的不够,我先看看你后面那个 goroutine 人家够不够,虽然你现在是排位第一个,但是也得继续等着;
  • 没办法,你排第一,需求我就得满足,所以我们都继续等,等啥时候资源够了就给你。

扩展库实际选用的是第 2 种策略,即一定要满足排在最前面的 goroutine,这里的考虑在注释里有提到,如果直接继续看后面的 goroutine 够不够,优先满足后面的,在一些情况下会饿死有大资源要求的 goroutine,设计上不希望这样的情况发生。

简单说:要的多不是错,既然你排第一,目前货不多,那就大家一起阻塞等待,保障你的权利。

Release

// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	if s.cur &lt; 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	s.notifyWaiters()
	s.mu.Unlock()
}

Release 这里的实现非常简单,一把锁保障不出现并发,然后将 cur 减去 n 即可,说明此时又有 n 个资源回到了货仓。然后和上面 Acquire 一样,调用 notifyWaiters,叫排队第一的哥们(哦不,是 goroutine)来领东西了。

TryAcquire

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

其实就是 Acquire 方法的 fast path,只是返回了个 bool,标识是否获取成功。

总结

今天我们了解了扩展库 semaphore 对于信号量的封装实现,整体代码加上注释也才 100 多行,是非常好的学习材料,建议大家有空了对着源码再过一遍。Acquire 和 Release 的实现都很符合直觉。

其实,我们使用 buffered channel 其实也可以模拟出来 n 个信号量的效果,但就不具备 semaphore Weighted 这套实现里面,一次获取多个资源的能力了。

以上就是Golang信号量设计实现示例详解的详细内容,更多关于Go信号量设计的资料请关注我们其它相关文章!

(0)

相关推荐

  • golang使用信号量热更新的实现示例

    配置文件热更新是服务器程序的一个基本功能,通过热更新可以不停机调整程序的配置,特别是在生产环境可以提供极大的便利,比如发现log打得太多了可以动态调高日志等级,业务逻辑参数变化,甚至某个功能模块的开关等都可以动态调整. package main import ( "encoding/json" "fmt" "io/ioutil" "log" "os" "os/signal" "

  • 一文读懂go中semaphore(信号量)源码

    运行时信号量机制 semaphore 前言 最近在看源码,发现好多地方用到了这个semaphore. 本文是在go version go1.13.15 darwin/amd64上进行的 作用是什么 下面是官方的描述 // Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case /

  • golang模拟实现带超时的信号量示例代码

    前言 最近在写项目,需要用到信号量等待一些资源完成,但是最多等待N毫秒.在看本文的正文之前,我们先来看下C语言里的实现方法. 在C语言里,有如下的API来实现带超时的信号量等待: SYNOPSIS #include <pthread.h> int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime); 然后在查看golang的document后,发

  • Golang信号量设计实现示例详解

    目录 开篇 信号量 semaphore 扩展库实现 Acquire Release TryAcquire 总结 开篇 在我们此前的文章 Golang Mutex 原理解析 中曾提到过,Mutex 的底层结构包含了两个字段,state 和 sema: type Mutex struct { state int32 sema uint32 } state 代表互斥锁的状态,比如是否被锁定: sema 表示信号量,协程阻塞会等待该信号量,解锁的协程释放信号量从而唤醒等待信号量的协程. 这个 sema

  • Golang分布式应用之Redis示例详解

    目录 正文 分布式锁 运行测试 分布式过滤器 运行测试 分布式限流器 运行测试 其他 正文 Redis作是一个高性能的内存数据库,常被应用于分布式系统中,除了作为分布式缓存或简单的内存数据库还有一些特殊的应用场景,本文结合Golang来编写对应的中间件. 本文所有代码见github.com/qingwave/go… 分布式锁 单机系统中我们可以使用sync.Mutex来保护临界资源,在分布式系统中同样有这样的需求,当多个主机抢占同一个资源,需要加对应的“分布式锁”. 在Redis中我们可以通过s

  • Golang中的参数传递示例详解

    前言 本文主要给大家介绍了关于Golang参数传递的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 关于参数传递,Golang文档中有这么一句: after they are evaluated, the parameters of the call are passed by value to the function and the called function begins execution. 函数调用参数均为值传递,不是指针传递或引用传递.经测试引申出来,

  • Golang字符串变位词示例详解

    实现目标 本文的目标是写出一个函数 anagram(s, t) 去判断两个字符串是否是颠倒字母顺序构成的.下面话不多说了,来一起看看详细的介绍吧. GoLang 实现 func solution(s , t string)bool{ if s == t { return true } length := len(s) if length != len(t) { return false } //' ' 32 --> ~ 126 const MAX_ASCII int= 94 const SPAC

  • Golang 官方依赖注入工具wire示例详解

    目录 依赖注入是什么 开源选型 wire providers injectors 类型区分 总结 依赖注入是什么 Dependency Injection is the idea that your components (usually structs in go) should receive their dependencies when being created. 在 Golang 中,构造一个结构体常见的有两种方式: 在结构体初始化过程中,构建它的依赖: 将依赖作为构造器入参,传入进

  • Django中的模型类设计及展示示例详解

    django中设计数据模型类是基于ORM的对象关系映射更方便的进行数据库中的数据操作. 对象关系映射 把面向对象中的类和数据库表--对应,通过操作类和对象,对数表实现数据操作,不需要写sql,由ORM框架生成 django实现了ORM框架,在项目中与数据库之间产生桥梁作用 django数据库定义模型的步骤如下: python manage.py makemigrations python mange.py migrate 在应用models.py中编写模型类,继承models.Model类 在模

  • 基于gin的golang web开发:路由示例详解

    Gin是一个用Golang编写的HTTP网络框架.它的特点是类似于Martini的API,性能更好.在golang web开发领域是一个非常热门的web框架. 启动一个Gin web服务器 使用下面的命令安装Gin go get -u github.com/gin-gonic/gin 在代码里添加依赖 import "github.com/gin-gonic/gin" 快速启动一个Gin服务器的代码如下 package main import "github.com/gin-

  • golang中的三个点 '...'的用法示例详解

    '-' 其实是go的一种语法糖. 它的第一个用法主要是用于函数有多个不定参数的情况,可以接受多个不确定数量的参数. 第二个用法是slice可以被打散进行传递. 下面直接上例子: func test1(args ...string) { //可以接受任意个string参数 for _, v:= range args{ fmt.Println(v) } } func main(){ var strss= []string{ "qwr", "234", "yui

  • java面向对象设计原则之里氏替换原则示例详解

    目录 概念 实现 拓展 概念 里氏替换原则是任何基类出现的地方,子类一定可以替换它:是建立在基于抽象.多态.继承的基础复用的基石,该原则能够保证系统具有良好的拓展性,同时实现基于多态的抽象机制,能够减少代码冗余. 实现 里氏替换原则要求我们在编码时使用基类或接口去定义对象变量,使用时可以由具体实现对象进行赋值,实现变化的多样性,完成代码对修改的封闭,扩展的开放.如:商城商品结算中,定义结算接口Istrategy,该接口有三个具体实现类,分别为PromotionalStrategy (满减活动,两

  • java面向对象设计原则之接口隔离原则示例详解

    目录 概念 实现 拓展 概念 小接口原则,即每个接口中不存在子类用不到却必须实现的方法,如果不然,就要将接口拆分.如下图所示定义了一个接口,包含了5个方法,实现类A用到了3个方法.实现类B用到了3个方法,类图如下: 类A没有方法4.方法5,却要实现它:类B没有方法2.方法3,但还是要实现这两个方法,不符合接口隔离原则.改造为将其拆分为三个接口,实现方式改为下图所示,符合接口隔离原则: 实现 面向对象机制中一个类可以实现多个接口,通过多重继承分离,通过接口多继承(实现)来实现客户的需求,代码更加清

随机推荐