14.2 協程間的信道

14.2.1 概念

在第一個例子中,協程是獨立執行的,他們之間沒有通信。他們必須通信纔會變得更有用:彼此之間發送和接收信息並且協調/同步他們的工作。協程可以使用共享變量來通信,但是很不提倡這樣做,因爲這種方式給所有的共享內存的多線程都帶來了困難。

而Go有一個特殊的類型,通道(channel),像是通道(管道),可以通過它們發送類型化的數據在協程之間通信,可以避開所有內存共享導致的坑;通道的通信方式保證了同步性。數據通過通道:同一時間只有一個協程可以訪問數據:所以不會出現數據競爭,設計如此。數據的歸屬(可以讀寫數據的能力)被傳遞。

工廠的傳送帶是個很有用的例子。一個機器(生產者協程)在傳送帶上放置物品,另外一個機器(消費者協程)拿到物品並打包。

通道服務於通信的兩個目的:值的交換,同步的,保證了兩個計算(協程)任何時候都是可知狀態。

通常使用這樣的格式來聲明通道:var identifier chan datatype

未初始化的通道的值是nil。

所以通道只能傳輸一種類型的數據,比如 chan int 或者 chan string,所有的類型都可以用於通道,空接口 interface{} 也可以。甚至可以(有時非常有用)創建通道的通道。

通道實際上是類型化消息的隊列:使數據得以傳輸。它是先進先出(FIFO)結構的所以可以保證發送給他們的元素的順序(有些人知道,通道可以比作 Unix shells 中的雙向管道(tw-way pipe))。通道也是引用類型,所以我們使用 make() 函數來給它分配內存。這裏先聲明瞭一個字符串通道 ch1,然後創建了它(實例化):

var ch1 chan string
ch1 = make(chan string)

當然可以更短: ch1 := make(chan string)

這裏我們構建一個int通道的通道: chanOfChans := make(chan int)

或者函數通道:funcChan := chan func()(相關示例請看第 14.17 節)。

所以通道是對象的第一類型:可以存儲在變量中,作爲函數的參數傳遞,從函數返回以及通過通道發送它們自身。另外它們是類型化的,允許類型檢查,比如嘗試使用整數通道發送一個指針。

14.2.2 通信操作符 <-

這個操作符直觀的標示了數據的傳輸:信息按照箭頭的方向流動。

流向通道(發送)

ch <- int1 表示:用通道 ch 發送變量 int1(雙目運算符,中綴 = 發送)

從通道流出(接收),三種方式:

int2 = <- ch 表示:變量 int2 從通道 ch(一元運算的前綴操作符,前綴 = 接收)接收數據(獲取新值);假設 int2 已經聲明過了,如果沒有的話可以寫成:int2 := <- ch

<- ch 可以單獨調用獲取通道的(下一個)值,當前值會被丟棄,但是可以用來驗證,所以以下代碼是合法的:

if <- ch != 1000{
    ...
}

操作符 <- 也被用來發送和接收,Go 儘管不必要,爲了可讀性,通道的命名通常以 ch 開頭或者包含 chan。通道的發送和接收操作都是自動的:它們通常一氣呵成。下面的示例展示了通信操作。

示例 14.2-goroutine2.go

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)

    go sendData(ch)
    go getData(ch)  

    time.Sleep(1e9)
}

func sendData(ch chan string) {
    ch <- "Washington"
    ch <- "Tripoli"
    ch <- "London"
    ch <- "Beijing"
    ch <- "Tokio"
}

func getData(ch chan string) {
    var input string
    // time.Sleep(1e9)
    for {
        input = <-ch
        fmt.Printf("%s ", input)
    }
}

輸出:

Washington Tripoli London Beijing Tokio

main() 函數中啓動了兩個協程:sendData() 通過通道 ch 發送了 5 個字符串,getData() 按順序接收它們並打印出來。

如果 2 個協程需要通信,你必須給他們同一個通道作爲參數才行。

嘗試一下如果註釋掉 time.Sleep(1e9) 會如何。

我們發現協程之間的同步非常重要:

  • main() 等待了 1 秒讓兩個協程完成,如果不這樣,sendData() 就沒有機會輸出。
  • getData() 使用了無限循環:它隨着 sendData() 的發送完成和 ch 變空也結束了。
  • 如果我們移除一個或所有 go 關鍵字,程序無法運行,Go 運行時會拋出 panic:
---- Error run E:/Go/Goboek/code examples/chapter 14/goroutine2.exe with code Crashed ---- Program exited with code -2147483645: panic: all goroutines are asleep-deadlock!

爲什麼會這樣?運行時會檢查所有的協程(也許只有一個是這種情況)是否在等待(可以讀取或者寫入某個通道),意味着程序無法處理。這是死鎖(deadlock)形式,運行時可以檢測到這種情況。

注意:不要使用打印狀態來表明通道的發送和接收順序:由於打印狀態和通道實際發生讀寫的時間延遲會導致和真實發生的順序不同。

練習 14.4:解釋一下爲什麼如果在函數 getData() 的一開始插入 time.Sleep(1e9),不會出現錯誤但也沒有輸出呢。

14.2.3 通道阻塞

默認情況下,通信是同步且無緩衝的:在有接受者接收數據之前,發送不會結束。可以想象一個無緩衝的通道在沒有空間來保存數據的時候:必須要一個接收者準備好接收通道的數據然後發送者可以直接把數據發送給接收者。所以通道的發送/接收操作在對方準備好之前是阻塞的:

1)對於同一個通道,發送操作(協程或者函數中的),在接收者準備好之前是阻塞的:如果ch中的數據無人接收,就無法再給通道傳入其他數據:新的輸入無法在通道非空的情況下傳入。所以發送操作會等待 ch 再次變爲可用狀態:就是通道值被接收時(可以傳入變量)。

2)對於同一個通道,接收操作是阻塞的(協程或函數中的),直到發送者可用:如果通道中沒有數據,接收者就阻塞了。

儘管這看上去是非常嚴格的約束,實際在大部分情況下工作的很不錯。

程序 channel_block.go 驗證了以上理論,一個協程在無限循環中給通道發送整數數據。不過因爲沒有接收者,只輸出了一個數字 0。

示例 14.3-channel_block.go

package main

import "fmt"

func main() {
    ch1 := make(chan int)
    go pump(ch1)       // pump hangs
    fmt.Println(<-ch1) // prints only 0
}

func pump(ch chan int) {
    for i := 0; ; i++ {
        ch <- i
    }
}

輸出:

0

pump() 函數爲通道提供數值,也被叫做生產者。

爲通道解除阻塞定義了 suck 函數來在無限循環中讀取通道,參見示例 14.4-channel_block2.go

func suck(ch chan int) {
    for {
        fmt.Println(<-ch)
    }
}

main() 中使用協程開始它:

go pump(ch1)
go suck(ch1)
time.Sleep(1e9)

給程序 1 秒的時間來運行:輸出了上萬個整數。

練習 14.1:channel_block3.go:寫一個通道證明它的阻塞性,開啓一個協程接收通道的數據,持續 15 秒,然後給通道放入一個值。在不同的階段打印消息並觀察輸出。

14.2.4 通過一個(或多個)通道交換數據進行協程同步。

通信是一種同步形式:通過通道,兩個協程在通信(協程會和)中某刻同步交換數據。無緩衝通道成爲了多個協程同步的完美工具。

甚至可以在通道兩端互相阻塞對方,形成了叫做死鎖的狀態。Go 運行時會檢查並 panic,停止程序。死鎖幾乎完全是由糟糕的設計導致的。

無緩衝通道會被阻塞。設計無阻塞的程序可以避免這種情況,或者使用帶緩衝的通道。

練習 14.2: blocking.go

解釋爲什麼下邊這個程序會導致 panic:所有的協程都休眠了 - 死鎖!

package main

import (
    "fmt"
)

func f1(in chan int) {
    fmt.Println(<-in)
}

func main() {
    out := make(chan int)
    out <- 2
    go f1(out)
}

14.2.5 同步通道-使用帶緩衝的通道

一個無緩衝通道只能包含 1 個元素,有時顯得很侷限。我們給通道提供了一個緩存,可以在擴展的 make 命令中設置它的容量,如下:

buf := 100
ch1 := make(chan string, buf)

buf 是通道可以同時容納的元素(這裏是 string)個數

在緩衝滿載(緩衝被全部使用)之前,給一個帶緩衝的通道發送數據是不會阻塞的,而從通道讀取數據也不會阻塞,直到緩衝空了。

緩衝容量和類型無關,所以可以(儘管可能導致危險)給一些通道設置不同的容量,只要他們擁有同樣的元素類型。內置的 cap 函數可以返回緩衝區的容量。

如果容量大於 0,通道就是異步的了:緩衝滿載(發送)或變空(接收)之前通信不會阻塞,元素會按照發送的順序被接收。如果容量是0或者未設置,通信僅在收發雙方準備好的情況下才可以成功。

同步:ch :=make(chan type, value)

  • value == 0 -> synchronous, unbuffered (阻塞)
  • value > 0 -> asynchronous, buffered(非阻塞)取決於value元素

若使用通道的緩衝,你的程序會在“請求”激增的時候表現更好:更具彈性,專業術語叫:更具有伸縮性(scalable)。要在首要位置使用無緩衝通道來設計算法,只在不確定的情況下使用緩衝。

練習 14.3:channel_buffer.go:給 channel_block3.go 的通道增加緩衝並觀察輸出有何不同。

14.2.6 協程中用通道輸出結果

爲了知道計算何時完成,可以通過信道回報。在例子 go sum(bigArray) 中,要這樣寫:

ch := make(chan int)
go sum(bigArray, ch) // bigArray puts the calculated sum on ch
// .. do something else for a while
sum := <- ch // wait for, and retrieve the sum

也可以使用通道來達到同步的目的,這個很有效的用法在傳統計算機中稱爲信號量(semaphore)。或者換個方式:通過通道發送信號告知處理已經完成(在協程中)。

在其他協程運行時讓 main 程序無限阻塞的通常做法是在 main 函數的最後放置一個{}。

也可以使用通道讓 main 程序等待協程完成,就是所謂的信號量模式,我們會在接下來的部分討論。

14.2.7 信號量模式

下邊的片段闡明:協程通過在通道 ch 中放置一個值來處理結束的信號。main 協程等待 <-ch 直到從中獲取到值。

我們期望從這個通道中獲取返回的結果,像這樣:

func compute(ch chan int){
    ch <- someComputation() // when it completes, signal on the channel.
}

func main(){
    ch := make(chan int)     // allocate a channel.
    go compute(ch)        // stat something in a goroutines
    doSomethingElseForAWhile()
    result := <- ch
}

這個信號也可以是其他的,不返回結果,比如下面這個協程中的匿名函數(lambda)協程:

ch := make(chan int)
go func(){
    // doSomething
    ch <- 1 // Send a signal; value does not matter
}
doSomethingElseForAWhile()
<- ch    // Wait for goroutine to finish; discard sent value.

或者等待兩個協程完成,每一個都會對切片s的一部分進行排序,片段如下:

done := make(chan bool)
// doSort is a lambda function, so a closure which knows the channel done:
doSort := func(s []int){
    sort(s)
    done <- true
}
i := pivot(s)
go doSort(s[:i])
go doSort(s[i:])
<-done
<-done

下邊的代碼,用完整的信號量模式對長度爲N的 float64 切片進行了 N 個doSomething() 計算並同時完成,通道 sem 分配了相同的長度(切包含空接口類型的元素),待所有的計算都完成後,發送信號(通過放入值)。在循環中從通道 sem 不停的接收數據來等待所有的協程完成。

type Empty interface {}
var empty Empty
...
data := make([]float64, N)
res := make([]float64, N)
sem := make(chan Empty, N)
...
for i, xi := range data {
    go func (i int, xi float64) {
        res[i] = doSomething(i, xi)
        sem <- empty
    } (i, xi)
}
// wait for goroutines to finish
for i := 0; i < N; i++ { <-sem }

注意閉合:ixi 都是作爲參數傳入閉合函數的,從外層循環中隱藏了變量 ixi。讓每個協程有一份 ixi 的拷貝;另外,for 循環的下一次迭代會更新所有協程中 ixi 的值。切片 res 沒有傳入閉合函數,因爲協程不需要單獨拷貝一份。切片 res 也在閉合函數中但並不是參數。

14.2.8 實現並行的 for 循環

在上一部分章節 14.2.7 的代碼片段中:for 循環的每一個迭代是並行完成的:

for i, v := range data {
    go func (i int, v float64) {
        doSomething(i, v)
        ...
    } (i, v)
}

在 for 循環中並行計算迭代可能帶來很好的性能提升。不過所有的迭代都必須是獨立完成的。有些語言比如 Fortress 或者其他並行框架以不同的結構實現了這種方式,在 Go 中用協程實現起來非常容易:

14.2.9 用帶緩衝通道實現一個信號量

信號量是實現互斥鎖(排外鎖)常見的同步機制,限制對資源的訪問,解決讀寫問題,比如沒有實現信號量的 sync 的 Go 包,使用帶緩衝的通道可以輕鬆實現:

  • 帶緩衝通道的容量和要同步的資源容量相同
  • 通道的長度(當前存放的元素個數)與當前資源被使用的數量相同
  • 容量減去通道的長度就是未處理的資源個數(標準信號量的整數值)

不用管通道中存放的是什麼,只關注長度;因此我們創建了一個長度可變但容量爲0(字節)的通道:

type Empty interface {}
type semaphore chan Empty

將可用資源的數量N來初始化信號量 semaphoresem = make(semaphore, N)

然後直接對信號量進行操作:

// acquire n resources
func (s semaphore) P(n int) {
    e := new(Empty)
    for i := 0; i < n; i++ {
        s <- e
    }
}

// release n resouces
func (s semaphore) V(n int) {
    for i:= 0; i < n; i++{
        <- s
    }
}

可以用來實現一個互斥的例子:

/* mutexes */
func (s semaphore) Lock() {
    s.P(1)
}

func (s semaphore) Unlock(){
    s.V(1)
}

/* signal-wait */
func (s semaphore) Wait(n int) {
    s.P(n)
}

func (s semaphore) Signal() {
    s.V(1)
}

練習 14.5:gosum.go:用這種習慣用法寫一個程序,開啓一個協程來計算2個整數的合併等待計算結果並打印出來。

練習 14.6:producer_consumer.go:用這種習慣用法寫一個程序,有兩個協程,第一個提供數字 0,10,20,...90 並將他們放入通道,第二個協程從通道中讀取並打印。main() 等待兩個協程完成後再結束。

習慣用法:通道工廠模式

編程中常見的另外一種模式如下:不將通道作爲參數傳遞給協程,而用函數來生成一個通道並返回(工廠角色);函數內有個匿名函數被協程調用。

channel_block2.go 加入這種模式便有了示例 14.5-channel_idiom.go

package main

import (
    "fmt"
    "time"
)

func main() {
    stream := pump()
    go suck(stream)
    time.Sleep(1e9)
}

func pump() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ {
            ch <- i
        }
    }()
    return ch
}

func suck(ch chan int) {
    for {
        fmt.Println(<-ch)
    }
}

14.2.10 給通道使用 for 循環

for 循環的 range 語句可以用在通道 ch 上,便可以從通道中獲取值,像這樣:

for v := range ch {
    fmt.Printf("The value is %v\n", v)
}

它從指定通道中讀取數據直到通道關閉,才繼續執行下邊的代碼。很明顯,另外一個協程必須寫入 ch(不然代碼就阻塞在 for 循環了),而且必須在寫入完成後才關閉。suck 函數可以這樣寫,且在協程中調用這個動作,程序變成了這樣:

示例 14.6-channel_idiom2.go

package main

import (
    "fmt"
    "time"
)

func main() {
    suck(pump())
    time.Sleep(1e9)
}

func pump() chan int {
    ch := make(chan int)
    go func() {
        for i := 0; ; i++ {
            ch <- i
        }
    }()
    return ch
}

func suck(ch chan int) {
    go func() {
        for v := range ch {
            fmt.Println(v)
        }
    }()
}

習慣用法:通道迭代模式

這個模式用到了前邊示例 14.6 中的模式,通常,需要從包含了地址索引字段 items 的容器給通道填入元素。爲容器的類型定義一個方法 Iter(),返回一個只讀的通道(參見第 14.2.8 節)items,如下:

func (c *container) Iter () <- chan items {
    ch := make(chan item)
    go func () {
        for i:= 0; i < c.Len(); i++{    // or use a for-range loop
            ch <- c.items[i]
        }
    } ()
    return ch
}

在協程裏,一個 for 循環迭代容器 c 中的元素(對於樹或圖的算法,這種簡單的 for 循環可以替換爲深度優先搜索)。

調用這個方法的代碼可以這樣迭代容器:

for x := range container.Iter() { ... }

可以運行在自己的協程中,所以上邊的迭代用到了一個通道和兩個協程(可能運行在兩個線程上)。就有了一個特殊的生產者-消費者模式。如果程序在協程給通道寫完值之前結束,協程不會被回收;設計如此。這種行爲看起來是錯誤的,但是通道是一種線程安全的通信。在這種情況下,協程嘗試寫入一個通道,而這個通道永遠不會被讀取,這可能是個 bug 而並非期望它被靜默的回收。

習慣用法:生產者消費者模式

假設你有 Produce() 函數來產生 Consume 函數需要的值。它們都可以運行在獨立的協程中,生產者在通道中放入給消費者讀取的值。整個處理過程可以替換爲無限循環:

for {
    Consume(Produce())
}

14.2.11 通道的方向

通道類型可以用註解來表示它只發送或者只接收:

var send_only chan<- int         // channel can only send data
var recv_only <-chan int        // channel can onley receive data

只接收的通道(<-chan T)無法關閉,因爲關閉通道是發送者用來表示不再給通道發送值了,所以對只接收通道是沒有意義的。通道創建的時候都是雙向的,但也可以分配有方向的通道變量,就像以下代碼:

var c = make(chan int) // bidirectional
go source(c)
go sink(c)

func source(ch chan<- int){
    for { ch <- 1 }
}

func sink(ch <-chan int) {
    for { <-ch }
}

習慣用法:管道和選擇器模式

更具體的例子還有協程處理它從通道接收的數據併發送給輸出通道:

sendChan := make(chan int)
reciveChan := make(chan string)
go processChannel(sendChan, receiveChan)

func processChannel(in <-chan int, out chan<- string) {
    for inValue := range in {
        result := ... /// processing inValue
    out <- result
    }
}

通過使用方向註解來限制協程對通道的操作。

這裏有一個來自 Go 指導的很讚的例子,打印了輸出的素數,使用選擇器(‘篩’)作爲它的算法。每個 prime 都有一個選擇器,如下圖:

版本1:示例 14.7-sieve1.go

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.package main
package main

import "fmt"

// Send the sequence 2, 3, 4, ... to channel 'ch'.
func generate(ch chan int) {
    for i := 2; ; i++ {
        ch <- i // Send 'i' to channel 'ch'.
    }
}

// Copy the values from channel 'in' to channel 'out',
// removing those divisible by 'prime'.
func filter(in, out chan int, prime int) {
    for {
        i := <-in // Receive value of new variable 'i' from 'in'.
        if i%prime != 0 {
            out <- i // Send 'i' to channel 'out'.
        }
    }
}

// The prime sieve: Daisy-chain filter processes together.
func main() {
    ch := make(chan int) // Create a new channel.
    go generate(ch)      // Start generate() as a goroutine.
    for {
        prime := <-ch
        fmt.Print(prime, " ")
        ch1 := make(chan int)
        go filter(ch, ch1, prime)
        ch = ch1
    }
}

協程 filter(in, out chan int, prime int) 拷貝整數到輸出通道,丟棄掉可以被 prime 整除的數字。然後每個 prime 又開啓了一個新的協程,生成器和選擇器併發請求。

輸出:

2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97 101
103 107 109 113 127 131 137 139 149 151 157 163 167 173 179 181 191 193 197 199 211 223
227 229 233 239 241 251 257 263 269 271 277 281 283 293 307 311 313 317 331 337 347 349
353 359 367 373 379 383 389 397 401 409 419 421 431 433 439 443 449 457 461 463 467 479
487 491 499 503 509 521 523 541 547 557 563 569 571 577 587 593 599 601 607 613 617 619
631 641 643 647 653 659 661 673 677 683 691 701 709 719 727 733 739 743 751 757 761 769
773 787 797 809 811 821 823 827 829 839 853 857 859 863 877 881 883 887 907 911 919 929
937 941 947 953 967 971 977 983 991 997 1009 1013...

第二個版本引入了上邊的習慣用法:函數 sievegeneratefilter 都是工廠;它們創建通道並返回,而且使用了協程的 lambda 函數。main 函數現在短小清晰:它調用 sieve() 返回了包含素數的通道,然後通過 fmt.Println(<-primes) 打印出來。

版本2:示例 14.8-sieve2.go

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
    "fmt"
)

// Send the sequence 2, 3, 4, ... to returned channel
func generate() chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            ch <- i
        }
    }()
    return ch
}

// Filter out input values divisible by 'prime', send rest to returned channel
func filter(in chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                out <- i
            }
        }
    }()
    return out
}

func sieve() chan int {
    out := make(chan int)
    go func() {
        ch := generate()
        for {
            prime := <-ch
            ch = filter(ch, prime)
            out <- prime
        }
    }()
    return out
}

func main() {
    primes := sieve()
    for {
        fmt.Println(<-primes)
    }
}

鏈接

results matching ""

    No results matching ""