47

NSQ源码-nsqlookupd

 2年前 阅读数 46
以下为 快照 页面,建议前往来源网站查看,会有更好的阅读体验。
原文链接: https://studygolang.com/articles/16573?amp%3Butm_medium=referral

为什么选择nsq

之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点rabbitmq的代码,以及后来的emqtt都看过一点, 所以对消息队列这块是情有独钟。转到go后也在关注消息队列这块,nsq是一个golng的消息系统, 而且架构也非常的简单。所以想通过源码的学习来掌握一些语言技巧。

nsq的架构与代码结构

nsq的的话主要有三个模块构成, 这里直接复制官方的介绍:

nsqd: is the daemon that receives, queues, and delivers messages to clients.

nsqlookupd: is the daemon that manages topology information and provides an eventually consistent discovery service.

nsqadmin: is a web UI to introspect the cluster in realtime (and perform various administrative tasks).

1460000017216692?w=420&h=281

这里是一个消息投递的过程, 显示了消息怎么从nsqd到达consumer, 缺少了producer和nsqlookupd. nsqlookupd主要提供了两个功能:

  • 向nsqd提供一个topic和channel的注册信息
  • 对consumser提供了toic和channel的查询功能然后

consumer查询到nsqd之后就是上面看到的动态图了, consumer直接和nsqd通信, 下面是一个更全面一点的时序图

bVbko0q?w=868&h=378

整个项目的代码结构也是围绕上面的三个模块构建:

  • internal(公共部分的实现)
  • nsqadmin(对nsqadmin的时间)
  • nsqd(对nsqd的实现)
  • nsqlookupd(对nsqlookupd的实现)

总共也就这四个package,是不是有很想看下去的冲动(smile).

lookupd的启动流程

经过上面的介绍,我们对lookupd有里简单的认识.首先他是一个独立的进程, 为topic和channel的发现服务. 但不参与时间的消息投递. 对lookup的实现是在nsq/apps/nsqlookupd/nsqlookupd.go和nsq/nsqlookupd/中. lookupd的启动是使用了一个叫 go-srv 的windows wrapper.通过在nsq/apps/nsqlookupd/nsqlookupd.go中实现:

type Service interface {
    // Init is called before the program/service is started and after it's
    // determined if the program is running as a Windows Service.
    Init(Environment) error

    // Start is called after Init. This method must be non-blocking.
    Start() error

    // Stop is called in response to os.Interrupt, os.Kill, or when a
    // Windows Service is stopped.
    Stop() error
}

来完成整个进程的管理,go-srv帮助我们做了系统信号的管理, 下面来看下lookupd的启动流程,

实例化一个NSQLookupd对象

// apps/nsqlookupd/nsqlookupd.go
    daemon := nsqlookupd.New(opts)  // 实例化一个NSQLookupd的对象
    err := daemon.Main()            // 开始启动NSQLookupd
    
// nsq/nsqlookupd/nsqlookupd.go
func New(opts *Options) *NSQLookupd {
     ....
    n := &NSQLookupd{
        opts: opts,    // 启动参数
        DB:   NewRegistrationDB(), // 内从里面的一个数据库,主要用来存储tpoic/channel以及nsqd的消息
    }
    ...
    return n
}

开始启动

// Main starts an instance of nsqlookupd and returns an
// error if there was a problem starting up.
func (l *NSQLookupd) Main() error {
    ctx := &Context{l}

    // 启动两场go routine来处理tcp/http的请求
    tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
    if err != nil {
        return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err)
    }
    httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
    if err != nil {
        return fmt.Errorf("listen (%s) failed - %s", l.opts.TCPAddress, err)
    }

    l.tcpListener = tcpListener
    l.httpListener = httpListener

    tcpServer := &tcpServer{ctx: ctx}
    l.waitGroup.Wrap(func() {
        protocol.TCPServer(tcpListener, tcpServer, l.logf)
    })
    httpServer := newHTTPServer(ctx)
    l.waitGroup.Wrap(func() {
        http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
    })

    return nil
}

下面是一个lookupd里面的进程模型

bVbko0r?w=1009&h=579

lookupd里的主要数据结构

在上面创建一个instance的时候我们看到创建一个NewRegistrationDB()的函数, 这里就是存储lookupd所有数据结构的地方.

bVbko0F?w=540&h=486

每个topic/channe/clientl就是一个Registration的key, 然后value对应的就是该topic/channel对应的nsqd信息.所有的接口都是在操作上面的那个数据结构.

lookupd和其他模块的交互

在进程模型中我们看到一个tcp server和一个http seerver, 和其他模块之间的交互都是在里面完成的.看下tcp server的处理

有新的tcp连接进来,创建一个新的go routine去服务该请求

// /nsq/internal/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) {
    for {
        ...
        go handler.Handle(clientConn)
    }

实例化一个protocol对象

// /nsq/nsqlookupd/tcp.go
func (p *tcpServer) Handle(clientConn net.Conn) {
    ...
    prot.IOLoop(clientConn)
    ...
}

对请求的具体处理

// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) IOLoop(conn net.Conn) error {
    ...
    p.Exec(client, reader, params)
    ...
}

// /nsq/nsqlookupd/lookup_protocol_v1.go
func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) {
    switch params[0] {
    case "PING": // NSQD的心跳包
        return p.PING(client, params)
    case "IDENTIFY": // NQSD启动时候的indentify就是我们上面看到的peerInfo
        return p.IDENTIFY(client, reader, params[1:])
    case "REGISTER": // 注册topic/channel信息到lookupd
        return p.REGISTER(client, reader, params[1:])
    case "UNREGISTER": // unregister topic/lookup 信息
        return p.UNREGISTER(client, reader, params[1:])
    }
    return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

上面就是整个tcp server的流程, 每个连接都是一个go routine. 相对tcp server来说的话http server就简单很多, 如果你对httprouter熟悉的话就更简单了就是对RegistrationDB的增删查改. http测的api的话可以参考:

官方的文档

总结

lookupd是其中比较简单的模块,通过源码的学习我们可以更好的掌握go的一些技巧,也鼓励大家通过一一些开源的代码来掌握语言的一些技巧。其实通过lookupd我们可以抽象一套自己的HTTP/TCP服务端架构来。


猜你喜欢

  • 13
    • 微信 mp.weixin.qq.com 1年前
    • 快照

    golang-nsq系列(三)--nsqlookupd源码解析

    上一篇介绍了 nsqd 的代码逻辑与流程图,本篇来解析 nsq 中另一大模块 nsqlookupd,其负责维护 nsqd 节点的拓扑结构信息,实现了去中心化的服务注册与发现。 1. nsqlookupd 执行入口 在 nsq/apps/nsqlookupd/m...

  • 24
    • studygolang.com 2年前
    • 快照

    NSQ源码-Nsq客户端

    看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的 认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责...

  • 29
    • studygolang.com 2年前
    • 快照

    NSQ源码-NSQD

    看完了nsqlookupd我们继续往下看, nsqd才是他的核心. 里面大量的使用到了go channel, 相信看完之后对你学习go有很大的帮助.相较于lookupd部分无论在代码逻辑和实现上都要复杂很多. 不过基本的代码结构基本上都是一样的, 进...

  • 46
    • www.tuicool.com 1年前
    • 快照

    NSQ v0.1.5 源码分析

    NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模...

  • 64
    • www.tuicool.com 1年前
    • 快照

    nsq源码review:go-nsq producer

    nsq是一个实时分布式的消息队列平台。 核心部分是一个叫nsqd的模块,它负责接收和转发消息。同时在go-nsq的包中,提供了consumer和producer的核心接口。在读nsq源码的时候,很好奇它的数据是怎么从producer给到了consumer的,于...

  • 2
    • jiajunhuang.com 1个月前
    • 快照

    NSQ源码分析

    NSQ源码分析 简单的翻了一下NSQ的源码,看看它是怎么实现的。我首先是从nsqtail开始看的,先从简单的入手。之后我看了nsqlookupd和nsqd。本文 只讲nsqd。 首先从nsqd的入口文件看起 apps/nsqd/main.go: func m...

  • 42

    php-nsq php-nsq 是nsq的php客户端,采用c扩展编写,性能和稳定性。 安装 : 请提前安装libevent Dependencies: libevent (apt-get install libevent-dev ,yum install libevent-devel) 1....

  • 35
    • 微信 mp.weixin.qq.com 2年前
    • 快照

    NSQ最佳实践

  • 77
    • 微信 mp.weixin.qq.com 2年前
    • 快照

    NSQ 最佳实践

    目前,全新的异步任务服务每天高效稳定的为唱吧提供数亿次的调用。服务器团队用全新的方式重新定义了异步任务实现方式,以为云计算而生的NSQ、成熟的PHP执行者PHP-FPM、自主开发的中间件NSQProxy以及admin管理后台共同组成了异步任务的队...

  • 62
    • bridgeforyou.cn 2年前
    • 快照

    MQ(6) —— Nsq vs Kafka

    Nsq vs Kafka 正如之前说的,Nsq是一款极简的消息中间件,通过学习Nsq,我们可以通过对比的方式,学习其他的Mq。 这一节,就让我们在对比中,学习另一种Mq,Kafka,在对比中,加深对Mq的理解。 首先,先放上...

关于极客头条


聚合每日国内外有价值,有趣的链接。

AD