13

高性能消息中间件 NSQ 解析-nsqlookupd 实现细节介绍

 3 years ago
source link: http://blueskykong.com/2021/04/05/nsq-4/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

高性能消息中间件 NSQ 解析-nsqlookupd 实现细节介绍

我们在 前面 介绍了 nsq 的相关概念以及 nsq 的安装与应用以及 nsqd 的实现原理。本篇将会结合源码介绍 nsqlookupd 的实现细节。

nsqlookupd 主要流程与上一篇文章介绍的 nsqd 执行逻辑相似,区别在于具体运行的任务不同。

在 nsq/apps/nsqlookupd/main.go 可以找到执行入口文件。

// 位于apps/nsqlookupd/main.go:45
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}

func (p *program) Start() error {
opts := nsqlookupd.NewOptions()

flagSet := nsqlookupdFlagSet(opts)
...
}

同样,通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例。

// 位于 apps/nsqlookupd/main.go:80
options.Resolve(opts, flagSet, cfg)
nsqlookupd, err := nsqlookupd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqlookupd", err)
}
p.nsqlookupd = nsqlookupd

go func() {
err := p.nsqlookupd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

初始化配置参数(优先级:flagSet-命令行参数 > cfg-配置文件 > opts-默认值),开启协程,进入 nsqlookupd.Main() 主函数。

我们来看下 nsqlookupd 是如何监听请求的,代码实现如下:

// 位于 nsqlookupd/nsqlookupd.go:53
func (l *NSQLookupd) Main() error {
ctx := &Context{l}

exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
once.Do(func() {
if err != nil {
l.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}

tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf))
})
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
})

err := <-exitCh
return err
}

开启 goroutine 执行 tcpServer, httpServer,分别监听 nsqd, nsqadmin 的客户端请求。

// 位于 internal/protocol/tcp_server.go:17
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
logf(lg.INFO, "TCP: listening on %s", listener.Addr())

for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
return fmt.Errorf("listener.Accept() error - %s", err)
}
break
}
go handler.Handle(clientConn)
}

logf(lg.INFO, "TCP: closing %s", listener.Addr())

return nil
}

TCPServer 循环监听客户端请求,建立长连接进行通信,并开启 handler 处理每一个客户端 conn。

装饰 http 路由

httpServer 通过 http_api.Decorate 装饰器实现对各 http 路由进行 handler 装饰,如加 log 日志、V1 协议版本号的统一格式输出等;

func newHTTPServer(ctx *Context) *httpServer {
log := http_api.Log(ctx.nsqlookupd.logf)

router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf)
s := &httpServer{
ctx: ctx,
router: router,
}

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

// v1 negotiate
router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
}

处理客户端命令

tcp 解析 V1 协议,内部协议封装的 prot.IOLoop(conn) 进行循环处理客户端命令,直到客户端命令全部解析处理完毕才关闭连接。

var prot protocol.Protocol
switch protocolMagic {
case " V1":
prot = &LookupProtocolV1{ctx: p.ctx}
default:
protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}

err = prot.IOLoop(clientConn)

通过内部协议进行 p.Exec(执行命令)、p.SendResponse(返回结果),保证每个 nsqd 节点都能正确的进行服务注册(register)与注销(unregister),并进行心跳检测(ping)节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。

for {
line, err = reader.ReadString('\n')
if err != nil {
break
}

line = strings.TrimSpace(line)
params := strings.Split(line, " ")

var response []byte
response, err = p.Exec(client, reader, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
_, sendErr := protocol.SendResponse(client, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}
continue
}

if response != nil {
_, err = protocol.SendResponse(client, response)
if err != nil {
break
}
}
}

conn.Close()

nsqlookupd 服务同时开启 tcp 和 http 两个监听服务,nsqd 会作为客户端,连上 nsqlookupd 的 tcp 服务,并上报自己的 topic 和 channel 信息,以及通过心跳机制判断 nsqd 状态;还有个 http 服务提供给 nsqadmin 获取集群信息。

本文主要介绍 nsqlookupd 的实现,nsqlookupd 同样是一个守护进程,负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题( topic )的生产者,并且 nsqd 节点广播话题(topic)和通道( channel )信息。有两个接口: TCP 接口, nsqd 用它来广播。 HTTP 接口,客户端用它来发现和管理。

下一篇文章,将会继续介绍 nsq 中其他模块实现的细节。

高性能消息中间件 NSQ 解析-nsqd 实现细节介绍

高性能消息中间件 NSQ 解析-整体介绍

高性能消息中间件 NSQ 解析-应用实践

微服务架构中使用 ELK 进行日志采集以及统一处理

没有 try-catch,该如何处理 Go 错误异常?

订阅最新文章,欢迎关注我的公众号


Recommend

  • 39
    • dockone.io 6 years ago
    • Cache

    高性能消息中间件——NATS

    前 言 这段时间我的主要工作内容是将公司系统中使用的RabbitMQ替换成NATS,而此之前我对Nats一无所知。经过一段时间紧张的学习和开发之后我顺利的完成了任务,并对消息中间件有了更深的了解。在此感谢同事钟亮在此过程中对我的...

  • 67
    • studygolang.com 6 years ago
    • Cache

    NSQ源码-nsqlookupd

    为什么选择nsq 之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点...

  • 31

    前言 在 2019年第五届 Gopher China 大会上,小米科技基础服务高级研发工程师徐成选做了题为《用 Go 构建高性能数据库中间件》的技术演讲,详细介绍了小米开源的数据库中间件 Gaea 的整体架构、内部模块...

  • 40

    在分布式数据库、云原生数据库、NewSQL 等名词在数据库领域层出不穷的当今,变革——在这个相对稳定的领域已愈加不可避免。相比于完全革新,渐进式增强的方案在拥有厚重沉淀的行业则更受青睐。 同所有分布式领域的解决方案相同,...

  • 18
    • 微信 mp.weixin.qq.com 5 years ago
    • Cache

    golang-nsq系列(三)--nsqlookupd源码解析

    上一篇介绍了 nsqd 的代码逻辑与流程图,本篇来解析 nsq 中另一大模块 nsqlookupd,其负责维护 nsqd 节点的拓扑结构信息,实现了去中心化的服务注册与发现。 1. nsqlookupd 执行入口 在 nsq/apps/nsqlookupd/m...

  • 6

    高性能消息中间件 NSQ 解析-窥探 nsq 设计思路(一)

  • 39

    高性能消息中间件 NSQ 解析-应用实践

  • 35

    高性能消息中间件 NSQ 解析-整体介绍

  • 5

    高性能消息中间件 NSQ 解析-nsqd 实现细节介绍

  • 7

    消息中间件该如何实现高可用架构 作者:中华石杉 2022-09-21 16:09:28 本文对消息中间件的集群高可用架构的探讨,是完全脱离于某个具体技术的,非常朴素的从本质的原理层面来讨论这个话题。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK