39

gobox中的consumer处理框架

 6 years ago
source link: http://blog.7rule.com/2018/09/01/gobox-consumer.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

我们都会有从异步队列中消费的需求,今天来说下gobox中的consumer处理框架

consumer处理架构图

3I3Yvmr.png!web

重要的对象

IMessage

定义每条消息

type IMessage interface {
	Body() []byte
}

ConsumerHandleFunc

consumer中从队列收到每条消息后,调用这个方法

type ConsumerHandleFunc func(message IMessage) error

IConsumer

定义消费者行为

type IConsumer interface {
	SetHandleFunc(hf ConsumerHandleFunc)
	Start()
	Stop()
}

NewWorkerFunc

每个Worker的构造方法

type NewWorkerFunc func() IWorker

IWorker

定义Worker

type IWorker interface {
	SetWorkId(id int)
	SetLogger(logger golog.ILogger)

	Work(wg *sync.WaitGroup, lineCh chan []byte, stopCh chan bool)
}

LineProcessFunc

每条消息在Worker中的实际处理方法

type LineProcessFunc func(line []byte) error

BaseWorker

框架提供的一个简单基础Worker对象,组合这个对象后,只需要实现 LineProcessFunc 即可

type BaseWorker struct

Task

Task用于实现consumer的处理框架

使用示例

package main

import (
	"github.com/goinbox/goconsumer"

	"fmt"
	"strconv"
	"time"
)

// 这里实现Worker
type DemoWorker struct {
	*goconsumer.BaseWorker
}

func NewDemoWorker() goconsumer.IWorker {
	worker := &DemoWorker{goconsumer.NewBaseWorker()}
	worker.SetLineProcessFunc(worker.LineProcessFunc)

	return worker
}

func (d *DemoWorker) LineProcessFunc(line []byte) error {
	idStr := strconv.Itoa(d.Id)
	fmt.Println("wid:" + idStr + " process line:" + string(line))

	return nil
}

// 这里实现Message
type DemoMessage struct {
	body []byte
}

func (d *DemoMessage) Body() []byte {
	return d.body
}

// 这里实现一个简单的Consumer,模拟从队列中获得100条消息
type DemoConsumer struct {
	hf goconsumer.ConsumerHandleFunc
}

func (d *DemoConsumer) SetHandleFunc(hf goconsumer.ConsumerHandleFunc) {
	d.hf = hf
}

func (d *DemoConsumer) Start() {
	for i := 0; i < 100; i++ {
		str := "This message is from DemoConsumer loop " + strconv.Itoa(i)
		d.hf(&DemoMessage{[]byte(str)})
	}

	time.Sleep(time.Second * 1)
}

func (d *DemoConsumer) Stop() {

}


// 执行Task任务,调用consumer处理框架
func main() {
	task := goconsumer.NewTask("Demo")
	consumer := new(DemoConsumer)

	task.SetConsumer(consumer).
		SetWorker(10, NewDemoWorker).
		Start()
}

欢迎大家使用,使用中有遇到问题随时反馈,我们会尽快响应,谢谢!


Recommend

  • 42
    • blog.7rule.com 6 years ago
    • Cache

    gobox中的连接池pool

    今天来说下gobox中的连接池底层实现pool 为什么需要连接池 我们的系统在访问外部资源(redis、mysql等)时,为了提高性能,通常会用到的一个优化方法就是把已经使用过的tcp连接保存起来,这样当需要再次使用时,就可...

  • 40
    • blog.7rule.com 6 years ago
    • Cache

    gobox中的log操作

    今天来说下使用gobox中的log操作 log级别定义 const ( LEVEL_EMERGENCY = 0 LEVEL_ALERT = 1 LEVEL_CRITICAL = 2 LEVEL_ERROR = 3 LEVEL_WARNING = 4 LEVEL_NOTICE = 5 LEVEL_INFO = 6...

  • 49
    • blog.7rule.com 6 years ago
    • Cache

    gobox中的http请求处理框架

    今天来说下使用gobox中的http请求处理框架 http请求处理架构图 重要的对象 System system用于实现g...

  • 11
    • blog.7rule.com 4 years ago
    • Cache

    gobox中的常用工具包gomisc

    gobox中的常用工具包gomisc Sep 8, 2018 有一些常用的工具函数,我们把它们放到gomisc这个包中。 Slice中的值Unique func IntSliceUnique(s []int) []int func StringSli...

  • 13
    • blog.7rule.com 4 years ago
    • Cache

    gobox中的httpclient

    gobox中的httpclient Aug 18, 2018 今天来说下使用gobox中httpclient,这个包就相当于命令行的curl工具,用于发起http请求。 重要的对象 config const ( DEFAUL...

  • 15
    • blog.7rule.com 4 years ago
    • Cache

    gobox中的分页操作

    gobox中的分页操作 Aug 3, 2018 今天来说下使用gobox中的分页操作 分页也是我们开发时的一个常见需求,gobox中提供了page包做这个事情 package main import ( "github....

  • 17
    • blog.7rule.com 4 years ago
    • Cache

    gobox中redis操作

    gobox中redis操作 Jul 29, 2018 今天来说下使用gobox中redis操作相关 本包的driver部分使用了redigo:https://github.com/garyburd/redigo package main import ( "git...

  • 11
    • blog.7rule.com 4 years ago
    • Cache

    gobox中mysql操作

    gobox中mysql操作 Jul 21, 2018 今天来说下使用gobox中mysql操作相关 本包的driver部分使用了go-sql-driver:https://github.com/go-sql-driver/mysql 示例表结构为: | de...

  • 9
    • blog.7rule.com 4 years ago
    • Cache

    gobox中的simplecache和levelcache

    gobox中的simplecache和levelcache Jun 10, 2018 今天来说下gobox中的simplecache和levelcache simplecache simplecache提供了一个简单的内存kv package main i...

  • 13
    • blog.7rule.com 4 years ago
    • Cache

    gobox中的shardmap

    gobox中的shardmap May 25, 2018 今天来说下gobox中的shardmap。 golang中的map使用简单,但并发写入时,如果不加锁,会导致panic,所以性能很差。 shardmap就是为了解决这个问题,其核心思想就...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK