golang中sync.Mutex的实现方法

目录
  • mutex 的实现思想
  • golang 中 mutex 的实现思想
  • mutex 的结构以及一些 const 常量值
  • Mutex 没有被锁住,第一个协程来拿锁
  • Mutex 仅被协程 A 锁住,没有其他协程抢锁,协程 A 释放锁
  • Mutex 已经被协程 A 锁住,协程 B 来拿锁
  • lockSlow()
  • runtime_doSpin()
  • runtime_canSpin()
  • Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁
  • unlockSlow()
  • Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁,协程 B 被唤醒
  • 饥饿情况下的解锁行为: starvationFlag 的作用
  • Unlock()
  • Lock()
  • runtime_Semrelease()
  • 饥饿模式下新来的 goroutine 的加锁行为: starvationFlag 的作用
  • 当有协程正在自旋时的解锁行为: wokenFlag 的作用
  • 协程从等待队列被唤醒后如果还是没有抢到锁,会被放到队列首部还是尾部?
  • 复杂情景分析
  • 一点小瑕疵: 一种很边缘的 starvation case
  • reference

mutex 的实现思想

mutex 主要有两个 method: Lock() 和 Unlock()

Lock() 可以通过一个 CAS 操作来实现

func (m *Mutex) Lock() {
	for !atomic.CompareAndSwapUint32(&m.locked, 0, 1) {
	}
}

func (m *Mutex) Unlock() {
	atomic.StoreUint32(&m.locked, 0)
}

Lock() 一直进行 CAS 操作,比较耗 CPU。因此带来了一个优化:如果协程在一段时间内抢不到锁,可以把该协程挂到一个等待队列上,Unlock() 的一方除了更新锁的状态,还需要从等待队列中唤醒一个协程。

但是这个优化会存在一个问题,如果一个协程从等待队列中唤醒后再次抢锁时,锁已经被一个新来的协程抢走了,它就只能再次被挂到等待队列中,接着再被唤醒,但又可能抢锁失败...... 这个悲催的协程可能会一直抢不到锁,由此产生饥饿 (starvation) 现象。

饥饿现象会导致尾部延迟 (Tail Latency) 特别高。什么是尾部延迟?用一句话来说就是:最慢的特别慢!

如果共有 1000 个协程,假设 999 个协程可以在 1ms 内抢到锁,虽然平均时间才 2ms,但是最慢的那个协程却需要 1s 才抢到锁,这就是尾部延迟。

golang 中 mutex 的实现思想

➜  go version
go version go1.16.5 darwin/arm64

本次阅读的 go 源码版本为 go1.16.5。

golang 标准库里的 mutex 避免了饥饿现象的发生,先大致介绍一下 golang 的加锁和解锁流程,对后面的源码阅读有帮助。

锁有两种 mode,分别是 normal mode 和 starvation mode。初始为 normal mode,当一个协程来抢锁时,依旧是做 CAS 操作,如果成功了,就直接返回,如果没有抢到锁,它会做一定次数的自旋操作,等待锁被释放,在自旋操作结束后,如果锁依旧没有被释放,那么这个协程就会被放到等待队列中。如果一个处于等待队列中的协程一直都没有抢到锁,mutex 就会从 normal mode 变成 starvation mode,在 starvation mode 下,当有协程释放锁时,这个锁会被直接交给等待队列中的协程,从而避免产生饥饿线程。

除此之外,golang 还有一点小优化,当有协程正在自旋抢锁时,Unlock() 的一方不会从等待队列中唤醒协程,因为即使唤醒了,被唤醒的协程也抢不过正在自旋的协程。

下面正式开始阅读源码。

mutex 的结构以及一些 const 常量值

type Mutex struct {
	state int32
	sema  uint32
}
const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota // 3

	// Mutex fairness.
	//
	// Mutex can be in 2 modes of operations: normal and starvation.
	// In normal mode waiters are queued in FIFO order, but a woken up waiter
	// does not own the mutex and competes with new arriving goroutines over
	// the ownership. New arriving goroutines have an advantage -- they are
	// already running on CPU and there can be lots of them, so a woken up
	// waiter has good chances of losing. In such case it is queued at front
	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
	// it switches mutex to the starvation mode.
	//
	// In starvation mode ownership of the mutex is directly handed off from
	// the unlocking goroutine to the waiter at the front of the queue.
	// New arriving goroutines don't try to acquire the mutex even if it appears
	// to be unlocked, and don't try to spin. Instead they queue themselves at
	// the tail of the wait queue.
	//
	// If a waiter receives ownership of the mutex and sees that either
	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
	// it switches mutex back to normal operation mode.
	//
	// Normal mode has considerably better performance as a goroutine can acquire
	// a mutex several times in a row even if there are blocked waiters.
	// Starvation mode is important to prevent pathological cases of tail latency.
	starvationThresholdNs = 1e6
)

mutex 的状态是通过 state 来维护的,state 有 32 个 bit。

前面 29 个 bit 用来记录当前等待队列中有多少个协程在等待,将等待队列的协程数量记录为 waiterCount。

state >> mutexWaiterShift // mutexWaiterShift 的值为 3

第 30 个 bit 表示当前 mutex 是否处于 starvation mode,将这个 bit 记为 starvationFlag。

state & mutexStarving

第 31 个 bit 表示当前是否有协程正在 (第一次) 自旋,将这个 bit 记为 wokenFlag,woken 的意思也就是醒着,代表它不在等待队列上睡眠。

state & mutexWoken

第 32 个 bit 表示当前锁是否被锁了 (感觉有点绕口哈哈) ,将这个 bit 记为 lockFlag。

state & mutexLocked

用一个图来表示这些 bit

0 0 0 0 0 0 0 0 ... 0 0				0			0
                      |				|			|
waiterCount     starvationFlag  wokenFlag   lockFlag

sema 是一个信号量,它会被用来关联一个等待队列。

分别讨论几种 case 下,代码的执行情况。

Mutex 没有被锁住,第一个协程来拿锁

func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// ...
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

在 Mutex 没有被锁住时,state 的值为 0,此时第一个协程来拿锁时,由于 state 的值为 0,因此 CAS 操作会成功,CAS 操作之后的 state 的值变成 1 (lockFlag = 1) ,然后 return 掉,不会进入到 m.lockSlow() 里面。

Mutex 仅被协程 A 锁住,没有其他协程抢锁,协程 A 释放锁

func (m *Mutex) Unlock() {
	// ...

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

紧接上面,state 的值为 1,AddInt32(m.state,-1) 之后,state 的值变成了 0 (lockFlag = 0) ,new 的值为 0,然后就返回了。

Mutex 已经被协程 A 锁住,协程 B 来拿锁

func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// ...
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

因为 state 的值不为 0,CompareAndSwapInt32 会返回 false,所以会进入到 lockSlow() 里面

lockSlow()

首先看一下 lockSlow() 这个方法的全貌

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexwokenFlag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

第一步: doSpin (空转)

进入 for 循环后,会执行一个判断

for {
    // Don't spin in starvation mode, ownership is handed off to waiters
    // so we won't be able to acquire the mutex anyway.
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // Active spinning makes sense.
        // Try to set mutexwokenFlag to inform Unlock
        // to not wake other blocked goroutines.
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }
    // ...
}

runtime_canSpin(iter) 的作用是根据 iter 的值判断自否应该自旋下去。 (这个方法的实现可以在后面看到)

最初的几次判断,由于 iter 的值为 0,runtime_canSpin(iter) 会返回 true。因此

if old&amp;(mutexLocked|mutexStarving) == mutexLocked &amp;&amp; runtime_canSpin(iter)

这个判断会一直通过,由于 old>>mutexWaiterShift = 0 (waiterCount = 0) ,不满足第二个判断的条件,因此不会执行 CAS 操作和 awoke = true

接着就是执行 runtime_doSpin() 了,runtime_doSpin() 会进行一些空循环,消耗了一下 CPU 时间,然后就通过 continue 进入到下一次循环了。 (runtime_doSpin具体实现也可以在后面看到)

看到看到,这段代码不是用来抢锁的,而是用来等锁变成 unlock 状态的,它会空转一定的次数,期待在空转的过程中,锁被其他的协程释放。

runtime_doSpin()

// src/runtime/lock_sema.go
const active_spin_cnt = 30
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
# /src/runtime/asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

procyield() 会循环执行 PAUSE 指令。

runtime_canSpin()

runtime_canSpin() 的实现在 src/runtime/proc.go 里面,里面的判断比较多,但是我们只需要关注 i >= active_spin 这一个判断就行。

const active_spin     = 4
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

一个小插曲

在利用断点来 debug 时,发现没办法 watch sync_runtime_canSpin() 内引用的一些全局变量,例如 active_spin,ncpu,sched.npidle 这些,所以我就大力出奇迹,强行修改源码在里面声明了几个局部变量,这下可以通过 watch 局部变量来得知全局变量的值了 (机智如我哈哈) 。

func sync_runtime_canSpin(i int) bool {
	local_active_spin := active_spin
	local_ncpu := ncpu
	local_gomaxprocs := gomaxprocs
	npidle := sched.npidle
	nmspinning := sched.nmspinning
	if i >= local_active_spin || local_ncpu <= 1 ||local_gomaxprocs <= int32(npidle+nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

第二步: 根据旧状态来计算新状态

new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
    new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}
if awoke {
    // The goroutine has been woken from sleep,
    // so we need to reset the flag in either case.
    // ...
    new &^= mutexWoken
}

这一段代码,是根据 old state 来计算 new state,有 4 个操作

  • set lockFlag: new |= mutexLocked
  • 增加 waiterCount: new += 1 << mutexWaiterShift
  • set starvationFlag: new |= mutexStarving
  • clear wokenFlag: new &^= mutexWoken

由于在这里我们只讨论 ”Mutex 已经被协程 A 锁住,协程 B 来拿锁“ 这种情况,可以分为两种 case

  • case1: 在第一步自旋的过程中,锁已经被释放了,此时 old state = 000000...000 (所有 bit 都为 0) ,经过这四个操作的洗礼后,lockFlag 被设置成了 1。
  • case2: 在第一步自旋结束后,锁还没有被释放,即 old state 此时为 00000000...001 (仅 lockFlag 为 1),经过这四个操作的洗礼后,waiterCounter = 1,lockFlag 也为 1。

第三步: 更新 state (抢锁)

if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    // ...
} else {
    old = m.state
}

这一步会通过 CAS 操作将 mutex.state 更新为我们刚刚计算得到的 new state。如果 CAS 成功,且 old 处于未上锁的状态时,就直接利用 break 退出循环返回了 (也就是上面的 case1) 。如果 CAS 失败,将会更新 old state 的值,进行下一次循环,再重复一二三步;

如果是 case2 的话,情况会稍微复杂一点

if atomic.CompareAndSwapInt32(&m.state, old, new) {
	// ...
    // If we were already waiting before, queue at the front of the queue.
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }

    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    // ...
}

主要就是通过 runtime_SemacquireMutex() 把自己放进了等待队列里面,之后 runtime 不会再调度该协程,直到协程被唤醒。

关于 runtime_SemacquireMutex() 的实现,我暂时就不追究下去了,再追究下去就没完没了啦。

Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁

func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

紧接上回,最初 state 的值为 00000000000...0001001 (waiterCount = 1, lockFlag = 1)。执行完 AddInt32(&m.state, -mutexLocked) 后,变成了 0000...001000 (waiterCount = 1) ,new 的值也为 0000...001000,接着就进入到 unlockSlow 里面了。

unlockSlow()

看看 unlockSlow() 的全貌

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

此时 old >> mutexWaiterShift = 0000...0001 ≠ 0, 所以不会直接返回

old := new
for {
    // If there are no waiters or a goroutine has already
    // been woken or grabbed the lock, no need to wake anyone.
    // In starvation mode ownership is directly handed off from unlocking
    // goroutine to the next waiter. We are not part of this chain,
    // since we did not observe mutexStarving when we unlocked the mutex above.
    // So get off the way.
    if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
        return
    }
    // Grab the right to wake someone.
    new = (old - 1<<mutexWaiterShift) | mutexWoken
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        runtime_Semrelease(&m.sema, false, 1)
        return
    }
    old = m.state
}

接着计算 new = 0000...1000 - 0000...1000 = 0000...0000,waiterCount 由 1 变成了 0。之后进行 CAS 操作,如果 CAS 成功,则从等待队列中唤醒一个 goroutine。

Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁,协程 B 被唤醒

让我们会视线切到 lockSlow 的后半截。

const starvationThresholdNs = 1e6
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // ...
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
	// ...
    iter = 0
}

当协程 B 从 runtime_SemacquireMutex 处醒来后,会根据该协程的等待的时间来判断是否饥饿了。这里我们先假设此时还没有饥饿,后面会详细讨论饥饿时的情况。之后会将 iter 重置为 0,接着就进行下一次的循环了,由于 iter 已经被重置为 0 了,所以在下一次循环时,sync_runtime_doSpin(iter) 会返回 true

由于此时 state 已经变成了 0 了,所以在下一次循环里可以畅通无阻的拿到锁。

饥饿情况下的解锁行为: starvationFlag 的作用

设想这样一种情况:goroutine A 拿到锁,goroutine B 抢锁失败,被放入等待队列。goroutine A 释放锁,goroutine B 被唤醒,但是正当它抢锁时,锁被新来的 goroutine C 抢走了... 连续好几次,每当 goroutine B 要抢锁时,锁都被其他协程抢先一步拿走。直到某一次,goroutine B 再次被唤醒后执行

starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

它就进入饥饿模式 (starvation mode) 啦!

// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}

之后通过 CAS 操作将饥饿标志设置到了 mutex.state 里面,然后它就又被放到等待队列中了。

atomic.CompareAndSwapInt32(&m.state, old, new)

Unlock()

视角切换到 Unlock() 这一边

func (m *Mutex) Unlock() {
	// ...
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

func (m *Mutex) unlockSlow(new int32) {
	// ...
	if new&mutexStarving == 0 {
		// ...
		for {
			// ...
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
            // ...
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		runtime_Semrelease(&m.sema, true, 1)
	}
}

在 unlockSlow() 中,此时 new&mutexStarving != 0,所以会直接进入到 else 分支内,调用 runtime_Semrelease() 方法,但要注意 else 分支内runtime_Semrelease() 的参数和 if 分支的参数不一样,在这里 runtime_Semrelease(&m.sema, true, 1) 起到的作用是唤醒了等待队列中的第一个协程并立马调度该协程 (runtime_Semrelease() 方法的详解在后面 )。

同时正如注释所说,在 Unlock() 中由于进行了 atomic.AddInt32(&m.state, -mutexLocked) 操作,所以 mutex.state 的 lockFlag 是为 0 的,但是没关系,starvationFlag 是为 1 的,所以会依旧被认为是锁住的状态。

Lock()

func (m *Mutex) Lock() {
	// ...
	m.lockSlow()
}

func (m *Mutex) lockSlow() {
	// ...
	for {
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				// ...
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
	// ...
}

视角再次切换到 Lock() 这边,饥饿的 goroutine 被唤醒并调度后,首先执行 old = m.state, 此时 old 的 starvationFlag = 1。

之后就正如注释所说,它会尝试修复 mutex.state 的"不一致" (inconsistent) 状态。

修复工作主要做了三件事情:

  • 在 starvation mode 下的 Unlock() 没有将 waitterCount - 1, 所以这里需要给 mutexWaiter 减 1
  • 将 state 的 locked flag 置为 1
  • 如果该 goroutine 没有饥饿或者是等待队列中的最后一个 goroutine 的话,清理 starvationFlag

这三件事情通过 atomic.AddInt32(&m.state, delta) 一步到位。

runtime_Semrelease()

// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

handoff 就是传球的意思,handoff 为 false 时,仅仅唤醒等待队列中第一个协程,但是不会立马调度该协程;当 handoff 为 true 时,会立马调度被唤醒的协程,此外,当 handoff = true 时,被唤醒的协程会继承当前协程的时间片。具体例子,假设每个 goroutine 的时间片为 2ms,gorounte A 已经执行了 1ms,假设它通过 runtime_Semrelease(handoff = true) 唤醒了 goroutine B,则 goroutine B 剩余的时间片为 2 - 1 = 1ms。

饥饿模式下新来的 goroutine 的加锁行为: starvationFlag 的作用

如果在饥饿模式下,有新的 goroutine 来请求锁,它会执行下面这些步骤

func (m *Mutex) lockSlow() {
    // ...
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// ...
			runtime_doSpin()
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ..
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
		} else {
			// ...
		}
	}
	// ...
}

由于 old&(mutexLocked|mutexStarving) != mutexLocked ,所以它不会自旋。

由于 old&mutexStarving != 0,所以它不会 set lockFlag。

由于 old&(mutexLocked|mutexStarving) != 0,所以它 增加 waiterCount。

可以看到,它实际上就做了增加 waiterCount 这一个操作,之后通过 CAS 更新 state 的状态,更新完成之后就跑去等待队列睡觉去了。

因此在饥饿状态下,新的来争抢锁的 goroutine 是不会去抢锁 (set lockFlag) 的,它们只会登记一下 (waiterCount + 1) ,然后乖乖加入到等待队列里面。

当有协程正在自旋时的解锁行为: wokenFlag 的作用

wokenFlag 是在 lockSlow() 里面被设置的,wokenFlag 为 1 时,表示此时有协程正在进行自旋。

func (m *Mutex) lockSlow() {
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexwokenFlag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		// ...
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// ...
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// ...
			awoke = true
			iter = 0
		}
        // ...
	}
    // ...
}

当一个新来的协程 (从未被放到等待队列中) 在第一次自旋时,wokenFlag 的设置逻辑为:

if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
	atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
}

但是当协程从等待队列中被唤醒后自旋时,却 lockSlow()找不到设置 wokenFlag 的逻辑,为何?因为这段逻辑被放到了 unlockSlow 里面了。

视线切换到 unlockSlow() 那一边

func (m *Mutex) unlockSlow(new int32) {
	// ...
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                // 当 mutexwokenFlag 被设置时,会直接 return
                // 不会去等待队列唤醒 goroutine
				return
			}
			// Grab the right to wake someone.
            // 这个地方会设置 wokenFlag 哦
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// ...
	}
}

可以看到,当有协程正在自旋时 (wokenFlag = 1) ,不会从等待队列唤醒协程,这样就避免了等待队列上的协程加入竞争,当然,正在自旋中的协程之间彼此之间还是会竞争的;如果 wokenFlag = 0,则会从等待队列中唤醒一个协程,在唤醒之前会将 wokenFlag 设置为 1,这样协程被唤醒后就不用再去设置 wokenFlag 了,妙呀!

为什么当有协程在自旋时,不要去等待队列中唤醒协程呢?协程从被唤醒到被调度 (在 CPU 上面执行) 是要花时间的,等真正自旋时 mutex 早就被抢走了。

协程从等待队列被唤醒后如果还是没有抢到锁,会被放到队列首部还是尾部?

但是是头部,代码如下:

// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&amp;m.sema, queueLifo, 1)

复杂情景分析

基于上面的逻辑来分析一下复杂的逻辑吧!

假设有协程 g1,g2,g3,g4,g5,g6, 共同争抢一把锁 m

一开始 g1 拿到锁

owner: g1 waitqueue: null

g2 开始抢锁,没有抢到,被放到等待队列

owner: g1 waitqueue: g2

g1 释放锁,g2 从等待队列中被唤醒

owner: null waitqueue: null

此时 g3 也开始抢锁,g2 没有抢过,又被放回等待队列

owner: g3 waitqueue: g2

g4 开始抢锁,没有抢到,被放到等待队列

owner: g3 waitqueue: g2, g4

g3 释放锁,g2 被唤醒

owner: null waitqueue: g4

此时 g5 开始抢锁,g2 没有抢过,又被放回等待队列首部

owner: g5 waitqueue: g2, g4

g6 开始抢锁,正在自旋中

owner: g5 waitqueue: g2, g4 wokenFlag: 1 spinning: g6

g5 释放锁,由于此时有协程正在自旋,因此不会去等待队列中唤醒协程,锁被 g6 轻松抢到

owner: g6 waitqueue: g2, g4 wokenFlag: 0 spinning: null

g6 释放锁,g2 被唤醒,此时 g7 开始抢锁,g2 没有抢过,又被放回等待队列首部,但是 g2 由于太久没有抢到锁,进入饥饿模式了

owner: g7 waitqueue: g2(饥饿), g4 starvationFlag: 1

g8 来抢锁,由于处于饥饿状态,g8 会被直接放在等待队列尾部

owner: g7 waitqueue: g2(饥饿), g4, g8 starvationFlag: 1

g7 释放锁,由于处于饥饿状态,会直接唤醒 g2 并调度它

owner: g2 waitqueue: g4, g8 starvationFlag: 1

g2 执行完毕,释放锁,注意此刻依旧是饥饿状态,直接调度 g4,g4 苏醒后,发现它自己没有饥饿,于是 clear starvationFlag

owner: g4 waitqueue: g8 starvationFlag: 0

此时新来的 g8 可以正常加入到对锁的争抢中了,之后就是正常的加锁解锁逻辑了。

一点小瑕疵: 一种很边缘的 starvation case

由于等待队列中的协程只有当被唤醒之后才会根据等待时间来判断是否进入 starvation mode,因此会存在一个协程在等待队列中等待了很久,它实际上已经饥饿了,但是一直没被唤醒过,就没机会 set starvationFlag,这就会导致饥饿现象的发生。

那么会存在等待队列里的协程一直不被唤醒的情况么?

有的!在 unlockSlow() 时如果 wokenFlag = 1,那就不会去唤醒等待队列中的线程。就会存在这样一种情况,假设每次 Unlock() 时恰好有一个新来的协程在自旋,那等待队列中的协程就会永远饥饿下去!

reference

Tail Latency Might Matter More Than You Think

Golang 互斥锁内部实现

到此这篇关于golang中sync.Mutex的实现方法的文章就介绍到这了,更多相关golang sync.Mutex  内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Go使用sync.Map来解决map的并发操作问题

    目录 前言 map 并发操作出现问题 sync.Map 解决并发操作问题 计算 map 长度 计算 sync.Map 长度 前言 在 Golang 中 map 不是并发安全的,自 1.9 才引入了 sync.Map ,sync.Map 的引入确实解决了 map 的并发安全问题,不过 sync.Map 却没有实现 len() 函数,如果想要计算 sync.Map 的长度,稍微有点麻烦,需要使用 Range 函数. map 并发操作出现问题 func main() { demo := make(ma

  • 解决golang sync.Wait()不执行的问题

    goroutine 似乎不用解释太多,可以利用它实现多线程,也可以利用它来实现异步事件. 在使用关键字go的过程中,常常会将用到sync.WaitGroup,如下一段代码. package main import ( "fmt" "sync" "time" ) func Run() { var wg = &sync.WaitGroup{} go func() { wg.Add(1) fmt.Println("halo world

  • Go语言中sync.Cond使用详解

    目录 sync.Cond 可以用来干什么? 与 Sync.Mutex 的区别 sync.Cond 使用场景 sync.Cond sync.Cond 有哪些方法 NewCond 创建实例 Broadcast 广播唤醒所有 Signal 唤醒一个协程 Wait 等待 代码示例 sync.Cond 可以用来干什么? Golang 的 sync 包中的 Cond 实现了一种条件变量,可以使用多个 Reader 等待公共资源. 每个 Cond 都会关联一个 Lock ,当修改条件或者调用 Wait 方法,

  • Go并发:使用sync.WaitGroup实现协程同步方式

    经常看到有人会问如何等待主协程中创建的协程执行完毕之后再结束主协程,例如如下代码: package main import ( "fmt" ) func main() { go func() { fmt.Println("Goroutine 1") }() go func() { fmt.Println("Goroutine 2") }() } 执行以上代码很可能看不到输出,因为有可能这两个协程还没得到执行主协程已经结束了,而主协程结束时会结束所

  • Golang之sync.Pool使用详解

    前言 我们通常用 Golang 来开发并构建高并发场景下的服务,但是由于 Golang 内建的GC机制多少会影响服务的性能,因此,为了减少频繁GC,Golang提供了对象重用的机制,也就是使用sync.Pool构建对象池. sync.Pool介绍 首先sync.Pool是可伸缩的临时对象池,也是并发安全的.其可伸缩的大小会受限于内存的大小,可以理解为是一个存放可重用对象的容器.sync.Pool设计的目的就是用于存放已经分配的但是暂时又不用的对象,而且在需要用到的时候,可以直接从该pool中取.

  • golang中sync.Map并发创建、读取问题实战记录

    背景: 我们有一个用go做的项目,其中用到了zmq4进行通信,一个简单的rpc过程,早期远端是使用一个map去做ip和具体socket的映射. 问题 大概是这样 struct SocketMap { sync.Mutex sockets map[string]*zmq4.Socket } 然后调用的时候的代码大概就是这样的: func (pushList *SocketMap) push(ip string, data []byte) { pushList.Lock() defer pushLi

  • Go语言并发编程 sync.Once

    sync.Once用于保证某个动作只被执行一次,可用于单例模式中,比如初始化配置.我们知道init()函数也只会执行一次,不过它是在main()函数之前执行,如果想要在代码执行过程中只运行某个动作一次,可以使用sync.Once,下面来介绍一下它的使用方法. 先来看下面的代码: package main import ( "fmt" "sync" ) func main() { var num = 6 var once sync.Once add_one := fu

  • 在golang中使用Sync.WaitGroup解决等待的问题

    面对goroutine我们都需要等待它完成交给它的事情,等待它计算完成或是执行完毕,所以不得不在程序需要等待的地方使用time.Sleep()来睡眠一段时间,等待其他goroytine执行完毕,下面的代码打印1到100的for循环可以在很快的时间内运行完毕,但是我们必须添加time.Sleep()来等待其打印完毕,如果我们不等待仿佛什么也没有发生一样.....这肯定不是我们想要的! func main(){ for i := 0; i < 100 ; i++{ go fmt.Println(i)

  • Go并发编程之sync.Once使用实例详解

    目录 一.序 二. 源码分析 2.1结构体 2.2 接口 三. 使用场景案例 3.1 单例模式 3.2 加载配置文件示例 四.总结 五. 参考 一.序 单从库名大概就能猜出其作用.sync.Once使用起来很简单, 下面是一个简单的使用案例 package main import ( "fmt" "sync" ) func main() { var ( once sync.Once wg sync.WaitGroup ) for i := 0; i < 10;

  • golang中使用sync.Map的方法

    背景 go中map数据结构不是线程安全的,即多个goroutine同时操作一个map,则会报错,因此go1.9之后诞生了sync.Map sync.Map思路来自java的ConcurrentHashMap 接口 sync.map就是1.9版本带的线程安全map,主要有如下几种方法: Load(key interface{}) (value interface{}, ok bool) //通过提供一个键key,查找对应的值value,如果不存在,则返回nil.ok的结果表示是否在map中找到值

随机推荐