Advanced Go Concurrency Patterns

文章目录

  1. 1. Concurrency Is Not Parallelism
    1. 1.1. Concurrency
    2. 1.2. Parallelism
    3. 1.3. VS
    4. 1.4. Go Concurrency Support
  2. 2. Go Concurrency Patterns
    1. 2.1. Service Channel
    2. 2.2. Multiplexing
    3. 2.3. Sequencing
    4. 2.4. for-select
    5. 2.5. timeout-select
    6. 2.6. quit channel
    7. 2.7. Daisy-chain
  3. 3. Advanced Go Concurrency Patterns
    1. 3.1. Three techniques
    2. 3.2. other improvement

go的并发模式是一个很有意思的东西,这里先做个搬运工。
本文是go官方talk里对并发的定义及并发的模式的一些搜集

原文引自:
Advanced Go Concurrency Patterns

Go Concurrency Patterns

Concurrency Is Not Parallelism

Concurrency Is Not Parallelism

Concurrency (并发) 是程序能组织执行过程使同时可以处理多件事
Parallelism (并行) 是程序能同时执行多件事

所以Rob Pike说:并发关乎结构,并行关乎执行

Concurrency

Concurrency is the composition of independently executing computations.

Concurrency is a way to structure software, particularly as a way to write clean code that interacts well with the real world.

Concurrency is about dealing with lots of things at once.

Parallelism

Parallelism is the simultaneous execution of (possibly related) computations.

Parallelism is about doing lots of things at once.

VS

Concurrency is about structure, parallelism is about execution.

Concurrency provides a way to structure a solution to solve a problem that may (but not necessarily) be parallelizable.

Go Concurrency Support

  • concurrent execution (goroutines) 协程支持
  • synchronization and messaging (channels) CSP的通信方式来共享内存
    此处需了解go的内存模型
  • multi-way concurrent control (select) 控制协程切换
    • All channels are evaluated.
      每个通道都会被评估
    • Selection blocks until one communication can proceed, which then does.
      没有可处理的通道时,select会一直阻塞
    • If multiple can proceed, select chooses pseudo-randomly.
      多个通道可执行是,select会伪随机选一个
    • A default clause, if present, executes immediately if no channel is ready.
      有default申明时,若没有可处理通道,则default立马执行

Go Concurrency Patterns

Service Channel

channel是go里first class值,想string这些类型一样。
用作函数返回时,通过返回的channel进行交互,可以起到服务一样的效果。

运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
joe := boring("Joe")
ann := boring("Ann")
for i := 0; i < 5; i++ {
fmt.Println(<-joe)
fmt.Println(<-ann)
}
fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}

Multiplexing

就是将多个输入处理成一个,常见实现是使用go启动多个协程去合并;或者使用for-select去合并

运行

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring; I'm leaving.")
}
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1} }()
go func() { for { c <- <-input2} }()
return c
}

fan-in: A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed.
fan-out: Multiple functions can read from the same channel until that channel is closed)
pipeline

Sequencing

按顺序执行,用全局通道去发送接受来控制任务依次执行
运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Message struct {
str string
wait chan bool
}
//main
for i := 0; i < 5; i++ {
msg1 := <-c; fmt.Println(msg1.str)
msg2 := <-c; fmt.Println(msg2.str)
msg1.wait <- true
msg2.wait <- true
}
//boring
waitForIt := make(chan bool) // Give main control over our execution.
go func() { // Launch the goroutine from inside the function. Function Literal.
for i := 0; ; i++ {
c <- Message{fmt.Sprintf("%s %d", msg, i), waitForIt}
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
<-waitForIt // Block until main tells us to go again.
}
}()

for-select

简化go创建多个协程的声明方式

1
2
3
4
5
6
7
8
9
10
11
12
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1: c <- s
case s := <-input2: c <- s
}
}
}()
return c
}

timeout-select

对select可以增加超时通道,超时则返回,避免select一直阻塞

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
c := boring("Joe")
for {
select {
case s := <-c:
fmt.Println(s)
case <-time.After(1 * time.Second):
fmt.Println("You're too slow.")
return
}
}
}

quit channel

使用通道控制执行是否提前结束

  • just quit

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //main
    quit := make(chan struct{})
    c := boring("Joe", quit)
    for i := rand.Intn(10); i >= 0; i-- { fmt.Println(<-c) }
    quit <- struct{}
    select {
    case c <- fmt.Sprintf("%s: %d", msg, i):
    // do nothing
    case <-quit:
    return
    }
  • use quit deliver msg
    quit := make(chan string)

Daisy-chain

关于这个模式stackoverflow有个讨论,可以看看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
}

Advanced Go Concurrency Patterns

Three techniques

  • for-select loop
    select防止loop阻塞在某一状态,便于调度多项任务
  • service channel, reply channels (chan chan error)
    service channel就是常见的pattern,不多说
    reply channels实现了close操作中关闭和错误返回无data race:
    close通过chan chan error向loop请求关闭,并等待其返回关闭前是否有错误

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    func (s *sub) Close() error {
    errc := make(chan error)
    s.closing <- errc // HLchan //请求关闭
    return <-errc // HLchan //等待结果返回
    }
    //in loop
    var err error
    for{
    select{
    case errc := <-s.closing: //收到关闭请求
    errc <- err //返回错误
    close(s.updates) //执行关闭
    return
    }
    }
  • nil channels in select cases
    向值为nil的channel发送和接收都会阻塞,在select中使用它可控制是否执行

此外,分享中提到一些点也值得注意:

other improvement

  • limit:限制pending(处理任务队列)的数目,不要让其无限增大,避免过多请求及内存消耗
  • async io:拆解fetch的请求和结果处理,使用状态channel同步,使fetch中不要阻塞其他处理进行
    拆解前
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    case <-startFetch:
    var fetched []Item
    fetched, next, err = s.fetcher.Fetch()
    if err != nil {
    next = time.Now().Add(10 * time.Second)
    break
    }
    for _, item := range fetched {
    if !seen[item.GUID] {
    pending = append(pending, item)
    seen[item.GUID] = true
    }
    }

拆解后
startFetch和fetchDone同一时刻,只有一个不为nil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type fetchResult struct{ fetched []Item; next time.Time; err error }
var fetchDone chan fetchResult // if non-nil, Fetch is running
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay) // enable fetch case
}
select {
case <-startFetch:
fetchDone = make(chan fetchResult, 1)
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case result := <-fetchDone:
fetchDone = nil
// Use result.fetched, result.next, result.err

这是对以上模式整合实现的一个rss聚合器demo,可以仔细研究下
运行

如有疑问,请留言或邮件newbvirgil@gmail.com
本文链接 : http://blog.newbmiao.com/2018/02/09/advanced-go-concurrency-patterns.html