21

Go并发设计模式之 Active Object | 鸟窝

 4 years ago
source link: https://colobu.com/2019/07/02/concurrency-design-patterns-active-object/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Go并发设计模式之 Active Object

设计模式曾经很火,尤其是1995年的时候Erich Gamma, Richard Helm, Ralph Johnson 和 John Vlissides (GoF)推出的《设计模式》一书,可谓经典。这本书总结了面向对象设计中最有价值的经验,并且用简洁可复用的形式表达出来。书中分类描述了23种经典实用的设计模式,这些实际模式依然在现在的实际开发中被广泛实用。

当然,这23种设计模式并不能涵盖所有的模式场景,同时,书中也没有对其它领域的一些设计模式进行归纳总结和介绍,比如并发场景,数据库设计、前端设计、架构模式等等,这个问题GoF中也已经进行了说明。二十几年过去了,期间也陆陆续续的出了一些介绍模式的书,有些是对GoF的23中设计模式的具体语言的介绍、阐述等等,也有一些设计模式的书,介绍了企业开发中的其它领域的设计模式,有一些书还是非常值得一读的。

我会写一系列介绍并发设计模式的文章,主要介绍实用Go语言去实现这些并发设计模式,但是我不想遵循介绍设计模式的模版,而是结合很多流行的Go的项目和库,从实践的角度去介绍这些并发设计模式。这种介绍方式一是可以让读者更容易的去理解设计模式,而不是拿一些老掉牙、根本不会使用的例子来介绍,二来可以坚定读者的信心,因为这些并发设计模式已经在流行的项目中使用了,得到了实际的检验。

作为开篇一章,我介绍的是 Active Object设计模式,为什么拿它作为第一篇呢,因为它的首字母是A,最大。

activeobject.png

Active Object设计模式解耦了方法的调用和方法的执行,方法的调用和方法的执行运行在不同的线程之中(或者纤程、goroutine, 后面不再一一注释)。它引入了异步方法调用,允许应用程序可以并发的处理多个客户端的请求,通过调度器进行调用并发的方法执行,提供了并发执行方法的能力。

这个模式有时候也会叫做Concurrency ObjectActor设计模式。

很多程序会使用并发对象来提高它们的性能,例如并发地的处理客户端的请求,方法的调用和执行都在每个客户端的线程之中,并发对象也就存在于各个客户端线程之中,因为并发对象需要在各个线程之间共享,免不了要使用锁等同步方式控制并发对象的访问,这就要求我们为了保证服务的质量,需要设计程序满足:

  • 对并发对象的方法调用不应该阻塞完整的处理流程
  • 同步访问并发对象应该设计简单
  • 应用程序应该透明的使用软硬件的并发能力

而Active Object这个并发设计模式解耦了方法的调用和执行,但是客户端线程还像调用普通方法一样,方法调用自动转换成一个method request,交给另外一个处理线程,然后这个method request会在这个线程中被调用。

这种模式包含6个组件:

  • proxy: 定义了客户端要调用的Active Object接口。当客户端调用它的方法是,方法调用被转换成method request放入到scheduler的activation queue之中。
  • method request: 用来封装方法调用的上下文
  • activation queue:待处理的 method request队列
  • scheduler:一个独立的线程,管理activation queue,调度方法的执行
  • servant:active object的方法执行的具体实现,
  • future:当客户端调用方法时,一个future对象会立即返回,允许客户端可以获取返回结果。

一些正式的实现,比如一些Java程序的实现,可以严格的按照这些组件实现对应的类,而对于Go语言来讲,可能实现形式上略微不同,因为Go并不是严格意义上的面向对象的编程,而且Go的语言设计目标时简单,所以实现这个并发模式的时候,有时候你不必使用面向对象的设计来实现,使用函数、方法的形式更简洁。而且这种并发设计模式也有一些变种,比如使用callback代替future,或者在不需要返回值的情况下省略future。

"Sometimes, the elegant implementation is just a function. Not a method. Not a class. Not a framework. Just a function." - John Carmack

首先我们看一个hello world一样一个简单的例子,再详细分析一个标准库中使用Active Object的例子。

type Service struct {
func (s *Service) Incr() {
s.v++
func (s *Service) Decr() {
s.v--

上面这个例子Service对象并不是线程安全的,当多个goroutine并发调用的时候会有data race问题。当然你可以通过增加一个sync.Mutex的方式保证同步,对于这个例子来说,使用Mutex去保护比较简单,但是如果对于复杂的业务来说,并发控制将变得很难,并且性能上影响也会非常大。我们可以使用Active Object方式去实现。

type MethodRequest int
const (
Incr MethodRequest = iota
type Service struct {
queue chan MethodRequest
func New(buffer int) *Service {
s := &Service{
queue: make(chan MethodRequest, buffer),
go s.schedule()
return s
func (s *Service) schedule() {
for r := range s.queue {
if r == Incr {
s.v++
} else {
s.v--
func (s *Service) Incr() {
s.queue <- Incr
func (s *Service) Decr() {
s.queue <- Decr

从上面这个简单的例子,你可以大致找到Active Object对应的组件。MethodRequest对应method request, Service对应proxy,schedule对应scheduler,Service.queue对应activation queue,因为不需要返回值,我们没有实现future。这里Service也对应servant,不像某些语言,为了保证面向对象的设计,以及接口和实现的分离,会定义很多的接口和对象,Go不一样,以简单为主,一个Service类型实现了多种角色,这也简化了Active Object设计模式的实现。

在标准库中,有一个非常好的Active Object设计模式的例子,就是标准库net/rpc的Client的实现。

对于一个rpc服务Arith来说,

type Args struct {
A, B int
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil

它的客户端是线程安全的,可以在多个goroutine中并发的调用,通过一个tcp connection和服务器端调用:

args := &server.Args{7,8}
var reply int
call := client.Go("Arith.Multiply", args, &reply, nil)
<- call.Done
if call.Error != nil {
log.Fatal("arith error:", call.Error)
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply)

那么它又是如何实现的Active Object模式的呢?

Client提供了Go方法实现异步的方法调用。

它将上下文(请求参数和返回)封装成一个Call对象, call对象的done字段提供了future的功能。你可以利用它获取方法是否已经执行完毕。

func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
call.Done = done
client.send(call)
return call

然后调用client.send将这个call对象放入到待处理处理队列中(实际实现需要处理并发放入的问题,以及异常情况error的处理):

func (client *Client) send(call *Call) {
......
seq := client.seq
client.seq++
client.pending[seq] = call
......

实际这个send要更复杂一点,它还会把请求发送给服务端,所以严格意义上来讲,它做了一些方法执行的逻辑。如果网络有问题,就可以快速地返回。

Client初始化的时候,就会启动一个goroutine去处理Client.input

Client.input是独立于调用goroutine的一个单独的goroutine,它不断的从服务器读取消息,处理异常和正常的返回,并找到对应的Call对象。

它会调用Call.done方法提供给调用者一个调用完成的信号,客户端可以监控这个channel感知到方法调用是否完成。

func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
......
case response.Error != "":
......
call.done()
default:
......
call.done()
......
func (call *Call) done() {
select {
case call.Done <- call:
default:
......

可以看到,net/rpc相对于标准的Active Object有所简化,其中相当于Active ObjectClient具有多个角色,他是这个设计模式的核心。通过将方法调用的上下文封装成一个Call对象,客户端可以像传统方法调用一样异步去处理,客户端并不需要理会内部的复杂的处理逻辑。针对这种网络访问的场景,Client又将部分方法执行的逻辑放在了方法调用send里面,作为guard condition可以快速返回。

我也将这种模式应用在rpcx的客户端调用上。

当然这种模式也仅限于网络调用上,比如服务端的单一进程的程序中也可以应用。由于Go语言的先天的并发处理的优势,很多情况下我们都request-per-goroutine + mutex/shared object 的方式去处理。

同步调用也很简单,可以很容易的基于异步调用Go实现同步调用Call:

func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
0 comments

Be the first person to leave a comment!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK