64

NSQ源码-nsqlookupd

 5 years ago
source link: https://studygolang.com/articles/16573?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

为什么选择nsq

之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点rabbitmq的代码,以及后来的emqtt都看过一点, 所以对消息队列这块是情有独钟。转到go后也在关注消息队列这块,nsq是一个golng的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。

nsq的架构与代码结构

nsq的的话主要有三个模块构成, 这里直接复制官方的介绍:

nsqd: is the daemon that receives, queues, and delivers messages to clients.

nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.

nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).

1460000017216692?w=420&h=281

这里是一个消息投递的过程, 显示了消息怎么从nsqd到达consumer, 缺少了producer和nsqlookupd. nsqlookupd主要提供了两个功能:

  • 向nsqd提供一个topic和channel的注册信息
  • 对consumser提供了toic和channel的查询功能然后

consumer查询到nsqd之后就是上面看到的动态图了, consumer直接和nsqd通信, 下面是一个更全面一点的时序图

bVbko0q?w=868&h=378

整个项目的代码结构也是围绕上面的三个模块构建:

  • internal(公共部分的实现)
  • nsqadmin(对nsqadmin的时间)
  • nsqd(对nsqd的实现)
  • nsqlookupd(对nsqlookupd的实现)

总共也就这四个package,是不是有很想看下去的冲动(smile).

lookupd的启动流程

经过上面的介绍,我们对lookupd有里简单的认识.首先他是一个独立的进程, 为topic和channel的发现服务. 但不参与时间的消息投递. 对lookup的实现是在nsq/apps/nsqlookupd/nsqlookupd.go和nsq/nsqlookupd/中. lookupd的启动是使用了一个叫 go-srv 的windows wrapper.通过在nsq/apps/nsqlookupd/nsqlookupd.go中实现:

type Service interface {
    // Init is called before the program/service is started and after it's
    // determined if the program is running as a Windows Service.
    Init(Environment) error

    // Start is called after Init. This method must be non-blocking.
    Start() error

    // Stop is called in response to os.Interrupt, os.Kill, or when a
    // Windows Service is stopped.
    Stop() error
}

来完成整个进程的管理,go-srv帮助我们做了系统信号的管理, 下面来看下lookupd的启动流程,

实例化一个NSQLookupd对象

// apps/nsqlookupd/nsqlookupd.go
    daemon := nsqlookupd.New(opts)  // 实例化一个NSQLookupd的对象
    err := daemon.Main()            // 开始启动NSQLookupd
    
// nsq/nsqlookupd/nsqlookupd.go
func New(opts *Options) *NSQLookupd {
     ....
    n := &NSQLookupd{
        opts: opts,    // 启动参数
        DB:   NewRegistrationDB(), // 内从里面的一个数据库,主要用来存储tpoic/channel以及nsqd的消息
    }
    ...
    return n
}

开始启动

// Main starts an instance of nsqlookupd and returns an
// error if there was a problem starting up.
func (l *NSQLookupd) Main() error {
    ctx := &Context{l}

    // 启动两场go routine来处理tcp/http的请求
    tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
    if err != nil {
        return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err)
    }
    httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
    if err != nil {
        return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err)
    }

    l.tcpListener = tcpListener
    l.httpListener = httpListener

    tcpServer := &tcpServer{ctx: ctx}
    l.waitGroup.Wrap(func() {
        protocol.TCPServer(tcpListener, tcpServer, l.logf)
    })
    httpServer := newHTTPServer(ctx)
    l.waitGroup.Wrap(func() {
        http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
    })

    return nil
}

下面是一个lookupd里面的进程模型

bVbko0r?w=1009&h=579

lookupd里的主要数据结构

在上面创建一个instance的时候我们看到创建一个NewRegistrationDB()的函数, 这里就是存储lookupd所有数据结构的地方.

bVbko0F?w=540&h=486

每个topic/channe/clientl就是一个Registration的key, 然后value对应的就是该topic/channel对应的nsqd信息.所有的接口都是在操作上面的那个数据结构.

lookupd和其他模块的交互

在进程模型中我们看到一个tcp server和一个http seerver, 和其他模块之间的交互都是在里面完成的.看下tcp server的处理

有新的tcp连接进来,创建一个新的go routine去服务该请求

// /nsq/internal/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
    for {
        ...
        go handler.Handle(clientConn)
    }

实例化一个protocol对象

// /nsq/nsqlookupd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
    ...
    prot.IOLoop(clientConn)
    ...
}

对请求的具体处理

// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
    ...
    p.Exec(client, reader, params)
    ...
}

// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    switch params[0] {
    case "PING": // NSQD的心跳包
        return p.PING(client, params)
    case "IDENTIFY": // NQSD启动时候的indentify就是我们上面看到的peerInfo
        return p.IDENTIFY(client, reader, params[1:])
    case "REGISTER": // 注册topic/channel信息到lookupd
        return p.REGISTER(client, reader, params[1:])
    case "UNREGISTER": // unregister topic/lookup 信息
        return p.UNREGISTER(client, reader, params[1:])
    }
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

上面就是整个tcp server的流程, 每个连接都是一个go routine. 相对tcp server来说的话http server就简单很多, 如果你对httprouter熟悉的话就更简单了就是对RegistrationDB的增删查改. http测的api的话可以参考:

官方的文档

总结

lookupd是其中比较简单的模块,通过源码的学习我们可以更好的掌握go的一些技巧,也鼓励大家通过一一些开源的代码来掌握语言的一些技巧。其实通过lookupd我们可以抽象一套自己的HTTP/TCP服务端架构来。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK