14.5 通道、超時和計時器(Ticker)

time 包中有一些有趣的功能可以和通道組合使用。

其中就包含了 time.Ticker 結構體,這個對象以指定的時間間隔重複的向通道 C 發送時間值:

type Ticker struct {
    C <-chan Time // the channel on which the ticks are delivered.
    // contains filtered or unexported fields
    ...
}

時間間隔的單位是 ns(納秒,int64),在工廠函數 time.NewTicker 中以 Duration 類型的參數傳入:func Newticker(dur) *Ticker

在協程週期性的執行一些事情(打印狀態日誌,輸出,計算等等)的時候非常有用。

調用 Stop() 使計時器停止,在 defer 語句中使用。這些都很好的適應 select 語句:

ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
case u:= <-ch1:
    ...
case v:= <-ch2:
    ...
case <-ticker.C:
    logState(status) // call some logging function logState
default: // no value ready to be received
    ...
}

time.Tick() 函數聲明爲 Tick(d Duration) <-chan Time,當你想返回一個通道而不必關閉它的時候這個函數非常有用:它以 d 爲週期給返回的通道發送時間,d是納秒數。如果需要像下邊的代碼一樣,限制處理頻率(函數 client.Call() 是一個 RPC 調用,這裏暫不贅述(參見第 15.9 節):

import "time"

rate_per_sec := 10
var dur Duration = 1e9 / rate_per_sec
chRate := time.Tick(dur) // a tick every 1/10th of a second
for req := range requests {
    <- chRate // rate limit our Service.Method RPC calls
    go client.Call("Service.Method", req, ...)
}

這樣只會按照指定頻率處理請求:chRate 阻塞了更高的頻率。每秒處理的頻率可以根據機器負載(和/或)資源的情況而增加或減少。

問題 14.1:擴展上邊的代碼,思考如何承載週期請求數的暴增(提示:使用帶緩衝通道和計時器對象)。

定時器(Timer)結構體看上去和計時器(Ticker)結構體的確很像(構造爲 NewTimer(d Duration)),但是它只發送一次時間,在 Dration d 之後。

還有 time.After(d) 函數,聲明如下:

func After(d Duration) <-chan Time

Duration d 之後,當前時間被髮到返回的通道;所以它和 NewTimer(d).C 是等價的;它類似 Tick(),但是 After() 只發送一次時間。下邊有個很具體的示例,很好的闡明瞭 selectdefault 的作用:

示例 14.11:timer_goroutine.go

package main

import (
    "fmt"
    "time"
)

func main() {
    tick := time.Tick(1e8)
    boom := time.After(5e8)
    for {
        select {
        case <-tick:
            fmt.Println("tick.")
        case <-boom:
            fmt.Println("BOOM!")
            return
        default:
            fmt.Println("    .")
            time.Sleep(5e7)
        }
    }
}

輸出:

    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
    .
    .
tick.
BOOM!

習慣用法:簡單超時模式

要從通道 ch 中接收數據,但是最多等待1秒。先創建一個信號通道,然後啓動一個 lambda 協程,協程在給通道發送數據之前是休眠的:

timeout := make(chan bool, 1)
go func() {
        time.Sleep(1e9) // one second
        timeout <- true
}()

然後使用 select 語句接收 ch 或者 timeout 的數據:如果 ch 在 1 秒內沒有收到數據,就選擇到了 time 分支並放棄了 ch 的讀取。

select {
    case <-ch:
        // a read from ch has occured
    case <-timeout:
        // the read from ch has timed out
        break
}

第二種形式:取消耗時很長的同步調用

也可以使用 time.After() 函數替換 timeout-channel。可以在 select 中使用以發送信號超時或停止協程的執行。以下代碼,在 timeoutNs 納秒後執行 selecttimeout 分支時,client.Call 不會給通道 ch 返回值:

ch := make(chan error, 1)
go func() { ch <- client.Call("Service.Method", args, &reply) } ()
select {
case resp := <-ch
    // use resp and reply
case <-time.After(timeoutNs):
    // call timed out
    break
}

注意緩衝大小設置爲 1 是必要的,可以避免協程死鎖以及確保超時的通道可以被垃圾回收。

第三種形式:假設程序從多個複製的數據庫同時讀取。只需要一個答案,需要接收首先到達的答案,Query 函數獲取數據庫的連接切片並請求。並行請求每一個數據庫並返回收到的第一個響應:

func Query(conns []conn, query string) Result {
    ch := make(chan Result, 1)
    for _, conn := range conns {
        go func(c Conn) {
            select {
            case ch <- c.DoQuery(query):
            default:
            }
        }(conn)
    }
    return <- ch
}

再次聲明,結果通道 ch 必須是帶緩衝的:以保證第一個發送進來的數據有地方可以存放,確保放入的首個數據總會成功,所以第一個到達的值會被獲取而與執行的順序無關。正在執行的協程可以總是可以使用 runtime.Goexit() 來停止。

在應用中緩存數據:

應用程序中用到了來自數據庫(或者常見的數據存儲)的數據時,經常會把數據緩存到內存中,因爲從數據庫中獲取數據的操作代價很高;如果數據庫中的值不發生變化就沒有問題。但是如果值有變化,我們需要一個機制來週期性的從數據庫重新讀取這些值:緩存的值就不可用(過期)了,而且我們也不希望用戶看到陳舊的數據。

鏈接

results matching ""

    No results matching ""