42

NSQ源码-NSQD

 5 years ago
source link: https://studygolang.com/articles/16575?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

看完了nsqlookupd我们继续往下看, nsqd才是他的核心. 里面大量的使用到了go channel, 相信看完之后对你学习go有很大的帮助.相较于lookupd部分无论在代码逻辑和实现上都要复杂很多.

不过基本的代码结构基本上都是一样的, 进程使用go-srv来管理, Main里启动一个http sever和一个tcp server, 这里可以参考下之前文章的进程模型小节, 不过在nsqd中会启动另外的两个goroutine queueScanLoop和lookupLoop。下面是一个

具体的进程模型。

bVbko1h?w=848&h=506

后面的分析都是基于这个进程模型。

NSQD的启动

启动时序这块儿大体上和lookupd中的一致, 我们下面来看看lookupLoop和queueScanLoop.

lookupLoop代码见nsqd/lookup.go中 主要做以下几件事情:

  • 和lookupd建立连接(这里是一个长连接)
  • 每隔15s ping一下lookupd
  • 新增或者删除topic的时候通知到lookupd
  • 新增或者删除channel的时候通知到lookupd
  • 动态的更新options

由于设计到了nsq里的in-flight/deferred message, 我们把queueScanLoop放到最后来看.

一条message的LifeLine

下面我们就通过一条message的生命周期来看下nsqd的工作原理. 根据官方的QuickStart, 我们可以通过curl来pub一条消息.

curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'

http handler

我们就跟着代码看一下, 首先是http对此的处理:

// nsq/nsqd/http.go
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
    ...
    reqParams, topic, err := s.getTopicFromQuery(req) // 从http query中拿到topic信息
    ...
}
// nsq/nsqd/http.go
func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
    reqParams, err := url.ParseQuery(req.URL.RawQuery)
    topicNames, ok := reqParams["topic"]
    return reqParams, s.ctx.nsqd.GetTopic(topicName), nil
}
// nsq/nsqd/nsqd.go
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
func (n *NSQD) GetTopic(topicName string) *Topic {
    // 1. 首先查看n.topicMap,确认该topic是否已经存在(存在直接返回)
    t, ok := n.topicMap[topicName]
    // 2. 否则将新建一个topic
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    // 3. 查看该nsqd是否设置了lookupd, 从lookupd获取该tpoic的channel信息
    // 这个topic/channel已经通过nsqlookupd的api添加上去的, 但是nsqd的本地
    // 还没有, 针对这种情况我们需要创建该channel对应的deffer queue和inFlight
    // queue.
    lookupdHTTPAddrs := n.lookupdHTTPAddrs()
    if len(lookupdHTTPAddrs) > 0 {
        channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
    }
    // now that all channels are added, start topic messagePump
    // 对该topic的初始化已经完成下面就是message
    t.Start()
    return t
}

topic messagePump

在上面消息初始化完成之后就启动了tpoic对应的messagePump

// nsq/nsqd/topic.go
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {

    // 1. do not pass messages before Start(), but avoid blocking Pause() 
    // or GetChannel() 
    // 等待channel相关的初始化完成,GetTopic中最后的t.Start()才正式启动该Pump
    

    // 2. main message loop
    // 开始从Memory chan或者disk读取消息
    // 如果topic对应的channel发生了变化,则更新channel信息
    
    // 3. 往该tpoic对应的每个channel写入message(如果是deffermessage
    // 的话放到对应的deffer queue中
    // 否则放到该channel对应的memoryMsgChan中)。
}

至此也就完成了从tpoic memoryMsgChan收到消息投递到channel memoryMsgChan的投递, 我们先看下http

收到消息到通知pump处理的过程。

// nsq/nsqd/http.go
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
    ...
    msg := NewMessage(topic.GenerateID(), body)
    msg.deferred = deferred
    err = topic.PutMessage(msg)
    if err != nil {
        return nil, http_api.Err{503, "EXITING"}
    }

    return "OK", nil
}
// nsq/nsqd/topic.go
// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
    t.RLock()
    defer t.RUnlock()
    if atomic.LoadInt32(&t.exitFlag) == 1 {
        return errors.New("exiting")
    }
    err := t.put(m)
    if err != nil {
        return err
    }
    atomic.AddUint64(&t.messageCount, 1)
    return nil
}
func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        b := bufferPoolGet()
        err := writeMessageToBackend(b, m, t.backend)
        bufferPoolPut(b)
        t.ctx.nsqd.SetHealth(err)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to write message to backend - %s",
                t.name, err)
            return err
        }
    }
    return nil
}

这里memoryMsgChan的大小我们可以通过--mem-queue-size参数来设置,上面这段代码的流程是如果memoryMsgChan还没有满的话

就把消息放到memoryMsgChan中,否则就放到backend(disk)中。topic的mesasgePump检测到有新的消息写入的时候就开始工作了,

从memoryMsgChan/backend(disk)读取消息投递到channel对应的chan中。 还有一点请注意就是messagePump中

if len(chans) > 0 && !t.IsPaused() {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }

这段代码只有channel(此channel非golang里的channel而是nsq的channel类似nsq_to_file)存在的时候才会去投递。上面部分就是

msg从producer生产消息到吧消息写到memoryChan/Disk的过程,下面我们来看下consumer消费消息的过程。

首先是consumer从nsqlookupd查询到自己所感兴趣的topic/channel的nsqd信息, 然后就是来连接了。

tcp handler

对新的client的处理

