26

Golang NSQ 消息队列使用实战

 2 years ago
source link: https://segmentfault.com/a/1190000040923001
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

网上看了好多,都是抄个官网 README,很多重要的东西不说清楚。只好自己研究了一下。

本人博客,关键词 Less-Bug.com ,欢迎关注。

NSQ 的全家桶介绍

  • nsqd:守护进程,客户端通信。默认端口 4150(TCP) 4151(HTTP)
  • nsqlookupd:相当于一个路由器。客户端可以经由它发现生产者、nsqd 广播的话题。一个 nsqlookupd 能够管理一群 nsqd。默认端口::4160(TCP),:4161(HTTP)
  • nsqadmin:在线面板,能够通过浏览器直接访问。默认端口 :4171

从命令行启动

可以直接下载二进制文件。开三个终端,分别执行:

nsqlookupd
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
nsqadmin --lookupd-http-address=127.0.0.1:4161

go-nsq 的使用

我封装了一个包:

package mq

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/nsqio/go-nsq"
    "go.uber.org/zap"
)

type MessageQueueConfig struct {
    NsqAddr         string
    NsqLookupdAddr  string
    SupportedTopics []string
}

type MessageQueue struct {
    config    MessageQueueConfig
    producer  *nsq.Producer
    consumers map[string]*nsq.Consumer
}

func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {
    zap.L().Debug("New message queue")
    producer, err := initProducer(config.NsqAddr)
    if err != nil {
        return nil, err
    }
    consumers := make(map[string]*nsq.Consumer)
    for _, topic := range config.SupportedTopics {
        nsq.Register(topic,"default")
        consumers[topic], err = initConsumer(topic, "default", config.NsqAddr)
        if err != nil {
            return
        }
    }
    return &MessageQueue{
        config:    config,
        producer:  producer,
        consumers: consumers,
    }, nil
}

func (mq *MessageQueue) Run() {
    for name, c := range mq.consumers {
        zap.L().Info("Run consumer for " + name)
        // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
        c.ConnectToNSQD(mq.config.NsqAddr)
    }
}

func initProducer(addr string) (producer *nsq.Producer, err error) {
    zap.L().Debug("initProducer to " + addr)
    config := nsq.NewConfig()
    producer, err = nsq.NewProducer(addr, config)    
    return
}

func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
    zap.L().Debug("initConsumer to " + topic + "/" + channel)
    config := nsq.NewConfig()
    config.LookupdPollInterval = 15 * time.Second
    c, err = nsq.NewConsumer(topic, channel, config)
    return
}

func (mq *MessageQueue) Pub(name string, data interface{}) (err error) {
    body, err := json.Marshal(data)
    if err != nil {
        return
    }
    zap.L().Info("Pub " + name + " to mq. data = " + string(body))
    return mq.producer.Publish(name, body)
}

type Messagehandler func(v []byte)

func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) {
    zap.L().Info("Subscribe " + name)
    v, ok := mq.consumers[name]
    if !ok {
        err = fmt.Errorf("No such topic: " + name)
        return
    }
    v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
        handler(message.Body)
        return nil
    }))
    return
}

使用示例:

    m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
        NsqAddr:         "127.0.0.1:4150",
        NsqLookupdAddr:  "127.0.0.1:4161",
        SupportedTopics: []string{"hello"},
    })

    if err != nil {
        zap.L().Fatal("Message queue error: " + err.Error())
    }

    m.Sub("hello", func(resp []byte) {
        zap.L().Info("S1 Got: " + string(resp))
    })
    m.Sub("hello", func(resp []byte) {
        zap.L().Info("S2 Got: " + string(resp))
    })
    m.Run()
    err = m.Pub("hello", "world")
    if err != nil {
        zap.L().Fatal("Message queue error: " + err.Error())
    }
    err = m.Pub("hello", "tom")
    if err != nil {
        zap.L().Fatal("Message queue error: " + err.Error())
    }

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan
    os.Exit(0);

主要是进行解耦合,这样万一我们换成 Kalfa 之类的队列,就可以不用动业务代码。

输出结果:

2021-11-07T19:13:41.886+0800    DEBUG   mq/mq.go:29     New message queue
2021-11-07T19:13:41.886+0800    DEBUG   mq/mq.go:58     initProducer to 127.0.0.1:4150
2021-11-07T19:13:41.887+0800    DEBUG   mq/mq.go:65     initConsumer to hello/default
2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:84     Subscribe hello
2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:84     Subscribe hello
2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:51     Run consumer for hello
2021/11/07 19:13:41 INF    2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T19:13:41.887+0800    INFO    mq/mq.go:77     Pub hello to mq. data = "world"
2021/11/07 19:13:41 INF    1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T19:13:41.888+0800    INFO    mq/mq.go:77     Pub hello to mq. data = "tom"
2021-11-07T19:13:41.888+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "world"
2021-11-07T19:13:41.888+0800    INFO    buqi-admin-backend/main.go:63   S2 Got: "tom"

从输出结果我们可以确认一个事实,就是对于订阅了同一个 topic,同一个 channel 的不同消费者,当消息涌入时,将会负载均衡——每个 Handler 只会收到一个消息

遇到的问题

TOPIC_NOT_FOUND

遇到两个原因。

其一是大小写,Topic 名是大小写敏感的,因此 Hellohello 是两个不同的 topic,写代码时应该规范操作:抽取常量,并维护一个所有 Topic 的列表。

其二是 Topic 未创建。第一次 pub 之后,对应的 topic/channel 才能创建。建议写个脚本调用 /topic/create 接口一次性创建好,不然后面第二次重试订阅的时候才能收到消息,造成不可预料的延迟。

发现客户端轮询 HTTP

这是因为 NsqLookupd 本身是一个中介,可以管理一堆不同 IP 的 nsqd,那么我们就不可能永远只连接一个 nsq,所以就要轮询来确认有哪些客户端。

对于小项目,可以绕过 NsqLookupd:

        // c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
        c.ConnectToNSQD(mq.config.NsqAddr)

如何让多个消费者消费同一个 topic?

显然,根据 nsq 的机制,我们需要让同一个 topic 的消费者使用不同的通道。一种方法是随机化 channel,比如使用一个递增量作为 channel 名。

第二种方法是根据用途定义 channel 名。

第三种方法:据说可以使用 AddConcurrentHandlers,尚未研究。

第四种方法:我们把 Handler 中介化,使用一个消费者去消费,但是手动将消息送入应用层的一个自定义的流水线,让流水线的 filter 去处理消息。我猜这样还能避免一些临界区问题。

我们试一下第四种方法。(代码已发布到 GIST,Github 用户名 Pluveto)

实现流水线 Handler

package mq

import (
    "encoding/json"
    "fmt"
    "time"

    "github.com/nsqio/go-nsq"
    "go.uber.org/zap"
)

type MessageQueueConfig struct {
    NsqAddr         string
    NsqLookupdAddr  string
    EnableLookupd   bool
    SupportedTopics []string
}

type MessageQueue struct {
    subscribers map[string]Subscriber
    config      MessageQueueConfig
    producer    *nsq.Producer
}

type Messagehandler func(v []byte) bool

// LinkedHandlerNode 第一个节点为头节点,Handler 必须为 nil
type LinkedHandlerNode struct {
    Handler  *Messagehandler
    Index    int
    NextNode *LinkedHandlerNode
}

type Subscriber struct {
    HandlerHeadNode *LinkedHandlerNode
    Consumer        *nsq.Consumer
    Handler         nsq.HandlerFunc
}

func createProducer(addr string) (producer *nsq.Producer, err error) {
    zap.L().Debug("initProducer to " + addr)
    config := nsq.NewConfig()
    producer, err = nsq.NewProducer(addr, config)
    return
}

func createConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {
    zap.L().Debug("initConsumer to " + topic + "/" + channel)
    config := nsq.NewConfig()
    config.LookupdPollInterval = 15 * time.Second
    c, err = nsq.NewConsumer(topic, channel, config)
    return
}

func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {    
    zap.L().Debug("New message queue")
    producer, err := createProducer(config.NsqAddr)
    if err != nil {
        return nil, err
    }
    subscribers := make(map[string]Subscriber)
    for _, topic := range config.SupportedTopics {
        nsq.Register(topic, "default")
        consumer, err := createConsumer(topic, "default", config.NsqAddr)
        if err != nil {
            return nil, err
        }
        // 头节点不参与实际使用,所以 Index = -1
        headNode := &LinkedHandlerNode{Index: -1}
        hubHandler := nsq.HandlerFunc(func(message *nsq.Message) error {
            // 循环链式调用各个 Handler
            curNode := headNode.NextNode
            // 当不存在任何用户定义的 Handler 时抛出警告
            if(nil == curNode){
                return fmt.Errorf("No handler provided!")
            }
            for nil != curNode {
                msg := message.Body
                zap.S().Debugf("handler[%v] for %v is invoked", curNode.Index, topic)
                stop := (*curNode.Handler)(msg)
                if stop {
                    zap.S().Debugf("the message has stopped spreading ")
                    break
                }
                curNode = curNode.NextNode
            }
            return nil
        })
        consumer.AddHandler(hubHandler)
        subscribers[topic] = Subscriber{
            Consumer:        consumer,
            HandlerHeadNode: headNode,
        }
    }
    return &MessageQueue{
        config:      config,
        producer:    producer,
        subscribers: subscribers,
    }, nil
}

func (mq *MessageQueue) Run() {
    for name, s := range mq.subscribers {
        zap.L().Info("Run consumer for " + name)
        if mq.config.EnableLookupd {
            s.Consumer.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)
        } else {
            s.Consumer.ConnectToNSQD(mq.config.NsqAddr)
        }
    }
}

func (mq *MessageQueue) IsTopicSupported(topic string) bool {

    for _, v := range mq.config.SupportedTopics {
        if v == topic {
            return true
        }
    }
    return false
}

// Pub 向消息队列发送一个消息
func (mq *MessageQueue) Pub(topic string, data interface{}) (err error) {
    if !mq.IsTopicSupported(topic) {
        err = fmt.Errorf("unsupported topic name: " + topic)
        return
    }
    body, err := json.Marshal(data)
    if err != nil {
        return
    }
    zap.L().Info("Pub " + topic + " to mq. data = " + string(body))
    return mq.producer.Publish(topic, body)
}

// Sub 从消息队列订阅一个消息
func (mq *MessageQueue) Sub(topic string, handler Messagehandler) (err error) {
    if !mq.IsTopicSupported(topic) {
        err = fmt.Errorf("unsupported topic name: " + topic)
        return
    }
    zap.L().Info("Subscribe " + topic)
    subscriber, ok := mq.subscribers[topic]
    if !ok {
        err = fmt.Errorf("No such topic: " + topic)
        return
    }
    // 抵达最后一个有效链表节点
    curNode := subscriber.HandlerHeadNode
    for nil != curNode.NextNode {
        curNode = curNode.NextNode
    }
    // 创建节点
    curNode.NextNode = &LinkedHandlerNode{
        Handler:  &handler,
        Index:    1 + curNode.Index,
        NextNode: nil,
    }
    return
}

这里的思想是给每个消费者预先创建唯一的 Handler,这个 Handler 会依次调用链表中的各个具体的 Handler。当用户订阅 Topic 时,将用户提供的 Handler 添加到链表末尾。

使用示例:

    m, err := mq.NewMessageQueue(mq.MessageQueueConfig{
        NsqAddr:         "127.0.0.1:4150",
        NsqLookupdAddr:  "127.0.0.1:4161",
        SupportedTopics: []string{"hello"},
        EnableLookupd:   false,
    })

    if err != nil {
        zap.L().Fatal("Message queue error: " + err.Error())
    }

    m.Sub("hello", func(resp []byte) bool {
        zap.L().Info("S1 Got: " + string(resp))
        return false
    })
    m.Sub("hello", func(resp []byte) bool {
        zap.L().Info("S2 Got: " + string(resp))
        return true
    })
    m.Sub("hello", func(resp []byte) bool {
        zap.L().Info("S3 Got: " + string(resp))
        return false
    })
    m.Run()
    err = m.Pub("hello", "world")
    if err != nil {
        zap.L().Fatal("Message queue error: " + err.Error())
    }
    err = m.Pub("hello", "tom")
    if err != nil {
        zap.L().Fatal("Message queue error: " + err.Error())
    }

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan
    os.Exit(0)
2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:40     New message queue
2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:89     initProducer to 127.0.0.1:4150
2021-11-07T20:30:38.448+0800    DEBUG   mq/mq.go:96     initConsumer to hello/default
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:113    Subscribe hello
2021-11-07T20:30:38.448+0800    INFO    mq/mq.go:82     Run consumer for hello
2021/11/07 20:30:38 INF    2 [hello/default] (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.454+0800    INFO    mq/mq.go:108    Pub hello to mq. data = "world"
2021/11/07 20:30:38 INF    1 (127.0.0.1:4150) connecting to nsqd
2021-11-07T20:30:38.455+0800    INFO    mq/mq.go:108    Pub hello to mq. data = "tom"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "world"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:64   S2 Got: "world"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:60     the message has stopped spreading 
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[0] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:60   S1 Got: "tom"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:57     handler[1] for hello is invoked
2021-11-07T20:30:38.455+0800    INFO    buqi-admin-backend/main.go:64   S2 Got: "tom"
2021-11-07T20:30:38.455+0800    DEBUG   mq/mq.go:60     the message has stopped spreading 
^C

可以看到,Handler 返回 true 时,就可以阻断消息的传播。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK