Go并发编程

Go

本文最后更新于 <span id="expire-date"></span> 天前,文中部分描述可能已经过时。

绪论

主要困难:

  • 不知道如何选择并发原语
  • 在多个并发源于中选择最优解,比如mutex or chan?
  • 不知如何编排并发任务
  • 排查panic或者死锁困难
  • 已知的并发原语不能解决并发问题

核心内容

基本并发原语:主要有Mutex RWMutex Waitgroup Cond Pool Context等标准库中的并发原语,这些都是传统的并发原语

原子操作:原子操作时其他并发原语的基础,可以以此创造新的并发原语

channel:chan类型时golang独有的,很好用

拓展并发原语:信号量、SingleFlight、循环栅栏、ErrGroup

分布式并发原语: 是应对大规模的应用程序中并发问题的并发类型,主要有etcd实现的分布式并发原语,比如Leader选举、分布式互斥锁、分布式读写锁、分布式队列等

学习方向

基础用法、实现原理、易错场景、知名项目bug

初始:任务编排用chan,共享资源保护用传统并发原语,进阶后不能局限于此

有野心地创造出自己需要地并发原语:

  1. 对既有并发原语进行组合,使用多个并发原语解决问题
  2. 根据已掌握地并发原语地设计经验,创造出合适地新的并发原语

解决资源并发访问问题

互斥锁的实现机制

互斥锁是并发控制的一个基本手段,是为了避免竞争而建立的一种并发控制机制。

临界区: 一个被共享的资源,为了避免并发访问导致的问题,部分程序需要被保护起来

使用互斥锁使临界区只能同时由一个线程持有

同步原语的使用场景:

  1. 共享资源
  2. 任务编排
  3. 消息传递

Mutex的基本使用方法

sync包定义了一个Locker接口,Mutex就实现了这个接口

Locker 的接口定义了锁同步原语的方法集:

1
2
3
4
type Locker interface{
Lock()
UnLock()
}

互斥锁mutex只提供两个方法:进入临界区之前调用Lock方法,退出临界区的时候调用Unlock方法

1
2
func(m *Mutex) Lock()
func(m *Mutex) Unlock()

以这段代码为例子,若想实现10个goroutine 分别对count加10000,则目标结果为1000000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"sync"
)

func main() {
count := 0
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
count++
}
}()
}
wg.Wait()
fmt.Println(count)
}

结果:

1
2
go run main.go
364393

为什么结果是错的?rnm,退钱!!!

因为count++不是一个原子操作,包含几个步骤,读取,加一,保存。若10个goroutine同时读取到并操作,最后只加了1,汇编代码如下

1
2
3
MOVQ "".count(SB), AX
LEAQ 1(AX), CX
MOVQ CX, "".count(SB)

使用mutex的正确体位

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"sync"
)

func main() {
count := 0
var mu sync.Mutex
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
mu.Lock() //start
count++
mu.Unlock()//end
}
}()
}
wg.Wait()
fmt.Println(count)
}

mutex的其他用法

嵌入到其它struct中使用

1
2
3
4
type Counter struct{
mu sync.Mutex
Count uint64
}

在初始化嵌入的struct时,不必初始化这个mutex字段

通过嵌入字段,直接在struct上调用Lock/Unlock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"sync"
)

type Counter struct {
sync.Mutex
Count uint64
}

func main() {
var counter Counter
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
counter.Lock()
counter.Count++
counter.Unlock()
}
}()
}
wg.Wait()
fmt.Println(counter.Count)
}

还可以把获取锁、释放锁、计数加一的逻辑封装成一个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"fmt"
"sync"
)

type Counter struct {
CounterType int
Name string

mu sync.Mutex
count uint64
}

func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}

func (c *Counter) Count() uint64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}

func main() {
var counter Counter
var wg sync.WaitGroup
wg.Add(10)

for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
for j := 0; j < 100000; j++ {
counter.Incr()
}
}()
}
wg.Wait()
fmt.Println(counter.Count())
}

mutex实现

mutex的架构演进分为四个阶段

初版:使用一个flag字段标识是否持有锁

给新人机会:新的goroutine也能有机会竞争锁

多给些机会:新来的和被唤醒的有更多的机会竞争锁

解决饥饿:解决竞争问题,不会让goroutine长久等待

初版的互斥锁

通过一个flag变量,标记当前的锁是否被某个goroutine持有,如果flag的值是1,就代表锁已经被持有,那么,其他竞争的goroutine只能等待;如果这个flag的值是0,就可以通过CAS(compare-and-swap, 或者 compare-and-set)将这个flag设置为1,标识锁被当前goroutine持有了

Russ Cox在2008年提交的第一版mutex就是这样实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func cas(val *int32, old, new int32) bool
func semacquire(*int32)
func semrelease(*int32)
//互斥锁的结构,包含两个字段
type Mutex struct{
key int32 //锁是否被持有的标识
sema int32 //信号量专用,用以阻塞/唤醒goroutine
}
//保证成功在val上增加delta的值
func xadd(val *int32, delta int32)(new int32){
for{
v := *val
if cas(val, v, v + delta){
return v +delta
}
}
panic("unreached")
}

//请求锁
func (m *Mutex) Lock(){
if xadd(&m.key, 1) == 1{//标识加1,如果等于1,成功获取到锁
return
}
semacquire(&m.sema) //否则阻塞等待
}
func (m *Mutex) Unlock(){
if xadd(&m.key, -1) == 0{//将标识减去1,如果等于0; 则没有其它等待着
return
}
semrelease(&m.sema)//唤醒其它阻塞的goroutine
}

CAS指令将给定的值和一个内存地址中的值进行比较,如果是同一个值,就用新值替换内存地址中的值,这个操作是原子性的

字段key:是一个flag,用来标识这个排外锁是否被某个goroutine持有,如果key大于等于1,说明这个排外锁已经被持有

字段sema:是个信号量变量,用来控制等待goroutine的阻塞休眠和唤醒

调用Lock请求锁的时候,通过xadd方法进行CAS操作,xadd方法通过循环执行CAS操作直到成功,保证对key加1的操作成功完成。如果比较幸运,锁没有被别的goroutine持有,那么Lock方法成功地将key设置为1,这个goroutine就持有了这个锁;如果锁已经被别的goroutine持有了,那么,当前的goroutine会把key加1,而且会调用semacquire方法,使用信号量将自己休眠,等锁释放的时候,信号量会将它唤醒

持有锁的goroutine调用Unlock释放锁时,它会将key减1,如果当前没有其它等待这个锁的goroutine,这个方法就返回了。但是,如果还有等待此锁的其它goroutine,那么,它会调用semrelease方法,利用信号量唤醒等待锁对的其它goroutine中的一个。

在使用mutex的时候,必须要保证goroutine尽可能不去释放自己未持有的锁,一定要遵循“谁申请,谁释放”的原则,一般在同一个方法中获取和释放锁。

给新人机会

2011年6月30日对mutex的调整

1
2
3
4
5
6
7
8
9
10
type Mutex struct{
state int32
sema uint32
}
const(
mutexLocked = 1 << iota // mutex is locked
mytexWoken
mutexWaiterShift = iota

)

state字段是个复合字段,第一位(最小位)表示这个锁是否被持有,第二位代表是否有唤醒的goroutine,剩余的位数代表的是等待此锁的goroutine数

请求锁的方法Lock变得复杂,state操作难以理解,代码逻辑也变得复杂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (m *Mutex) Lock() {
// Fast path: 幸运case,能够直接获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}

awoke := false
for {
old := m.state
new := old | mutexLocked // 新状态加锁
if old&mutexLocked != 0 {
new = old + 1<<mutexWaiterShift //等待者数量加一
}
if awoke {
// goroutine是被唤醒的,
// 新状态清除唤醒标志
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态
if old&mutexLocked == 0 { // 锁原状态未加锁
break
}
runtime.Semacquire(&m.sema) // 请求信号量
awoke = true
}
}
}

本文作者:Mosquito

本文链接: http://example.com/2022/06/04/Go%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/

Mosquito 天使之吻手打

文章默认使用 CC BY-NC-SA 4.0 协议进行许可,使用时请注意遵守协议。