Golang NSQ 消息队列使用实战
source link: https://segmentfault.com/a/1190000040923001
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
网上看了好多,都是抄个官网 README,很多重要的东西不说清楚。只好自己研究了一下。
本人博客,关键词 Less-Bug.com ,欢迎关注。
NSQ 的全家桶介绍
- nsqd:守护进程,客户端通信。默认端口
4150
(TCP)4151
(HTTP) - nsqlookupd:相当于一个路由器。客户端可以经由它发现生产者、nsqd 广播的话题。一个 nsqlookupd 能够管理一群 nsqd。默认端口:
:4160
(TCP),:4161
(HTTP) - nsqadmin:在线面板,能够通过浏览器直接访问。默认端口
:4171
从命令行启动
可以直接下载二进制文件。开三个终端,分别执行:
nsqlookupd nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1 nsqadmin --lookupd-http-address=127.0.0.1:4161
go-nsq 的使用
我封装了一个包:
package mq import ( "encoding/json" "fmt" "time" "github.com/nsqio/go-nsq" "go.uber.org/zap" ) type MessageQueueConfig struct { NsqAddr string NsqLookupdAddr string SupportedTopics []string } type MessageQueue struct { config MessageQueueConfig producer *nsq.Producer consumers map[string]*nsq.Consumer } func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) { zap.L().Debug("New message queue") producer, err := initProducer(config.NsqAddr) if err != nil { return nil, err } consumers := make(map[string]*nsq.Consumer) for _, topic := range config.SupportedTopics { nsq.Register(topic,"default") consumers[topic], err = initConsumer(topic, "default", config.NsqAddr) if err != nil { return } } return &MessageQueue{ config: config, producer: producer, consumers: consumers, }, nil } func (mq *MessageQueue) Run() { for name, c := range mq.consumers { zap.L().Info("Run consumer for " + name) // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr) c.ConnectToNSQD(mq.config.NsqAddr) } } func initProducer(addr string) (producer *nsq.Producer, err error) { zap.L().Debug("initProducer to " + addr) config := nsq.NewConfig() producer, err = nsq.NewProducer(addr, config) return } func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) { zap.L().Debug("initConsumer to " + topic + "/" + channel) config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err = nsq.NewConsumer(topic, channel, config) return } func (mq *MessageQueue) Pub(name string, data interface{}) (err error) { body, err := json.Marshal(data) if err != nil { return } zap.L().Info("Pub " + name + " to mq. data = " + string(body)) return mq.producer.Publish(name, body) } type Messagehandler func(v []byte) func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) { zap.L().Info("Subscribe " + name) v, ok := mq.consumers[name] if !ok { err = fmt.Errorf("No such topic: " + name) return } v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { handler(message.Body) return nil })) return }
使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{ NsqAddr: "127.0.0.1:4150", NsqLookupdAddr: "127.0.0.1:4161", SupportedTopics: []string{"hello"}, }) if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } m.Sub("hello", func(resp []byte) { zap.L().Info("S1 Got: " + string(resp)) }) m.Sub("hello", func(resp []byte) { zap.L().Info("S2 Got: " + string(resp)) }) m.Run() err = m.Pub("hello", "world") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } err = m.Pub("hello", "tom") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan os.Exit(0);
主要是进行解耦合,这样万一我们换成 Kalfa 之类的队列,就可以不用动业务代码。
输出结果:
2021-11-07T19:13:41.886+0800 DEBUG mq/mq.go:29 New message queue 2021-11-07T19:13:41.886+0800 DEBUG mq/mq.go:58 initProducer to 127.0.0.1:4150 2021-11-07T19:13:41.887+0800 DEBUG mq/mq.go:65 initConsumer to hello/default 2021-11-07T19:13:41.887+0800 INFO mq/mq.go:84 Subscribe hello 2021-11-07T19:13:41.887+0800 INFO mq/mq.go:84 Subscribe hello 2021-11-07T19:13:41.887+0800 INFO mq/mq.go:51 Run consumer for hello 2021/11/07 19:13:41 INF 2 [hello/default] (127.0.0.1:4150) connecting to nsqd 2021-11-07T19:13:41.887+0800 INFO mq/mq.go:77 Pub hello to mq. data = "world" 2021/11/07 19:13:41 INF 1 (127.0.0.1:4150) connecting to nsqd 2021-11-07T19:13:41.888+0800 INFO mq/mq.go:77 Pub hello to mq. data = "tom" 2021-11-07T19:13:41.888+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "world" 2021-11-07T19:13:41.888+0800 INFO buqi-admin-backend/main.go:63 S2 Got: "tom"
从输出结果我们可以确认一个事实,就是对于订阅了同一个 topic,同一个 channel 的不同消费者,当消息涌入时,将会负载均衡——每个 Handler 只会收到一个消息。
遇到的问题
TOPIC_NOT_FOUND
遇到两个原因。
其一是大小写,Topic 名是大小写敏感的,因此 Hello
和 hello
是两个不同的 topic,写代码时应该规范操作:抽取常量,并维护一个所有 Topic 的列表。
其二是 Topic 未创建。第一次 pub 之后,对应的 topic/channel 才能创建。建议写个脚本调用 /topic/create
接口一次性创建好,不然后面第二次重试订阅的时候才能收到消息,造成不可预料的延迟。
发现客户端轮询 HTTP
这是因为 NsqLookupd 本身是一个中介,可以管理一堆不同 IP 的 nsqd,那么我们就不可能永远只连接一个 nsq,所以就要轮询来确认有哪些客户端。
对于小项目,可以绕过 NsqLookupd:
// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr) c.ConnectToNSQD(mq.config.NsqAddr)
如何让多个消费者消费同一个 topic?
显然,根据 nsq 的机制,我们需要让同一个 topic 的消费者使用不同的通道。一种方法是随机化 channel,比如使用一个递增量作为 channel 名。
第二种方法是根据用途定义 channel 名。
第三种方法:据说可以使用 AddConcurrentHandlers,尚未研究。
第四种方法:我们把 Handler 中介化,使用一个消费者去消费,但是手动将消息送入应用层的一个自定义的流水线,让流水线的 filter 去处理消息。我猜这样还能避免一些临界区问题。
我们试一下第四种方法。(代码已发布到 GIST,Github 用户名 Pluveto)
实现流水线 Handler
package mq import ( "encoding/json" "fmt" "time" "github.com/nsqio/go-nsq" "go.uber.org/zap" ) type MessageQueueConfig struct { NsqAddr string NsqLookupdAddr string EnableLookupd bool SupportedTopics []string } type MessageQueue struct { subscribers map[string]Subscriber config MessageQueueConfig producer *nsq.Producer } type Messagehandler func(v []byte) bool // LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil type LinkedHandlerNode struct { Handler *Messagehandler Index int NextNode *LinkedHandlerNode } type Subscriber struct { HandlerHeadNode *LinkedHandlerNode Consumer *nsq.Consumer Handler nsq.HandlerFunc } func createProducer(addr string) (producer *nsq.Producer, err error) { zap.L().Debug("initProducer to " + addr) config := nsq.NewConfig() producer, err = nsq.NewProducer(addr, config) return } func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) { zap.L().Debug("initConsumer to " + topic + "/" + channel) config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err = nsq.NewConsumer(topic, channel, config) return } func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) { zap.L().Debug("New message queue") producer, err := createProducer(config.NsqAddr) if err != nil { return nil, err } subscribers := make(map[string]Subscriber) for _, topic := range config.SupportedTopics { nsq.Register(topic, "default") consumer, err := createConsumer(topic, "default", config.NsqAddr) if err != nil { return nil, err } // 头节点不参与实际使用,所以 Index = -1 headNode := &LinkedHandlerNode{Index: -1} hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error { // 循环链式调用各个 Handler curNode := headNode.NextNode // 当不存在任何用户定义的 Handler 时抛出警告 if(nil == curNode){ return fmt.Errorf("No handler provided!") } for nil != curNode { msg := message.Body zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic) stop := (*curNode.Handler)(msg) if stop { zap.S().Debugf("the message has stopped spreading ") break } curNode = curNode.NextNode } return nil }) consumer.AddHandler(hubHandler) subscribers[topic] = Subscriber{ Consumer: consumer, HandlerHeadNode: headNode, } } return &MessageQueue{ config: config, producer: producer, subscribers: subscribers, }, nil } func (mq *MessageQueue) Run() { for name, s := range mq.subscribers { zap.L().Info("Run consumer for " + name) if mq.config.EnableLookupd { s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr) } else { s.Consumer.ConnectToNSQD(mq.config.NsqAddr) } } } func (mq *MessageQueue) IsTopicSupported(topic string) bool { for _, v := range mq.config.SupportedTopics { if v == topic { return true } } return false } // Pub 向消息队列发送一个消息 func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) { if !mq.IsTopicSupported(topic) { err = fmt.Errorf("unsupported topic name: " + topic) return } body, err := json.Marshal(data) if err != nil { return } zap.L().Info("Pub " + topic + " to mq. data = " + string(body)) return mq.producer.Publish(topic, body) } // Sub 从消息队列订阅一个消息 func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) { if !mq.IsTopicSupported(topic) { err = fmt.Errorf("unsupported topic name: " + topic) return } zap.L().Info("Subscribe " + topic) subscriber, ok := mq.subscribers[topic] if !ok { err = fmt.Errorf("No such topic: " + topic) return } // 抵达最后一个有效链表节点 curNode := subscriber.HandlerHeadNode for nil != curNode.NextNode { curNode = curNode.NextNode } // 创建节点 curNode.NextNode = &LinkedHandlerNode{ Handler: &handler, Index: 1 + curNode.Index, NextNode: nil, } return }
这里的思想是给每个消费者预先创建唯一的 Handler,这个 Handler 会依次调用链表中的各个具体的 Handler。当用户订阅 Topic 时,将用户提供的 Handler 添加到链表末尾。
使用示例:
m, err := mq.NewMessageQueue(mq.MessageQueueConfig{ NsqAddr: "127.0.0.1:4150", NsqLookupdAddr: "127.0.0.1:4161", SupportedTopics: []string{"hello"}, EnableLookupd: false, }) if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } m.Sub("hello", func(resp []byte) bool { zap.L().Info("S1 Got: " + string(resp)) return false }) m.Sub("hello", func(resp []byte) bool { zap.L().Info("S2 Got: " + string(resp)) return true }) m.Sub("hello", func(resp []byte) bool { zap.L().Info("S3 Got: " + string(resp)) return false }) m.Run() err = m.Pub("hello", "world") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } err = m.Pub("hello", "tom") if err != nil { zap.L().Fatal("Message queue error: " + err.Error()) } sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan os.Exit(0)
2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:40 New message queue 2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:89 initProducer to 127.0.0.1:4150 2021-11-07T20:30:38.448+0800 DEBUG mq/mq.go:96 initConsumer to hello/default 2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello 2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello 2021-11-07T20:30:38.448+0800 INFO mq/mq.go:113 Subscribe hello 2021-11-07T20:30:38.448+0800 INFO mq/mq.go:82 Run consumer for hello 2021/11/07 20:30:38 INF 2 [hello/default] (127.0.0.1:4150) connecting to nsqd 2021-11-07T20:30:38.454+0800 INFO mq/mq.go:108 Pub hello to mq. data = "world" 2021/11/07 20:30:38 INF 1 (127.0.0.1:4150) connecting to nsqd 2021-11-07T20:30:38.455+0800 INFO mq/mq.go:108 Pub hello to mq. data = "tom" 2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[0] for hello is invoked 2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "world" 2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[1] for hello is invoked 2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:64 S2 Got: "world" 2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:60 the message has stopped spreading 2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[0] for hello is invoked 2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:60 S1 Got: "tom" 2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:57 handler[1] for hello is invoked 2021-11-07T20:30:38.455+0800 INFO buqi-admin-backend/main.go:64 S2 Got: "tom" 2021-11-07T20:30:38.455+0800 DEBUG mq/mq.go:60 the message has stopped spreading ^C
可以看到,Handler 返回 true 时,就可以阻断消息的传播。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK