18

Go 语言计时器的使用详解

 3 years ago
source link: https://mp.weixin.qq.com/s/StlVAhJtbvYpalvJZlttJQ
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Go语言计时器

Go 语言的标准库里提供两种类型的计时器 TimerTickerTimer 经过指定的 duration 时间后被触发,往自己的时间 channel 发送当前时间,此后 Timer 不再计时。 Ticker 则是每隔 duration 时间都会把当前时间点发送给自己的时间 channel ,利用计时器的时间 channel 可以实现很多与计时相关的功能。

文章主要涉及如下内容:

  • TimerTicker 计时器的内部结构表示

  • TimerTicker 的使用方法和注意事项

  • 如何正确 Reset 定时器

计时器的内部表示

两种计时器都是基于 Go 语言的运行时计时器 runtime.timer 实现的, rumtime.timer 的结构体表示如下:

type timer struct {
pp puintptr

when int64
period int64
f func(interface{}, uintptr)
arg interface
{}
seq uintptr
nextwhen int64
status uint32
}

rumtime.timer 结构体中的字段含义是

  • when — 当前计时器被唤醒的时间;

  • period — 两次被唤醒的间隔;

  • f — 每当计时器被唤醒时都会调用的函数;

  • arg — 计时器被唤醒时调用  f 传入的参数;

  • nextWhen — 计时器处于  timerModifiedLater/timerModifiedEairlier 状态时,用于设置  when 字段;

  • status — 计时器的状态;

这里的 runtime.timer 只是私有的计时器运行时表示,对外暴露的计时器  time.Timertime.Ticker 的结构体表示如下:

type Timer struct {
C <-chan Time
r runtimeTimer
}

type Ticker struct {
C <-chan Time
r runtimeTimer
}

Timer.CTicker.C 就是计时器中的时间 channel ,接下来我们看一下怎么使用这两种计时器,以及使用时要注意的地方。

Timer计时器

time.Timer 计时器必须通过  time.NewTimertime.AfterFunc 或者  time.After 函数创建。当计时器失效时,失效的时间就会被发送给计时器持有的  channel ,订阅  channel 的  goroutine 会收到计时器失效的时间。

通过定时器 Timer 用户可以定义自己的超时逻辑,尤其是在应对使用 select 处理多个 channel 的超时、单 channel 读写的超时等情形时尤为方便。 Timer 常见的使用方法如下:

//使用time.AfterFunc:

t := time.AfterFunc(d, f)

//使用time.After:
select {
case m := <-c:
handle(m)
case <-time.After(5 * time.Minute):
fmt.Println("timed out")
}

// 使用time.NewTimer:
t := time.NewTimer(5 * time.Minute)
select {
case m := <-c:
handle(m)
case <-t.C:
fmt.Println("timed out")
}

time.AfterFunc 这种方式创建的 Timer ,在到达超时时间后会在单独的 goroutine 里执行函数 f

func AfterFunc(d Duration, f func()) *Timer {
t := &Timer{
r: runtimeTimer{
when: when(d),
f: goFunc,
arg: f,
},
}
startTimer(&t.r)
return t
}

func goFunc(arg interface{}, seq uintptr) {
go arg.(func())()
}

从上面 AfterFunc 的源码可以看到外面传入的 f 参数并非直接赋值给了运行时计时器的 f ,而是作为包装函数 goFunc 的参数传入的。 goFunc 会启动了一个新的 goroutine 来执行外部传入的函数 f 。这是因为所有计时器的事件函数都是由 Go 运行时内唯一的 goroutine timerproc 运行的。为了不阻塞 timerproc 的执行,必须启动一个新的 goroutine 执行到期的事件函数。

对于 NewTimerAfter 这两种创建方法,则是 Timer 在超时后,执行一个标准库中内置的函数: sendTime

func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}

func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}

sendTime 将当前时间发送到 Timer 的时间 channel 中。那么这个动作不会阻塞 timerproc 的执行么?答案是不会,原因是 NewTimer 创建的是一个带缓冲的 channel 所以无论 Timer.C 这个 channel 有没有接收方 sendTime 都可以非阻塞的将当前时间发送给 Timer.C ,而且 sendTime 中还加了双保险:通过 select 判断 Timer.CBuffer 是否已满,一旦满了,会直接退出,依然不会阻塞。

TimerStop 方法可以阻止计时器触发,调用 Stop 方法成功停止了计时器的触发将会返回 true ,如果计时器已经过期了或者已经被 Stop 停止过了,再次调用 Stop 方法将会返回 false

Go 运行时将所有计时器维护在一个最小堆 Min Heap 中, Stop 一个计时器就是从堆中删除该计时器。

Ticker计时器

Ticker 可以周期性地触发时间事件,每次到达指定的时间间隔后都会触发事件。

time.Ticker 需要通过 time.NewTicker 或者 time.Tick 创建。

// 使用time.Tick:
go func() {
for t := range time.Tick(time.Minute) {
fmt.Println("Tick at", t)
}
}()

// 使用time.Ticker
var ticker *time.Ticker = time.NewTicker(1 * time.Second)

go func() {
for t := range ticker.C {
fmt.Println("Tick at", t)
}
}()

time.Sleep(time.Second * 5)
ticker.Stop()
fmt.Println("Ticker stopped")

不过 time.Tick 很少会被用到,除非你想在程序的整个生命周期里都使用 time.Ticker 的时间 channel 。官文文档里对 time.Tick 的描述是:

time.Tick 底层的 Ticker 不能被垃圾收集器恢复;

所以使用 time.Tick 时一定要小心,为避免意外尽量使用 time.NewTicker 返回的 Ticker 替代。

NewTicker 创建的计时器与 NewTimer 创建的计时器持有的时间 channel 一样都是带一个缓存的 channel ,每次触发后执行的函数也是 sendTime ,这样即保证了无论有误接收方 Ticker 触发时间事件时都不会阻塞:

func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}

Reset计时器时要注意的问题

关于 Reset 的使用建议,文档里的描述是:

重置计时器时必须注意不要与当前计时器到期发送时间到t.C的操作产生竞争。如果程序已经从t.C接收到值,则计时器是已知的已过期,并且t.Reset可以直接使用。如果程序尚未从t.C接收值,计时器必须先被停止,并且-如果使用t.Stop时报告计时器已过期,那么请排空其通道中值。

例如:

if !t.Stop() {
<-t.C
}
t.Reset(d)

下面的例子里 producer goroutine 里每一秒向通道中发送一个 false 值,循环结束后等待一秒再往通道里发送一个 true 值。在 consumer goroutine 里通过循环试图从通道中读取值,用计时器设置了最长等待时间为5秒,如果计时器超时了,输出当前时间并进行下次循环尝试,如果从通道中读取出的不是期待的值(预期值是 true ),则尝试重新从通道中读取并重置计时器。

func main() {
c := make(chan bool)

go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Second * 1)
c <- false
}

time.Sleep(time.Second * 1)
c <- true
}()

go func() {
// try to read from channel, block at most 5s.
// if timeout, print time event and go on loop.
// if read a message which is not the type we want(we want true, not false),
// retry to read.
timer := time.NewTimer(time.Second * 5)
for {
// timer is active , not fired, stop always returns true, no problems occurs.
if !timer.Stop() {
<-timer.C
}
timer.Reset(time.Second * 5)
select {
case b := <-c:
if b == false {
fmt.Println(time.Now(), ":recv false. continue")
continue
}
//we want true, not false
fmt.Println(time.Now(), ":recv true. return")
return
case <-timer.C:
fmt.Println(time.Now(), ":timer expired")
continue
}
}
}()

//to avoid that all goroutine blocks.
var s string
fmt.Scanln(&s)
}

程序的输出如下:

2020-05-13 12:49:48.90292 +0800 CST m=+1.004554120 :recv false. continue
2020-05-13 12:49:49.906087 +0800 CST m=+2.007748042 :recv false. continue
2020-05-13 12:49:50.910208 +0800 CST m=+3.011892138 :recv false. continue
2020-05-13 12:49:51.914291 +0800 CST m=+4.015997373 :recv false. continue
2020-05-13 12:49:52.916762 +0800 CST m=+5.018489240 :recv false. continue
2020-05-13 12:49:53.920384 +0800 CST m=+6.022129708 :recv true. return

目前来看没什么问题,使用Reset重置计时器也起作用了,接下来我们对 producer goroutin 做一些更改,我们把 producer goroutine 里每秒发送值的逻辑改成每 6 秒发送值,而 consumer gouroutine 里和计时器还是 5 秒就到期。

  // producer
go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Second * 6)
c <- false
}

time.Sleep(time.Second * 6)
c <- true
}()

再次运行会发现程序发生了 deadlock 在第一次报告计时器过期后直接阻塞住了:

2020-05-13 13:09:11.166976 +0800 CST m=+5.005266022 :timer expired

那程序是在哪阻塞住的呢?对就是在抽干 timer.C 通道时阻塞住了(英文叫做drain channel比喻成流干管道里的水,在程序里就是让 timer.C 管道中不再存在未接收的值)。

if !timer.Stop() {
<-timer.C
}
timer.Reset(time.Second * 5)

producer goroutine 的发送行为发生了变化, comsumer goroutine 在收到第一个数据前有了一次计时器过期的事件, for 循环进行一下次循环。这时 timer.Stop 函数返回的不再是 true ,而是 false ,因为计时器已经过期了,上面提到的维护着所有活跃计时器的最小堆中已经不包含该计时器了。而此时 timer.C 中并没有数据,接下来用于 drain channel 的代码会将 consumer goroutine 阻塞住。

这种情况,我们应该直接 Reset 计时器,而不用显式 drain channel 。如何将这两种情形合二为一呢?我们可以利用一个 select 来包裹 drain channel 的操作,这样无论 channel 中是否有数据, drain 都不会阻塞住。

//consumer
go func() {
// try to read from channel, block at most 5s.
// if timeout, print time event and go on loop.
// if read a message which is not the type we want(we want true, not false),
// retry to read.
timer := time.NewTimer(time.Second * 5)
for {
// timer may be not active, and fired
if !timer.Stop() {
select {
case <-timer.C: //try to drain from the channel
default:
}
}
timer.Reset(time.Second * 5)
select {
case b := <-c:
if b == false {
fmt.Println(time.Now(), ":recv false. continue")
continue
}
//we want true, not false
fmt.Println(time.Now(), ":recv true. return")
return
case <-timer.C:
fmt.Println(time.Now(), ":timer expired")
continue
}
}
}()

运行修改后的程序,发现程序不会被阻塞住,能正常进行通道读取,读取到 true 值后会自行退出。输出结果如下:

2020-05-13 13:25:08.412679 +0800 CST m=+5.005475546 :timer expired
2020-05-13 13:25:09.409249 +0800 CST m=+6.002037341 :recv false. continue
2020-05-13 13:25:14.412282 +0800 CST m=+11.005029547 :timer expired
2020-05-13 13:25:15.414482 +0800 CST m=+12.007221569 :recv false. continue
2020-05-13 13:25:20.416826 +0800 CST m=+17.009524859 :timer expired
2020-05-13 13:25:21.418555 +0800 CST m=+18.011245687 :recv false. continue
2020-05-13 13:25:26.42388 +0800 CST m=+23.016530193 :timer expired
2020-05-13 13:25:27.42294 +0800 CST m=+24.015582511 :recv false. continue
2020-05-13 13:25:32.425666 +0800 CST m=+29.018267054 :timer expired
2020-05-13 13:25:33.428189 +0800 CST m=+30.020782483 :recv false. continue
2020-05-13 13:25:38.432428 +0800 CST m=+35.024980796 :timer expired
2020-05-13 13:25:39.428343 +0800 CST m=+36.020887629 :recv true. return

总结

以上比较详细地介绍了 Go 语言的计时器以及它们的使用方法和注意事项,总结一下有如下关键点:

  • TimerTicker 都是在运行时计时器 runtime.timer 的基础上实现的。

  • 运行时里的所有计时器都由运行时内唯一的 timerproc 触发。

  • time.Tick 创建的 Ticker 在运行时不会被 gc 回收,能不用就不用。

  • TimerTicker 的时间 channel 都是带有一个缓冲的通道。

  • time.Aftertime.NewTimertime.NewTicker 创建的计时器触发时都会执行 sendTime

  • sendTime 和计时器带缓冲的时间通道保证了计时器不会阻塞程序。

  • Reset 计时器时要注意 drain channel 和计时器过期存在竞争条件。

参考链接:

Timer Reset方法使用的正确姿势

Go advanced concurrency patterns: timers

How Do They Do It: Timers in Go

Go语言设计与实现之计时器

EveuYbA.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK