19

【源码阅读】Nsqd

 4 years ago
source link: https://studygolang.com/articles/26357
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Nsqd源码阅读

简介

nsqd为nsq daemon的简写,是nsq组件最主要的服务。

nsqd提供一个tcp服务、一个http服务以及一个可选的https服务,tcp服务于客户端(生产者或消费者),http则提供API(可用于创建、删除topic与channel,生产数据,清空数据等)。

初始化

nsqd的启动入口为apps/nsqd/nsqd.go文件里的main函数。

首先定义了一个program的结构体,用于对程序的控制。结构体内元素为指向NSQD的指针。

main函数里面定义了一个具体的prg,然后Run它。

Run函数负责启动prg并阻塞,直至接收到对应的信号(对于nsqd为SIGINT或者SIGTERM信号)。

type program struct {  
   nsqd *nsqd.NSQD  
}

func main() {
   //定义一个具体的prg,并启动它
   prg := &program{}  
   if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {  
      log.Fatal(err)  
   }  
}

Run函数会调用program的Start方法,并调用Main()来启动nsqd服务。

func (p *program) Start() error {
    opts := nsqd.NewOptions()
    
    ... //配置读取过程,会修改opt
    
    nsqd := nsqd.New(opts)
    
    ... //Metadata的处理,以后再说
    
    nsqd.Main() //启动nsqd服务

    p.nsqd = nsqd
    return nil
}

启动

nsqd首先启动一个Tcp服务、一个Http服务以及一个可选的Https服务,然后调用queueScanLoop函数来处理in-flight与defered数据。

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener

    ctx := &context{n}

    //连续启动Tcp、Https、Http服务
    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    ...

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        ...
    }
    
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    ...

    n.waitGroup.Wrap(func() { n.queueScanLoop() })
    n.waitGroup.Wrap(func() { n.lookupLoop() })
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }
}

客户端连接

客户端连接nsqd的tcp server以后,nsqd会启动一个IOLoop,IOLoop里面首先启动messagePump,然后启动循环处理后续请求。

messagePump负责将Channel里面的消息取出来,并push给客户端。

func (p *protocolV2) IOLoop(conn net.Conn) error {
    var err error
    var line []byte
    var zeroTime time.Time

    clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    client := newClientV2(clientID, conn, p.ctx)

    // messagePump初始化会用到client的一些参数,这里的messagePumpStartedChan保证了初始化完成以后才会接收新的请求,避免了IDENTIFY请求对client的参数可能进行的修改。
    messagePumpStartedChan := make(chan bool)
    go p.messagePump(client, messagePumpStartedChan)
    <-messagePumpStartedChan
    
    for {
        ...
        //读取下一次请求
        line, err = client.Reader.ReadSlice('\n')
        ...
        params := bytes.Split(line, separatorBytes)

        p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
        
        //处理请求
        response, err = p.Exec(client, params)
        ...
        if response != nil {
            //发送响应
            err = p.Send(client, frameTypeResponse, response)
            ...
        }
    }
    ...
}

数据生产

topic创建

调用http的"/topic/create"接口、"/pub"接口,tcp的SUB/PUB请求等都会触发topic的创建。

创建topic位于NSQD的GetTopic方法。

首先使用读锁判断topic是否存在,如果存在则直接返回;如果不存在,则加写锁,然后调用NewTopic函数创建新的topic。

func (n *NSQD) GetTopic(topicName string) *Topic {
    //读锁判断topic是否存在,如果存在则直接返回
    // most likely, we already have this topic, so try read lock first.
    n.RLock()
    t, ok := n.topicMap[topicName]
    n.RUnlock()
    if ok {
        return t
    }

    n.Lock()
    //获取写锁后再次判断topic是否存在,如果存在则直接返回
    t, ok = n.topicMap[topicName]
    if ok {
        n.Unlock()
        return t
    }
    deleteCallback := func(t *Topic) {
        n.DeleteExistingTopic(t.name)
    }
    //创建topic
    t = NewTopic(topicName, &context{n}, deleteCallback)
    n.topicMap[topicName] = t

    n.logf(LOG_INFO, "TOPIC(%s): created", t.name)

    // release our global nsqd lock, and switch to a more granular topic lock while we init our
    // channels from lookupd. This blocks concurrent PutMessages to this topic.
    t.Lock()
    n.Unlock()

    // 使用lookup的相关处理,如果不使用可以先忽略
    ...

    t.Unlock()

    // 触发messagePump更新channel状态
    select {
    case t.channelUpdateChan <- 1:
    case <-t.exitChan:
    }
    return t
}

生产消息

调用http的"/pub"接口,或者tcp的PUB操作,都可以将消息发送给nsqd,nsqd首先将消息存入topic中作为过渡。

两种处理过程分别在(s httpServer)的doPUB方法与(p protocolV2)的PUB方法,二者殊途同归,都会调用(t *Topic)的PutMessage方法,将消息写入topic中。

func (t *Topic) PutMessage(m *Message) error {
    ...
    succ, err := t.put(m)
    ...
}

func (t *Topic) put(m *Message) (succ bool, err error) {
    if t.putMode == PUTMODE_NORMAL{
        select {
        //将消息写入到Topic的memoryMsgChan
        case t.memoryMsgChan <- m:
        default:
            t.put2Disk(m)
        }
    }else{
    ...
    }
}

数据消费

channel创建

消费者的“订阅”(SUB)请求会触发channel的创建,在tcp服务的SUB处理里面。

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    ...

    topicName := string(params[1])
    ...

    channelName := string(params[2])
    ...

    // 防止该topic或者channel是正处于退出状态
    var channel *Channel
    for {
        //获取或者创建topic
        topic := p.ctx.nsqd.GetTopic(topicName)
        //获取或者创建channel
        channel = topic.GetChannel(channelName)
        channel.AddClient(client.ID, client)

        if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
            channel.RemoveClient(client.ID)
            time.Sleep(1 * time.Millisecond)
            continue
        }
        break
    }
    atomic.StoreInt32(&client.State, stateSubscribed)
    client.Channel = channel
    // 将channel告知client
    client.SubEventChan <- channel

    return okBytes, nil
}

topic的消息复制到channel

topic创建时调用的NewTopic函数会启动messagePump函数,负责更新channel,并将topic中的消息复制到所有channel。

// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
    //获取所有channel
    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()

    if len(chans) > 0 {
        memoryMsgChan = t.memoryMsgChan
        limitedMsgChan = t.limitedMsgChan
        backendChan = t.backend.ReadChan()
    }

    for {
        select {
        //接收消息
        case msg = <-memoryMsgChan:
            if msg == nil {
                continue
            }
        ...
        //更新channel
        case <-t.channelUpdateChan:
            chans = chans[:0]
            t.RLock()
            for _, c := range t.channelMap {
                chans = append(chans, c)
            }
            t.RUnlock()
            if len(chans) == 0 || t.IsPaused() {
                memoryMsgChan = nil
                backendChan = nil
            } else {
                limitedMsgChan = t.limitedMsgChan
                memoryMsgChan = t.memoryMsgChan
                backendChan = t.backend.ReadChan()
            }
            continue
        ...
        case <-t.exitChan:
            goto exit
        }

        if msg == nil {
            continue
        }

        //将消息发送到所有channel
        for i, channel := range chans {
            ...
            chanMsg := msg
            //为每一个channel单独复制一份数据
            if i > 0 {
                chanMsg = NewMessage(msg.ID,msg.MsgType, msg.Body)
                chanMsg.Timestamp = msg.Timestamp
                chanMsg.deferred = msg.deferred
            }
            ...
            //将消息存储到channel
            //此处的PutMessage与Topic的同名方法类似,也是将消息写到channel的memoryMsgChan
            err := channel.PutMessage(chanMsg)
            if err != nil {
                t.ctx.nsqd.logf(LOG_ERROR,
                    "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                    t.name, msg.ID, channel.name, err)
            }
        }
    }

exit:
    t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

//PutMessage会调用到c.put(msg)
func (c *Channel) put(m *Message) (succ bool, err error) {
    succ = true
    if c.putMode == PUTMODE_NORMAL {
        select {
        //将数据发送到Channel的memoryMsgChan
        case c.memoryMsgChan <- m:
        default:
            ...
        }
    }else{
    ...
    }
}

数据push到消费者

回忆下前面介绍的两点:一是客户端连接时,会启动messagePump负责将Channel里面的消息取出来,并push给客户端;二是channel创建时,会将创建的channel告知client。

messagePump获取创建的这个channel,并从channel的memoryMsgChan接收消息,然后push给消费者。

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    ...

    //channel创建成功后,通过SubEventChan告知client
    subEventChan := client.SubEventChan

    for {
        if subChannel == nil || !client.IsReadyForMessages() {
            ...
        } else if flushed {
            ...
        } else {
            // 获取channel的memoryMsgChan
            memoryMsgChan = subChannel.memoryMsgChan
            ...
        }

        select {
        ...
        case subChannel = <-subEventChan:
            p.ctx.nsqd.logf(LOG_INFO, "get subEventChan:%+v", subChannel)
            // you can't SUB anymore
            subEventChan = nil


        ...
        //从memoryMsgChan里接收消息,并push给客户端
        case msg := <-memoryMsgChan:
            if msg == nil{
                continue
            }

            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            //将消息放入in-flight队列
            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            p.ctx.nsqd.logf(LOG_INFO, "get memory msg:%+v", msg)
            //将消息push给消费者
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        ...
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    ...
}

in-flight数据与deferred数据处理

回一下前面讲到两点:一是queueScanLoop函数会处理in-flight数据与deferred数据;二是消息push给消费者之前会调用StartInFlightTimeout将该消息放入in-flight队列。

queueScanLoop管理一个queueScanWorker pool(默认大小为4),各个worker并发处理channel数据。

in-flight数据的存储与清理

in-flight数据存储时会记录下该消息的到期时间,以便到期后将该消息重新push给消费者。

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    //存储到期时间
    msg.pri = now.Add(timeout).UnixNano()
    err := c.pushInFlightMessage(msg)
    if err != nil {
        return err
    }
    c.addToInFlightPQ(msg)
    return nil
}

如果消费者成功接收,则会回应一个"FIN",nsqd收到"FIN"则将该消息从in-flight队列中清除。

func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
    ...

    id, err := getMessageID(params[1])
    if err != nil {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID", err.Error())
    }
    
    //将该消息从in-flight队列中清除
    err = client.Channel.FinishMessage(client.ID, *id)
    if err != nil {
        return nil, protocol.NewClientErr(err, "E_FIN_FAILED",
            fmt.Sprintf("FIN %s failed %s", *id, err.Error()))
    }

    client.FinishedMessage()

    return nil, nil
}

defered数据的存储与清理

如果消费者收到消息以后,如果一时间自己处理不过来,可以通过"REQ"将该消息重新入队,并可以设定多长时间后重新消费,时间为0的话则立即消费,否则延迟消费。

延迟消费的处理方式与in-flight数据类似,也是先写入到一个队列,并设定到期时间,等待重新读取。

下面介绍这两部分数据时如何重新消费的,主要是queueScanLoop的处理逻辑。

worker的创建与销毁

worker的创建与销毁是在resizePool函数。

worker的完美个数为channel总数的四分之一,但是不能大于QueueScanWorkerPoolMax。

1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)

所有的worker都会监听同一个workCh、closeCh,如果worker过多,则只需要向closeCh写入一个“通知”,收到这个“通知”的worker就会被销毁。

一次for循环只创建或销毁一个worker,直至worker数目达到idealPoolSize。

func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

channel的选择

queueScanLoop的处理方法模仿了Redis的概率到期算法(probabilistic expiration algorithm):每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择,从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel,如果某个被选中channel的任何一个queue有事可做,则认为该channel为“脏”channel。如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%),则不投入睡眠,直接进行下一次概率选择。

channel缓存每QueueScanRefreshInterval(默认5s)刷新一次。

queueScanLoop与worker的交互

queueScanLoop与worker之间通过workCh与responseCh来进行交互。

  • workCh:queueScanLoop随机选择一定数目的channel后,通过workCh告诉worker。
  • responseCh:worker处理完成后,通过responseCh反馈该channel是否为“脏”。
func (n *NSQD) queueScanLoop() {
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    //根据channel数目,创建worker
    channels := n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            //更新channel缓存,并据此创建或者销毁worker
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }

        //workTicker到期,且channels长度不为0时,会走到这里。
        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        //随机选择num个channel,并传入workCh
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        //等待这num个channel的处理结果(是否为“脏”channel)
        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        //如果“脏”channel达到一定比例,直接进行下次处理
        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

exit:
    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

worker处理

worker从queueScanLoop接收需要处理的channel,处理该channel的in-flight数据与deferred数据。processInFlightQueue与processDeferredQueue函数都会调用c.put(msg),将数据发送到Channel的memoryMsgChan,进而重新被push到消费者。

func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            //处理in-flight消息
            if c.processInFlightQueue(now) {
                dirty = true
            }
            //处理defered消息
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}
func (c *Channel) processInFlightQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.inFlightMutex.Lock()
        //获取超时的消息
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if msg == nil {
            goto exit
        }
        dirty = true
        
        //判断该消息是否属于这个client
        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        //将消息重新写入channel
        c.put(msg)
    }

exit:
    return dirty
}

processDeferredQueue的处理与此类似。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK