golang的Mutex锁如何实现

文章目录

  1. 1. 概述
  2. 2. 实现
    1. 2.1. Lock
      1. 2.1.1. 自旋
    2. 2.2. UnLock
  3. 3. 阻塞队列

概述

golang中锁实现的整体思路是利用类似信号量的P,V操作 (互斥锁:Mutex)
来对有限个goroutine争夺资源的共享。

在获取资源accquire是执行P,对锁标志位sema-1,其他获取资源时发现资源位为0则需阻塞等待
释放资源release是执行V,对锁标志位sema+1。
此时若无阻塞等待的goroutine则直接释放,否则需唤醒一个阻塞goroutine。

为实现上诉操作定义了以下结构

1
2
3
4
5
6
7
8
9
10
11
type Mutex struct {
state int32 //倒数(右到左)三位分别是 锁、唤醒、饥饿的标志位;剩余29位是当前阻塞等待锁的goroutine个数
sema uint32 //信号量标记位
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6 //饥饿模式阈值:超1ms获取不到锁则进入饥饿模式
)

其中饥饿标志位引入是为了解决goroutine过长时间阻塞问题

互斥锁有正常模式和饥饿模式两种

1.正常模式下
阻塞排队的goroutine先进先出,被唤醒的goroutine和新来的goroutine争夺锁,新来的goroutine
因已经在cpu执行过而占有优势,更容易获得锁。

2.饥饿模式下
阻塞超过1ms还没抢到锁则转入饥饿模式
此模式下,新来的goroutine只能排队尾,不争夺锁,不自旋等待;
锁释放后会直接移交给队首goroutine。

3.恢复正常模式
满足以下任意一个条件时
1)等候时间小于1ms
2)阻塞队列为空时

实现

具体看下代码实现

Lock

基本步骤是
1.若为首次加锁,则直接标记锁标志位即可
2.非首次,则需区分正常模式和饥饿模式

  • 正常模式下,若已加锁,且当前可自旋等待,则自旋等待,同时标记唤醒位,防止此时unlock调用唤醒其他阻塞的goroutine
  • 自旋后尝试加锁,若加锁标记成功
    • 加锁前为 非加锁且非饥饿模式无需后续处理
    • 否则获取信号量,当前goroutine等待过则放队首,负责放队尾
    • 判断是否处于饥饿模式,饥饿模式下需减少阻塞goroutine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 第一次锁,** 状态位为0 **,直接CAS设置锁标志位即可
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
// 非首次,需处理阻塞goroutine和新来goroutine的竞争
m.lockSlow()
}

func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 正常模式的锁状态下(非饥饿模式,饥饿模式下直接把锁移交给阻塞goroutine),可自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { //sync_runtime_canSpin
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
//当前goroutine自旋,若未标记唤醒,且有阻塞的goroutine,则标记唤醒,防止unlock时唤醒别的阻塞goroutine
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
//自旋等待锁释放
runtime_doSpin() // sync_runtime_doSpin
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 非饥饿模式下直接加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
//记录阻塞队列数
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// 清空新状态值得唤醒标记位
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
// 加锁前非饥饿模式,非加锁则无需处理
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
// 若当前goroutine已等待过,则加入阻塞队列头部,否则加入尾部
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 判断是否饥饿
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
//饥饿模式下
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
//加锁并减少一个阻塞goroutine
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//非饥饿模式 || 只剩一个阻塞goroutine,则退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 未加锁成功,则继续唤醒自旋
awoke = true
iter = 0
} else {
// 状态值已改变则重新获取
old = m.state
}
}

if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}

自旋

自旋这里有些条件

  • 未超过自旋4次上限
  • 多核
  • GOMAXPROCS <= 1 + (sched.npidle+sched.nmspinning:空闲的p和忙碌m)
  • 有其他可用的p 且 本地执行队列(runq)为空
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    //runtime/lock_sema.go
    const (
    locked uintptr = 1

    active_spin = 4 //最多自旋4次
    active_spin_cnt = 30 //每次30个cpu时钟周期
    passive_spin = 1
    )

    //runtime/proc.go

    // Active spinning for sync.Mutex.
    //go:linkname sync_runtime_canSpin sync.runtime_canSpin
    //go:nosplit
    func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.

    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
    return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
    return false
    }
    return true
    }

    //go:linkname sync_runtime_doSpin sync.runtime_doSpin
    //go:nosplit
    // 每次自旋30个cpu时钟
    func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
    }

UnLock

基本步骤是

  • 移除锁标志
  • 若为首次解锁(state恢复为0)则直接返回
  • 否则
    • 正常模式下,有阻塞goroutine则需唤醒一个,并标记唤醒标志;执行信号量释放
    • 饥饿模式下,不唤醒阻塞goroutine,直接把锁移交给阻塞队首的goroutine
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}

// Fast path: drop lock bit.
// 首次解锁直接移除标记位即可
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}

func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
//非饥饿模式
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
}
}

看完这两个函数,mutex的基本实现就有个大致概念了
但阻塞队列是如何实现的呢?

阻塞队列

具体在 runtime/sema.go 文件中有acquire和release时对阻塞队列的处理
队列存储结构是一个树堆结构 semaRoot.treap, 类型为 sudog
其中

  • treap内sudog的elem地址各不相同
  • 相同mutex阻塞的goroutine,会阻塞在同一个地址,即Mutex.sema地址,存储到elem,
  • 内部又按ticket值排序实现最小堆
  • 同一地址的sudog连成链表存入waitlink
  • waittail记录当前链表尾部

这样,对同一地址锁的阻塞队列的查找复杂度为O(1),查找semaRoot的根节点复杂度为O(log n)
n 是被哈希到同一个 semaRoot 的不同地址的总数。
这个列表semaTable大小为251。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// runtime/sema.go
// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and test/locklinear.go
// for a test that exercises this.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

func semroot(addr *uint32) *semaRoot {
return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

// runtime/runtime2.go
type mutex struct {
// Futex-based impl treats it as uint32 key,
// while sema-based impl as M* waitm.
// Used to be a union, but unions break precise GC.
key uintptr
}
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

最后抛个问题,不知道有没有发现semtable中pad的大小是 [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
知道是为什么嘛?

参考

一份详细注释的go-mutex源码
Semaphore

如有疑问,请文末留言交流或邮件:newbvirgil@gmail.com 本文链接 : http://blog.newbmiao.com/2019/10/10/go_mutex_insight.html