

gobox中的consumer处理框架
source link: http://blog.7rule.com/2018/09/01/gobox-consumer.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架
consumer处理架构图
重要的对象
IMessage
定义每条消息
type IMessage interface { Body() []byte }
ConsumerHandleFunc
consumer中从队列收到每条消息后,调用这个方法
type ConsumerHandleFunc func(message IMessage) error
IConsumer
定义消费者行为
type IConsumer interface { SetHandleFunc(hf ConsumerHandleFunc) Start() Stop() }
NewWorkerFunc
每个Worker的构造方法
type NewWorkerFunc func() IWorker
IWorker
定义Worker
type IWorker interface { SetWorkId(id int) SetLogger(logger golog.ILogger) Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool) }
LineProcessFunc
每条消息在Worker中的实际处理方法
type LineProcessFunc func(line []byte) error
BaseWorker
框架提供的一个简单基础Worker对象,组合这个对象后,只需要实现 LineProcessFunc
即可
type BaseWorker struct
Task
Task用于实现consumer的处理框架
使用示例
package main import ( "github.com/goinbox/goconsumer" "fmt" "strconv" "time" ) // 这里实现Worker type DemoWorker struct { *goconsumer.BaseWorker } func NewDemoWorker() goconsumer.IWorker { worker := &DemoWorker{goconsumer.NewBaseWorker()} worker.SetLineProcessFunc(worker.LineProcessFunc) return worker } func (d *DemoWorker) LineProcessFunc(line []byte) error { idStr := strconv.Itoa(d.Id) fmt.Println("wid:" + idStr + " process line:" + string(line)) return nil } // 这里实现Message type DemoMessage struct { body []byte } func (d *DemoMessage) Body() []byte { return d.body } // 这里实现一个简单的Consumer,模拟从队列中获得100条消息 type DemoConsumer struct { hf goconsumer.ConsumerHandleFunc } func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) { d.hf = hf } func (d *DemoConsumer) Start() { for i := 0; i < 100; i++ { str := "This message is from DemoConsumer loop " + strconv.Itoa(i) d.hf(&DemoMessage{[]byte(str)}) } time.Sleep(time.Second * 1) } func (d *DemoConsumer) Stop() { } // 执行Task任务,调用consumer处理框架 func main() { task := goconsumer.NewTask("Demo") consumer := new(DemoConsumer) task.SetConsumer(consumer). SetWorker(10, NewDemoWorker). Start() }
欢迎大家使用,使用中有遇到问题随时反馈,我们会尽快响应,谢谢!
Recommend
-
42
今天来说下gobox中的连接池底层实现pool 为什么需要连接池 我们的系统在访问外部资源(redis、mysql等)时,为了提高性能,通常会用到的一个优化方法就是把已经使用过的tcp连接保存起来,这样当需要再次使用时,就可...
-
40
今天来说下使用gobox中的log操作 log级别定义 const ( LEVEL_EMERGENCY = 0 LEVEL_ALERT = 1 LEVEL_CRITICAL = 2 LEVEL_ERROR = 3 LEVEL_WARNING = 4 LEVEL_NOTICE = 5 LEVEL_INFO = 6...
-
49
今天来说下使用gobox中的http请求处理框架 http请求处理架构图 重要的对象 System system用于实现g...
-
11
gobox中的常用工具包gomisc Sep 8, 2018 有一些常用的工具函数,我们把它们放到gomisc这个包中。 Slice中的值Unique func IntSliceUnique(s []int) []int func StringSli...
-
13
gobox中的httpclient Aug 18, 2018 今天来说下使用gobox中httpclient,这个包就相当于命令行的curl工具,用于发起http请求。 重要的对象 config const ( DEFAUL...
-
15
gobox中的分页操作 Aug 3, 2018 今天来说下使用gobox中的分页操作 分页也是我们开发时的一个常见需求,gobox中提供了page包做这个事情 package main import ( "github....
-
17
gobox中redis操作 Jul 29, 2018 今天来说下使用gobox中redis操作相关 本包的driver部分使用了redigo:https://github.com/garyburd/redigo package main import ( "git...
-
11
gobox中mysql操作 Jul 21, 2018 今天来说下使用gobox中mysql操作相关 本包的driver部分使用了go-sql-driver:https://github.com/go-sql-driver/mysql 示例表结构为: | de...
-
9
gobox中的simplecache和levelcache Jun 10, 2018 今天来说下gobox中的simplecache和levelcache simplecache simplecache提供了一个简单的内存kv package main i...
-
13
gobox中的shardmap May 25, 2018 今天来说下gobox中的shardmap。 golang中的map使用简单,但并发写入时,如果不加锁,会导致panic,所以性能很差。 shardmap就是为了解决这个问题,其核心思想就...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK