14.5 通道、超時和計時器(Ticker)
time
包中有一些有趣的功能可以和通道組合使用。
其中就包含了 time.Ticker
結構體,這個對象以指定的時間間隔重複的向通道 C 發送時間值:
type Ticker struct {
C <-chan Time // the channel on which the ticks are delivered.
// contains filtered or unexported fields
...
}
時間間隔的單位是 ns(納秒,int64),在工廠函數 time.NewTicker
中以 Duration
類型的參數傳入:func Newticker(dur) *Ticker
。
在協程週期性的執行一些事情(打印狀態日誌,輸出,計算等等)的時候非常有用。
調用 Stop()
使計時器停止,在 defer
語句中使用。這些都很好的適應 select
語句:
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
...
case v:= <-ch2:
...
case <-ticker.C:
logState(status) // call some logging function logState
default: // no value ready to be received
...
}
time.Tick()
函數聲明爲 Tick(d Duration) <-chan Time
,當你想返回一個通道而不必關閉它的時候這個函數非常有用:它以 d 爲週期給返回的通道發送時間,d是納秒數。如果需要像下邊的代碼一樣,限制處理頻率(函數 client.Call()
是一個 RPC 調用,這裏暫不贅述(參見第 15.9 節):
import "time"
rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
<- chRate // rate limit our Service.Method RPC calls
go client.Call("Service.Method", req, ...)
}
這樣只會按照指定頻率處理請求:chRate
阻塞了更高的頻率。每秒處理的頻率可以根據機器負載(和/或)資源的情況而增加或減少。
問題 14.1:擴展上邊的代碼,思考如何承載週期請求數的暴增(提示:使用帶緩衝通道和計時器對象)。
定時器(Timer)結構體看上去和計時器(Ticker)結構體的確很像(構造爲 NewTimer(d Duration)
),但是它只發送一次時間,在 Dration d
之後。
還有 time.After(d)
函數,聲明如下:
func After(d Duration) <-chan Time
在 Duration d
之後,當前時間被髮到返回的通道;所以它和 NewTimer(d).C
是等價的;它類似 Tick()
,但是 After()
只發送一次時間。下邊有個很具體的示例,很好的闡明瞭 select
中 default
的作用:
示例 14.11:timer_goroutine.go:
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(1e8)
boom := time.After(5e8)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(5e7)
}
}
}
輸出:
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
BOOM!
習慣用法:簡單超時模式
要從通道 ch
中接收數據,但是最多等待1秒。先創建一個信號通道,然後啓動一個 lambda
協程,協程在給通道發送數據之前是休眠的:
timeout := make(chan bool, 1)
go func() {
time.Sleep(1e9) // one second
timeout <- true
}()
然後使用 select
語句接收 ch
或者 timeout
的數據:如果 ch
在 1 秒內沒有收到數據,就選擇到了 time
分支並放棄了 ch
的讀取。
select {
case <-ch:
// a read from ch has occured
case <-timeout:
// the read from ch has timed out
break
}
第二種形式:取消耗時很長的同步調用
也可以使用 time.After()
函數替換 timeout-channel
。可以在 select
中使用以發送信號超時或停止協程的執行。以下代碼,在 timeoutNs
納秒後執行 select
的 timeout
分支時,client.Call
不會給通道 ch
返回值:
ch := make(chan error, 1)
go func() { ch <- client.Call("Service.Method", args, &reply) } ()
select {
case resp := <-ch
// use resp and reply
case <-time.After(timeoutNs):
// call timed out
break
}
注意緩衝大小設置爲 1 是必要的,可以避免協程死鎖以及確保超時的通道可以被垃圾回收。
第三種形式:假設程序從多個複製的數據庫同時讀取。只需要一個答案,需要接收首先到達的答案,Query
函數獲取數據庫的連接切片並請求。並行請求每一個數據庫並返回收到的第一個響應:
func Query(conns []conn, query string) Result {
ch := make(chan Result, 1)
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <- ch
}
再次聲明,結果通道 ch
必須是帶緩衝的:以保證第一個發送進來的數據有地方可以存放,確保放入的首個數據總會成功,所以第一個到達的值會被獲取而與執行的順序無關。正在執行的協程可以總是可以使用 runtime.Goexit()
來停止。
在應用中緩存數據:
應用程序中用到了來自數據庫(或者常見的數據存儲)的數據時,經常會把數據緩存到內存中,因爲從數據庫中獲取數據的操作代價很高;如果數據庫中的值不發生變化就沒有問題。但是如果值有變化,我們需要一個機制來週期性的從數據庫重新讀取這些值:緩存的值就不可用(過期)了,而且我們也不希望用戶看到陳舊的數據。
鏈接
- 目錄
- 上一節:使用select切換協程
- 下一節:協程和恢復(recover)