64

nsq源码review:go-nsq producer

 1年前 阅读数 68
以下为 快照 页面,建议前往来源网站查看,会有更好的阅读体验。
原文链接: https://www.tuicool.com/articles/riiiYfy

nsq是一个实时分布式的消息队列平台。

核心部分是一个叫nsqd的模块,它负责接收和转发消息。同时在go-nsq的包中,提供了consumer和producer的核心接口。在读nsq源码的时候,很好奇它的数据是怎么从producer给到了consumer的,于是从源码的层面梳理了一下代码的实现细节。这部分先记录一下producer和consumer的代码细节,方便后续再查看相关代码。后面准备把nsqd和nsqdlookup相关的东西记录一下,包括数据分发、数据缓存、服务发现等实现细节。

go-nsq里的producer和consumer实现的功能就是一句话,提供消息接受和分发的接口。但是它内部的实现确很有意思。

nsq demo

学习源码还是要先从demo起步,首先装好nsq,可以git下来源码编译或者执行下载二进制文件。先写个producer.go,如下

package main

import (
	"github.com/nsqio/go-nsq"
	"log"
)

func main() {
	cfg := nsq.NewConfig()
	r := []byte("hello consumer")
	addr := "127.0.0.1:4150"
	p, err := nsq.NewProducer(addr, cfg)
	if err != nil {
		log.Print(err)
	}
	err = p.Publish("serving123", r)
	if err != nil {
		log.Println(err)
	}
}
复制代码

例子里,我创建了一个producer,然后向serving123的topic里发送消息。

下面是consumer的代码,consumer.go

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"math/rand"
	"os"
	"time"
)

type SimpleHandler struct {
}

func (sh *SimpleHandler) HandleMessage(m *nsq.Message) error {
	_, err := os.Stdout.Write(m.Body)
	if err != nil {
		fmt.Println(err)
	}
	return nil
}

func main() {
	pause := make(chan bool)
	caddr := "127.0.0.1:4150"
	cfg := nsq.NewConfig()
	channel := fmt.Sprintf("tail%06d#ephemeral", rand.Int()%999999)
	c, _ := nsq.NewConsumer("serving123", channel, cfg)
	c.AddHandler(&SimpleHandler{})
	c.ConnectToNSQD(caddr)
	<-pause
}
复制代码

consumer,我定义了一个SimpleHandler,它实现了HandleMessage的接口,功能就是向标准输出打印消息。 main函数里,我新建了一个想要消费serving123这个topic的consumer,然后连接上nsqd服务。

运行

直接执行nsqd启一个服务。先执行consumer,然后执行producer,可以看到consumer里打印出的消息。

bimyyiE.jpg!web

producer源码解析

完成上面的demo之后,先来看一下producer是怎么玩的。

producer实例,会去调用 Publish 方法去发布消息,这个方法接收了topic和message body。

func (w *Producer) Publish(topic string, body []byte) error {
	return w.sendCommand(Publish(topic, body))
}
复制代码

内部的另外一个 Publish 方法执行结果作为参数传入 sendCommandPublish 方法,构造了一个 Command 的三元消息体

func Publish(topic string, body []byte) *Command {
	var params = [][]byte{[]byte(topic)}
	return &Command{[]byte("PUB"), params, body}
}
复制代码

sendCommand 方法里,初始化一个名叫doneChan的 ProducerTransaction 的指针,然后,cmd消息三元体,和doneChan一起传入 sendCommandAsync 方法里。最后监听doneChan的管道输出,然后返回error。

func (w *Producer) sendCommand(cmd *Command) error {
	doneChan := make(chan *ProducerTransaction)
	err := w.sendCommandAsync(cmd, doneChan, nil)
	if err != nil {
		close(doneChan)
		return err
	}
	t := <-doneChan
	return t.Error
}
复制代码

ProducerTransaction 结构体很有意思,它持有一个自己相同类型的指针。目的是将最终的内容自己保存起来。在最终clearup的时候,它会调用一个 finish 方法去触发上面的chan监听返回,从而最终返回退出 sendCommand

type ProducerTransaction struct {
	cmd      *Command
	doneChan chan *ProducerTransaction
	Error    error         // the error (or nil) of the publish command
	Args     []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
}
复制代码

这个finish方法会把 ProducerTransaction 它自己本身进行传递,也就是在 t.doneChan <- t 这里

func (t *ProducerTransaction) finish() {
	if t.doneChan != nil {
		t.doneChan <- t
	}
}
复制代码

看一下 sendCommandAsync 函数。首先,它会去调用原子操作 atomic.AddInt32 去记录并发producer数量。上面说的doneChan指针,作为一个新的 ProducerTransaction 的参数传入,最终这个新的 ProducerTransaction 的数据传给了 w.transactionChan 监听的channel。

func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
	args []interface{}) error {
	// keep track of how many outstanding producers we're dealing with
	// in order to later ensure that we clean them all up...
	atomic.AddInt32(&w.concurrentProducers, 1)
	defer atomic.AddInt32(&w.concurrentProducers, -1)

	if atomic.LoadInt32(&w.state) != StateConnected {
		err := w.connect()//建立和nsqd的连接
		if err != nil {
			return err
		}
	}

	t := &ProducerTransaction{
		cmd:      cmd,
		doneChan: doneChan,
		Args:     args,
	}

	select {
	case w.transactionChan <- t:
	case <-w.exitChan:
		return ErrStopped
	}

	return nil
}
复制代码

建立和nsqd的连接,以及发送数据都写在w.connect()。了解了上面channel异步操作,看下这个connect函数

func (w *Producer) connect() error {
	w.guard.Lock()
	defer w.guard.Unlock()

	if atomic.LoadInt32(&w.stopFlag) == 1 {
		return ErrStopped
	}

	switch state := atomic.LoadInt32(&w.state); state {
	case StateInit:
	case StateConnected:
		return nil
	default:
		return ErrNotConnected
	}

	w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

	logger, logLvl := w.getLogger()

	w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
	w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))

	_, err := w.conn.Connect()
	if err != nil {
		w.conn.Close()
		w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
		return err
	}
	atomic.StoreInt32(&w.state, StateConnected)
	w.closeChan = make(chan int)
	w.wg.Add(1)
	go w.router()

	return nil
}
复制代码

函数开始部分很好理解,就是进行一些状态验证。 _, err := w.conn.Connect() 这里会实际去建立和nsqd的连接,会在里面跑一个readLoop和一个writeloop去进行读写的相关操作,东西比较多就不再赘述。这里关心的是这个 w.router 方法。

router方法里面开了一个for循环来监听producer的channel,包括transactionChan、responseChan、errorChan、closeChan、exitChan,如果是w.transactionChan,则把这个transaction塞进producer的transactions数组里, 然后向conn里去写消息即向nsqd发送数据 。如果是收到了返回信号或者是错误信号,则会弹出一个transaction。如果收到关闭或者是退出信号,则到exit里面,清理所有transaction,并退出。

func (w *Producer) router() {
	for {
		select {
		case t := <-w.transactionChan:
			w.transactions = append(w.transactions, t)
			err := w.conn.WriteCommand(t.cmd)
			if err != nil {
				w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
				w.close()
			}
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		case <-w.closeChan:
			goto exit
		case <-w.exitChan:
			goto exit
		}
	}

exit:
	w.transactionCleanup()
	w.wg.Done()
	w.log(LogLevelInfo, "exiting router")
}
复制代码

popTransaction 方法,会把transactions第一个元素弹出,保存剩下的元素。然后回去调用这个弹出的 ProducerTransactionfinish() 方法,也就是上面说的finish()方法。上面的 sendCommand 方法会受到通知退出,这样就完成了消息的发布过程。

func (w *Producer) popTransaction(frameType int32, data []byte) {
	t := w.transactions[0]
	w.transactions = w.transactions[1:]
	if frameType == FrameTypeError {
		t.Error = ErrProtocol{string(data)}
	}
	t.finish()
}
复制代码

总结

限于篇幅,consumer相关准备起另外一篇去写,consumer和producer在和nsqd通信的地方会复用一些代码。

关于nsq的producer,我们能学习到的是它channel的使用方法,以及进行数据传输的时候,是如何运载数据和如何通知和监听channel。核心的部分在于 ProducerTransaction 这个结构体,它负责了消息的运载。在上面提到的readLoop和writeloop,里面有许多代理的方法也值得关注和学习,具体内容可以自己看一眼。


猜你喜欢

  • 24
    • studygolang.com 2年前
    • 快照

    NSQ源码-Nsq客户端

    看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的 认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责...

  • 29
    • studygolang.com 2年前
    • 快照

    NSQ源码-NSQD

    看完了nsqlookupd我们继续往下看, nsqd才是他的核心. 里面大量的使用到了go channel, 相信看完之后对你学习go有很大的帮助.相较于lookupd部分无论在代码逻辑和实现上都要复杂很多. 不过基本的代码结构基本上都是一样的, 进...

  • 47
    • studygolang.com 2年前
    • 快照

    NSQ源码-nsqlookupd

    为什么选择nsq 之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点...

  • 46
    • www.tuicool.com 1年前
    • 快照

    NSQ v0.1.5 源码分析

    NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模...

  • 2
    • jiajunhuang.com 1个月前
    • 快照

    NSQ源码分析

    NSQ源码分析 简单的翻了一下NSQ的源码,看看它是怎么实现的。我首先是从nsqtail开始看的,先从简单的入手。之后我看了nsqlookupd和nsqd。本文 只讲nsqd。 首先从nsqd的入口文件看起 apps/nsqd/main.go: func m...

  • 42

    php-nsq php-nsq 是nsq的php客户端,采用c扩展编写,性能和稳定性。 安装 : 请提前安装libevent Dependencies: libevent (apt-get install libevent-dev ,yum install libevent-devel) 1....

  • 35
    • 微信 mp.weixin.qq.com 2年前
    • 快照

    NSQ最佳实践

  • 77
    • 微信 mp.weixin.qq.com 2年前
    • 快照

    NSQ 最佳实践

    目前,全新的异步任务服务每天高效稳定的为唱吧提供数亿次的调用。服务器团队用全新的方式重新定义了异步任务实现方式,以为云计算而生的NSQ、成熟的PHP执行者PHP-FPM、自主开发的中间件NSQProxy以及admin管理后台共同组成了异步任务的队...

  • 62
    • bridgeforyou.cn 2年前
    • 快照

    MQ(6) —— Nsq vs Kafka

    Nsq vs Kafka 正如之前说的,Nsq是一款极简的消息中间件,通过学习Nsq,我们可以通过对比的方式,学习其他的Mq。 这一节,就让我们在对比中,学习另一种Mq,Kafka,在对比中,加深对Mq的理解。 首先,先放上...

  • 64
    • www.tuicool.com 1年前
    • 快照

    使用NSQ(附Golang代码)

    上篇文章已经了解了消息中间件相关的知识,这篇文章学习一下Golang语言编写的知名消息中间件 NSQ 。 nsq最初是由bitly公司开源出来的一款简单易用的消息中间件,它可用于大规模系...

关于极客头条


聚合每日国内外有价值,有趣的链接。

AD