//nsq/internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
    go handler.Handle(clientConn)
}
//nsq/nsqd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
    prot.IOLoop(clientConn)
}

针对每个client起一个messagePump吧msg从上面channel对应的chan 写入到consumer侧

//nsq/nsqd/protocol_v2.go
func (p *protocolV2) IOLoop(conn net.Conn) error {
    client := newClientV2(clientID, conn, p.ctx)
    p.ctx.nsqd.AddClient(client.ID, client)

    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)

    // read the request
    line, err = client.Reader.ReadSlice('\n')
    response, err = p.Exec(client, params)
    p.Send(client, frameTypeResponse, response)

}
//nsq/nsqd/protocol_v2.go
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    switch {
    case bytes.Equal(params[0], []byte("FIN")):
        return p.FIN(client, params)
    case bytes.Equal(params[0], []byte("RDY")):
        return p.RDY(client, params)
    case bytes.Equal(params[0], []byte("REQ")):
        return p.REQ(client, params)
    case bytes.Equal(params[0], []byte("PUB")):
        return p.PUB(client, params)
    case bytes.Equal(params[0], []byte("MPUB")):
        return p.MPUB(client, params)
    case bytes.Equal(params[0], []byte("DPUB")):
        return p.DPUB(client, params)
    case bytes.Equal(params[0], []byte("NOP")):
        return p.NOP(client, params)
    case bytes.Equal(params[0], []byte("TOUCH")):
        return p.TOUCH(client, params)
    case bytes.Equal(params[0], []byte("SUB")):
        return p.SUB(client, params)
    case bytes.Equal(params[0], []byte("CLS")):
        return p.CLS(client, params)
    case bytes.Equal(params[0], []byte("AUTH")):
        return p.AUTH(client, params)
    }
}
//nsq/nsqd/protocol_v2.go
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    var channel *Channel
    topic := p.ctx.nsqd.GetTopic(topicName)
    channel = topic.GetChannel(channelName)
    channel.AddClient(client.ID, client)

    // 通知messagePump开始工作
    client.SubEventChan <- channel

通知topic的messagePump开始工作

func (t *Topic) GetChannel(channelName string) *Channel {
    t.Lock()
    channel, isNew := t.getOrCreateChannel(channelName)
    t.Unlock()

    if isNew {
        // update messagePump state
        select {
        case t.channelUpdateChan <- 1:
        case <-t.exitChan:
        }
    }

    return channel
}

message 对应的Pump

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            // 等待client ready,并且channel的初始化完成
            flushed = true
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            // we're buffered (if there isn't any more data we should flush)...
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan:
            // if this case wins, we're either starved
            // or we won the race between other channels...
            // in either case, force flush
        case <-client.ReadyStateChan:
        case subChannel = <-subEventChan:
            // you can't SUB anymore
            // channel初始化完成,pump开始工作
            subEventChan = nil
        case identifyData := <-identifyEventChan:
            // you can't IDENTIFY anymore
        case <-heartbeatChan:
            // heartbeat的处理
        case b := <-backendMsgChan:
            // 1. decode msg
            // 2. 把msg push到Flight Queue里
            // 3. send msg to client
        case msg := <-memoryMsgChan:
            // 1. 把msg push到Flight Queue里
            // 2. send msg to client
        case <-client.ExitChan:
            // exit the routine
        }
    }

至此我们看的代码就是一条消息从pub到nsqd中到被消费者处理的过程。不过得注意一点,我们在上面的代码分析中,创建

topic/channel的部分放到了message Pub的链上, 如果是没有lookupd的模式的话这部分是在client SUB链上的。

topic/hannel的管理

在NSQ内部通过

type NSQD struct {
    topicMap map[string]*Topic
}
和
type Topic struct {
    channelMap        map[string]*Channel
}

来维护一个内部的topic/channel状态,然后在提供了如下的接口来管理topic和channel

/topic/create - create a new topic
/topic/delete - delete a topic
/topic/empty - empty a topic
/topic/pause - pause message flow for a topic
/topic/unpause - unpause message flow for a topic
/channel/create - create a new channel
/channel/delete - delete a channel
/channel/empty - empty a channel
/channel/pause - pause message flow for a channel
/channel/unpause - unpause message flow for a channel

create topic/channel的话我们在之前的代码看过了,这里可以重点看下topic/channel delete的时候怎样保证数据优雅的删除的,以及

messagePump的退出机制。

queueScanLoop的工作

// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered "dirty".
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.

这里的注释已经说的很明白了,queueScanLoop就是通过动态的调整queueScanWorker的数目来处理

in-flight和deffered queue的。在具体的算法上的话参考了redis的随机过期算法。

总结

阅读源码就是走走停停的过程,从一开始的无从下手到后面的一点点的把它啃透。一开始都觉得很困难,无从下手。以前也是尝试着去看一些

经典的开源代码,但都没能坚持下来,有时候人大概是会高估自己的能力的,好多东西自以为看个一两遍就能看懂,其实不然,

好多知识只有不断的去研究你才能参透其中的原理。

一定要持续的读,不然过几天之后就忘了前面读的内容
      一定要多总结, 总结就是在不断的读的过程,从第一遍读通到你把它表述出来至少需要再读5-10次
      多思考,这段时间在地铁上/跑步的时候我会回向一下其中的流程
      分享(读懂是一个层面,写出来是一个层面,讲给别人听是另外一个层面)

后面我会先看下go-nsqd部分的代码,之后会研究下gnatsd, 两个都是cloud native的消息系统,看下有啥区别。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK