28

如何把golang的Channel玩出async和await的feel

 4 years ago
source link: https://studygolang.com/articles/26542
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

如何优雅的同步化异步代码,一直以来都是各大编程语言致力于优化的点,记得最早是C# 5.0加入了async/await来简化TPL的多线程模型,后来Javascript的Promise也吸取这一语法糖,在ES 6中也加入了async和await.

那么,被大家一称赞并发性能好、异步模型独树一帜的golang,能否也有async和await呢?

其实,这对于golang的CSM来说是一点也不难!

核心代码如下:

done := make(chan struct{})
go func() {
	// do work asynchronously here
	//
	close(done)
}()
<-done
复制代码

是不是很简单呢? go rountine负责async, channel的负责await, 简直是完美!

但这个代码看起来还是有点丑,而且这个 go func(){} 还没有返回值,虽说可以通过闭包来接收返回值,但那个代码就更难维护了。

Go Promise

代码难看不要紧,只要Don't repeat yourself (DRY),封装一下不就好了?

type WorkFunc func() (interface{}, error)

func NewPromise(workFunc WorkFunc) *Promise {
	promise := Promise{done: make(chan struct{})}
	go func() {
		defer close(promise.done)
		promise.res, promise.err = workFunc()
	}()
	return &promise
}

func (p *Promise) Done() (interface{}, error) {
	<-p.done
	return p.res, p.err
}
复制代码

调用的代码如下:

promise := NewPromise(func() (interface{}, error) {
	// do work asynchronously here
	//
	return res, err
})

// await
res, err := promise.Done()
复制代码

是不是美观了许多呢?

这个实现和Javascript的Promise的API是有很大差距,使用体验上因为golang没有泛型,也需要转来转去的,但为了不辜负Promise这个名字,怎么能没有 then 呢?

type SuccessHandler func(interface{}) (interface{}, error)

type ErrorHandler func(error) interface{}

func (p *Promise) Then(successHandler SuccessHandler, errorHandler ErrorHandler) *Promise {
	newPromise := &Promise{done: make(chan struct{})}
	go func() {
		res, err := p.Done()
		defer close(newPromise.done)
		if err != nil {
			if errorHandler != nil {
				newPromise.res = errorHandler(err)
			} else {
				newPromise.err = err
			}
		} else {
			if successHandler != nil {
				newPromise.res, newPromise.err = successHandler(res)
			} else {
				newPromise.res = res
			}
		}
	}()

	return newPromise
}
复制代码

有了 then 可以chain起来,是不是找到些Promise的感觉呢?

完整代码请查看 promise.go

Actor

本来我的理解也就到些了,然后前段时间( 说来也是一月有余了 ),看了 Go并发设计模式之 Active Object 这篇文章后, 发现如果有一个常驻协程在异步的处理任务,而且是FIFO的,那么这其实是相当于一个无锁的设计,可以简化对临界资源的操作。

于是,我照着文章的思路,实现了下面的代码:

// Creates a new actor
func NewActor(setActorOptionFuncs ...SetActorOptionFunc) *Actor {
	actor := &Actor{buffer: runtime.NumCPU(), quit: make(chan struct{}), wg: &sync.WaitGroup{}}
	for _, setOptionFunc := range setActorOptionFuncs {
		setOptionFunc(actor)
	}

	actor.queue = make(chan request, actor.buffer)

	actor.wg.Add(1)
	go actor.schedule()

	return actor
}

// The long live go routine to run.
func (actor *Actor) schedule() {
loop:
	for {
		select {
		case request := <-actor.queue:
			request.promise.res, request.promise.err = request.work()
			close(request.promise.done)
		case <-actor.quit:
			break loop
		}
	}
	actor.wg.Done()
}

// Do a work.
func (actor *Actor) Do(workFunc WorkFunc) *Promise {
	methodRequest := request{work: workFunc, promise: &Promise{
		done: make(chan struct{}),
	}}
	actor.queue <- methodRequest
	return methodRequest.promise
}

// Close actor
func (actor *Actor) Close() {
	close(actor.quit)
	actor.wg.Wait()
}
复制代码

一个简单的没啥意义的纯粹为了demo的测试用例如下:

func TestActorAsQueue(t *testing.T) {
	actor := NewActor()
	defer actor.Close()

	i := 0
	workFunc := func() (interface{}, error) {
		time.Sleep(1 * time.Second)
		i++
		return i, nil
	}

	promise := actor.Do(workFunc)
	promise2 := actor.Do(workFunc)

	res2, _ := promise2.Done()
	res1, _ := promise.Done()

	if res1 != 1 {
		t.Fail()
	}

	if res2 != 2 {
		t.Fail()
	}
}
复制代码

完整代码请查看 actor.go

总结

每个语言都有它的独特之处,在我的理解中,玩转golang的CSM模型,channel一定要用的6。

于是,我创建了 Channelx 这个repo, 包含了对channel常用场景的封装,欢迎大家审阅,喜欢的就点个star。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK