

NSQ v0.1.5 源码分析
以下为 快照 页面,建议前往来源网站查看,会有更好的阅读体验。
原文链接: https://www.tuicool.com/articles/EVNraqz
NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。
源码地址:https://github.com/nsqio/nsq
对于一个大型的项目来讲,我个人的学习习惯于从最小版本开始学起。这是因为,在一个项目最初的时候,大体功能和架构都已经成形,最初的版本,一般来说,代码量都较少,功能集最小。学习曲线低,并且又最初版本,慢慢往高版本过渡,也能更了解项目进化的过程,也是一个学习的过程。
并且在实际使用过程中,大多数情况下,我们可能不需要那么多的功能集,并且需要根据实际情况做一些二次开发,此时的话,也许低版本的会更贴近实际使用场景和二次开发场景。
开源代码学习-nsq(v0.1.1版本)源码分析 中已经对v0.1.1版本进行了分析。其是一个单机版本,但发布订阅功能是已经实现。
本文对v0.1.5版本进行分析。此版本已经实现了分布式,分成了两个部分你nsqd:用于做订阅发布。nsqlookupd:用于做topic服务注册与发现。
这种模式就类似于微服务中的模式,增加了一个服务注册和发现模块。
每个单机nsqd用于存储topic,并提供订阅与发布功能。增加的nsqlookupd,则是用于多个单机进行注册服务,并对消费者提供查找对应topic所在的nsqd服务器。
这是一种非常简单的分布式架构。
那么看上传代码日志。分别为v0.1.1版本所在位置和v0.1.5所在位置。
那么看v0.1.5下代码目录:
代码量相对比v0.1.1版本多了好多文件。
分两个大模块开始分解:
nsqd
还是从main函数开始
github.com/nsqio/nsq/nsqd/main.go
104:初始化NSQD
105:真正入口nsqd.Main()
github.com/nsqio/nsq/nsqd/nsqd.go
初始化,并开启了一个协程idPump,这个协程用于全局生成guid的。
这个协程并非主要流程中的,就不做解释。这里对应0.1.1版本中的
uuidFactory
这里为真正的main入口。
46:相对比0.1.1版本多了一个 LookupRouter 协程处理。
53:tcpserver
60:httpserver
在nsqd中提供了一个接口, GetTopic 。少了 topicFactory
下面各个模块分析:
topic
github.com/nsqio/nsq/nsqd/topic.go
与0.1.1版本中,最大的差别就是多了 读写锁 。在0.1.1版本中,所有的操作都是在Router中,通过chan的方式,然后在单协程中处理所有请求来实现同步。
在0.1.5版本中,增加了读写锁,提供更大的并发操作。
初始化,然后开启了Router协程。
这里的操作没什么区别。
那么对于topic来说分为几个操作:
1、GetChannel
58-59:读写锁的操作
61:查找channel
63:未查找到的,生成新的channel
67:每个channel开启了一个MessagePump协程操作。这里用了sync.Once操作。
2、MessagePump 信息分发
89:从命名来看修改成了 memorMsgchan ,从此chan中获取msg
90:从backend缓冲区中获取msg
91:解码
100-108:通过加锁操作实现并发,遍历所有的channel,然后将msg分发给channel中。
3、PutMessage
这里的与0.1.1版本中一样,通过incomingMessageChan,发送给Router中处理
4、Router
由于一些操作通过提供加锁的函数,使得Router简洁了非常多。
Router中只剩下对信息发布的一个操作
119:获取incomingMessagechan信息
121:写入memoryMsgChan中
123:若memoryMsgChan阻塞,则写入backend中。
小结:
topic的代码逻辑更简洁了。msg写入,分发,也都很清晰了。通过加锁的操作来实现并发的操作。
channel
github.com/nsqio/nsq/nsqd/channel.go
channel也同样增加了读写锁,为的是将之前的单协程的Router操作,分解为了可以多并发操作。
在NewChannel中,初始化之后,开启了多个协程。
其中最重要的有router,messagePump两个协程。
重要的操作有:
1、AddClient
addclient,为外部消费者来进行消费channel。这个是一个很重要的操作
这里采用了加锁的操作,来实现并发的同步
2、PutMessage 从topic中分发msg到channel中
3、router 收到topic分发的msg后的操作
处理挺简单的,监听incomingMessageChan,将msg发送到memoryMsgChan中,若其阻塞则放到backend缓冲区中。
4、messagePump 将channel中的信息分发给client
244:从memoryMsgChan中读取msg
245:从backend缓冲区中读取msg
257: 这里是向client提供msg分发的地方,是通过clientMessageChan的方式
小结:
与topic代码一样,channel的代码逻辑也简洁了很多。函数功能也单一了。
tcpserver
github.com/nsqio/nsq/nsqd/tcp.go
入口为client.Handle
server_client
github.com/nsqio/nsq/nsq/server_client.go
server_client是一个基础的类
Handle中
83:读取了传入的protocols
90:调用了protocol的IOLoop
server_protocol_v2
github.com/nsqio/nsq/nsqd/server_protocol_v2.go
在init中进行protocol的初始化
IOLoop中真正的处理地方为
43:不断读取line
55: ProtocolExecute ,为真正处理入口
75:client的write,将Response返回
github.com/nsqio/nsq/nsq/protocol.go
这里的操作与0.1.1版本类似,通过反射查找到对应操作。
回到github.com/nsqio/nsq/nsqd/server_protocol_v2.go中
SUB订阅
关键操作:
198:获取topic
199:获取channel
200:addclient,将client加入到channel中
205:开启了PushMessages协程。这里的操作和0.1.1版本中不一样。
在0.1.1版本中是要调用get操作获取message,而在这个版本中是直接提供了pushMessage一个协程操作。
PushMessage
在channel中,就有说过提供的接口chan,clientMessageChan。每个client都会在PushMessage协程中,监听对应channel的clientMessageChan,来获取msg。
126:监听clientMessageChan
134:msg encode
144:调用client的write将msg发送出去。
小结:
tcp监听,生成client,再通过protocol与channel链接起来。
httpserver
github.com/nsqio/nsq/nsqd/http.go
相对比0.1.1中,增加了很多的handle
pingHandler
不解释
putHandler
这里就是发布msg的对外接口
72:获取topic
73:生成msg
74:调用topic的接口PutMessage
mputHandler
另外提供了一个多msg的发布接口
不同之处在:
100-103:msg的分割,再将msg一个一个调用topic的PutMessage。
lookup
github.com/nsqio/nsq/nsqd/lookup.go
lookup模块是nsqd与nsqlookupd通信的模块
66-76:解析了所有lookup服务器,并防止lookupPeers中
在此协程中提供了以下操作:
86-91:15秒定时的操作,定对所有lookup服务器进行ping操作
92-98:进行channel的一个announce操作,通知lookup服务器新增channel
99-105:进行topic的一个announce操作,通知lookup服务器新增topic
106-119:进行所有的topic和channel的同步
总结:
nsqd中的模块越来越清晰简洁。
nsqlookupd
新增加的模块,在文章开头已经讲过其模块功能。
直接看源码
github.com/nsqio/nsq/nsqlookupd/nsqlookupd.go
main函数中,开启了两个模块
76:tcpserver
82:httpserver
tcpserver
github.com/nsqio/nsq/nsqlookupd/tcp.go
入口为Handle,这个已经成了标准操作了
github.com/nsqio/nsq/nsqlookupd/server_lookup_protocol_v1.go
协议注册
这个也是标准操作
入口是
32:初始化了一个 safe_map ,用于存储信息
47:protocolExecute
提供的操作有
ping
这里的ping还是很简单
announce
86:获取safe_map
88:将announce信息,存放到safe_map中
小结:
tcpserver提供给nsqd用于ping和announce操作
httpserver
github.com/nsqio/nsq/nsqlookupd/http.go
对消费者提供了两个handle
1、pingHandler
比较简单,不做解释
2、lookupHandler
提供给消费者,用于查找对应topic的信息,包括channel,所在的nsqd的ip和port
37:读取topicname
43:从safe map中查找到对应信息
50-54:信息封装
63:Response返回
总结:
nsqlookupd的功能定位简单,代码量也少,模块也清晰。
全文总结:
本文通过增加了一个nsqlookupd服务,来实现了一个分布式的消息中间件。
消费者,先从nsqlookupd,查找获取到对应topic所在的nsqd的服务ip和port。再与对应的nsqd服务进行发布与订阅操作。
nsqd则向nsqlookup进行topic与channel信息的announce操作。
此种分布式的操作,简单,直接。 后续版本,待有时间的时候,再做分析。
龚浩华
月牙寂道长
qq:29185807
2019年05月09日
如果你觉得本文对你有帮助,可以转到你的朋友圈,让更多人一起学习。
第一时间获取文章,可以关注本人公众号:月牙寂道长,也可以扫码关注
猜你喜欢
-
2
NSQ源码分析 简单的翻了一下NSQ的源码,看看它是怎么实现的。我首先是从nsqtail开始看的,先从简单的入手。之后我看了nsqlookupd和nsqd。本文 只讲nsqd。 首先从nsqd的入口文件看起 apps/nsqd/main.go: func m...
-
24
看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的 认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责...
-
29
看完了nsqlookupd我们继续往下看, nsqd才是他的核心. 里面大量的使用到了go channel, 相信看完之后对你学习go有很大的帮助.相较于lookupd部分无论在代码逻辑和实现上都要复杂很多. 不过基本的代码结构基本上都是一样的, 进...
-
47
为什么选择nsq 之前一直在用erlang做电信产品的开发,对erlang的一些生态也比较了解,和erlang相关的产品在互联网公司使用最多的应该就是rabbitmq了,也许很多人听说过erlang就是因为他们公司在使用rabbitmq。在之前也看过一点...
-
64
nsq是一个实时分布式的消息队列平台。 核心部分是一个叫nsqd的模块,它负责接收和转发消息。同时在go-nsq的包中,提供了consumer和producer的核心接口。在读nsq源码的时候,很好奇它的数据是怎么从producer给到了consumer的,于...
-
42
php-nsq php-nsq 是nsq的php客户端,采用c扩展编写,性能和稳定性。 安装 : 请提前安装libevent Dependencies: libevent (apt-get install libevent-dev ,yum install libevent-devel) 1....
-
35
-
77
目前,全新的异步任务服务每天高效稳定的为唱吧提供数亿次的调用。服务器团队用全新的方式重新定义了异步任务实现方式,以为云计算而生的NSQ、成熟的PHP执行者PHP-FPM、自主开发的中间件NSQProxy以及admin管理后台共同组成了异步任务的队...
-
62
Nsq vs Kafka 正如之前说的,Nsq是一款极简的消息中间件,通过学习Nsq,我们可以通过对比的方式,学习其他的Mq。 这一节,就让我们在对比中,学习另一种Mq,Kafka,在对比中,加深对Mq的理解。 首先,先放上...
-
64
上篇文章已经了解了消息中间件相关的知识,这篇文章学习一下Golang语言编写的知名消息中间件 NSQ 。 nsq最初是由bitly公司开源出来的一款简单易用的消息中间件,它可用于大规模系...
关于极客头条
聚合每日国内外有价值,有趣的链接。