75

nsq源码review:go-nsq producer

 4 years ago
source link: https://www.tuicool.com/articles/riiiYfy
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

nsq是一个实时分布式的消息队列平台。

核心部分是一个叫nsqd的模块,它负责接收和转发消息。同时在go-nsq的包中,提供了consumer和producer的核心接口。在读nsq源码的时候,很好奇它的数据是怎么从producer给到了consumer的,于是从源码的层面梳理了一下代码的实现细节。这部分先记录一下producer和consumer的代码细节,方便后续再查看相关代码。后面准备把nsqd和nsqdlookup相关的东西记录一下,包括数据分发、数据缓存、服务发现等实现细节。

go-nsq里的producer和consumer实现的功能就是一句话,提供消息接受和分发的接口。但是它内部的实现确很有意思。

nsq demo

学习源码还是要先从demo起步,首先装好nsq,可以git下来源码编译或者执行下载二进制文件。先写个producer.go,如下

package main

import (
	"github.com/nsqio/go-nsq"
	"log"
)

func main() {
	cfg := nsq.NewConfig()
	r := []byte("hello consumer")
	addr := "127.0.0.1:4150"
	p, err := nsq.NewProducer(addr, cfg)
	if err != nil {
		log.Print(err)
	}
	err = p.Publish("serving123", r)
	if err != nil {
		log.Println(err)
	}
}
复制代码

例子里,我创建了一个producer,然后向serving123的topic里发送消息。

下面是consumer的代码,consumer.go

import (
	"fmt"
	"github.com/nsqio/go-nsq"
	"math/rand"
	"os"
	"time"
)

type SimpleHandler struct {
}

func (sh *SimpleHandler) HandleMessage(m *nsq.Message) error {
	_, err := os.Stdout.Write(m.Body)
	if err != nil {
		fmt.Println(err)
	}
	return nil
}

func main() {
	pause := make(chan bool)
	caddr := "127.0.0.1:4150"
	cfg := nsq.NewConfig()
	channel := fmt.Sprintf("tail%06d#ephemeral", rand.Int()%999999)
	c, _ := nsq.NewConsumer("serving123", channel, cfg)
	c.AddHandler(&SimpleHandler{})
	c.ConnectToNSQD(caddr)
	<-pause
}
复制代码

consumer,我定义了一个SimpleHandler,它实现了HandleMessage的接口,功能就是向标准输出打印消息。 main函数里,我新建了一个想要消费serving123这个topic的consumer,然后连接上nsqd服务。

运行

直接执行nsqd启一个服务。先执行consumer,然后执行producer,可以看到consumer里打印出的消息。

bimyyiE.jpg!web

producer源码解析

完成上面的demo之后,先来看一下producer是怎么玩的。

producer实例,会去调用 Publish 方法去发布消息,这个方法接收了topic和message body。

func (w *Producer) Publish(topic string, body []byte) error {
	return w.sendCommand(Publish(topic, body))
}
复制代码

内部的另外一个 Publish 方法执行结果作为参数传入 sendCommandPublish 方法,构造了一个 Command 的三元消息体

func Publish(topic string, body []byte) *Command {
	var params = [][]byte{[]byte(topic)}
	return &Command{[]byte("PUB"), params, body}
}
复制代码

sendCommand 方法里,初始化一个名叫doneChan的 ProducerTransaction 的指针,然后,cmd消息三元体,和doneChan一起传入 sendCommandAsync 方法里。最后监听doneChan的管道输出,然后返回error。

func (w *Producer) sendCommand(cmd *Command) error {
	doneChan := make(chan *ProducerTransaction)
	err := w.sendCommandAsync(cmd, doneChan, nil)
	if err != nil {
		close(doneChan)
		return err
	}
	t := <-doneChan
	return t.Error
}
复制代码

ProducerTransaction 结构体很有意思,它持有一个自己相同类型的指针。目的是将最终的内容自己保存起来。在最终clearup的时候,它会调用一个 finish 方法去触发上面的chan监听返回,从而最终返回退出 sendCommand

type ProducerTransaction struct {
	cmd      *Command
	doneChan chan *ProducerTransaction
	Error    error         // the error (or nil) of the publish command
	Args     []interface{} // the slice of variadic arguments passed to PublishAsync or MultiPublishAsync
}
复制代码

这个finish方法会把 ProducerTransaction 它自己本身进行传递,也就是在 t.doneChan <- t 这里

func (t *ProducerTransaction) finish() {
	if t.doneChan != nil {
		t.doneChan <- t
	}
}
复制代码

看一下 sendCommandAsync 函数。首先,它会去调用原子操作 atomic.AddInt32 去记录并发producer数量。上面说的doneChan指针,作为一个新的 ProducerTransaction 的参数传入,最终这个新的 ProducerTransaction 的数据传给了 w.transactionChan 监听的channel。

func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
	args []interface{}) error {
	// keep track of how many outstanding producers we're dealing with
	// in order to later ensure that we clean them all up...
	atomic.AddInt32(&w.concurrentProducers, 1)
	defer atomic.AddInt32(&w.concurrentProducers, -1)

	if atomic.LoadInt32(&w.state) != StateConnected {
		err := w.connect()//建立和nsqd的连接
		if err != nil {
			return err
		}
	}

	t := &ProducerTransaction{
		cmd:      cmd,
		doneChan: doneChan,
		Args:     args,
	}

	select {
	case w.transactionChan <- t:
	case <-w.exitChan:
		return ErrStopped
	}

	return nil
}
复制代码

建立和nsqd的连接,以及发送数据都写在w.connect()。了解了上面channel异步操作,看下这个connect函数

func (w *Producer) connect() error {
	w.guard.Lock()
	defer w.guard.Unlock()

	if atomic.LoadInt32(&w.stopFlag) == 1 {
		return ErrStopped
	}

	switch state := atomic.LoadInt32(&w.state); state {
	case StateInit:
	case StateConnected:
		return nil
	default:
		return ErrNotConnected
	}

	w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

	logger, logLvl := w.getLogger()

	w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
	w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))

	_, err := w.conn.Connect()
	if err != nil {
		w.conn.Close()
		w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
		return err
	}
	atomic.StoreInt32(&w.state, StateConnected)
	w.closeChan = make(chan int)
	w.wg.Add(1)
	go w.router()

	return nil
}
复制代码

函数开始部分很好理解,就是进行一些状态验证。 _, err := w.conn.Connect() 这里会实际去建立和nsqd的连接,会在里面跑一个readLoop和一个writeloop去进行读写的相关操作,东西比较多就不再赘述。这里关心的是这个 w.router 方法。

router方法里面开了一个for循环来监听producer的channel,包括transactionChan、responseChan、errorChan、closeChan、exitChan,如果是w.transactionChan,则把这个transaction塞进producer的transactions数组里, 然后向conn里去写消息即向nsqd发送数据 。如果是收到了返回信号或者是错误信号,则会弹出一个transaction。如果收到关闭或者是退出信号,则到exit里面,清理所有transaction,并退出。

func (w *Producer) router() {
	for {
		select {
		case t := <-w.transactionChan:
			w.transactions = append(w.transactions, t)
			err := w.conn.WriteCommand(t.cmd)
			if err != nil {
				w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
				w.close()
			}
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		case <-w.closeChan:
			goto exit
		case <-w.exitChan:
			goto exit
		}
	}

exit:
	w.transactionCleanup()
	w.wg.Done()
	w.log(LogLevelInfo, "exiting router")
}
复制代码

popTransaction 方法,会把transactions第一个元素弹出,保存剩下的元素。然后回去调用这个弹出的 ProducerTransactionfinish() 方法,也就是上面说的finish()方法。上面的 sendCommand 方法会受到通知退出,这样就完成了消息的发布过程。

func (w *Producer) popTransaction(frameType int32, data []byte) {
	t := w.transactions[0]
	w.transactions = w.transactions[1:]
	if frameType == FrameTypeError {
		t.Error = ErrProtocol{string(data)}
	}
	t.finish()
}
复制代码

总结

限于篇幅,consumer相关准备起另外一篇去写,consumer和producer在和nsqd通信的地方会复用一些代码。

关于nsq的producer,我们能学习到的是它channel的使用方法,以及进行数据传输的时候,是如何运载数据和如何通知和监听channel。核心的部分在于 ProducerTransaction 这个结构体,它负责了消息的运载。在上面提到的readLoop和writeloop,里面有许多代理的方法也值得关注和学习,具体内容可以自己看一眼。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK