58

NSQ v0.1.5 源码分析

 4 years ago
source link: https://www.tuicool.com/articles/EVNraqz
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。

源码地址:https://github.com/nsqio/nsq

对于一个大型的项目来讲,我个人的学习习惯于从最小版本开始学起。这是因为,在一个项目最初的时候,大体功能和架构都已经成形,最初的版本,一般来说,代码量都较少,功能集最小。学习曲线低,并且又最初版本,慢慢往高版本过渡,也能更了解项目进化的过程,也是一个学习的过程。

并且在实际使用过程中,大多数情况下,我们可能不需要那么多的功能集,并且需要根据实际情况做一些二次开发,此时的话,也许低版本的会更贴近实际使用场景和二次开发场景。

开源代码学习-nsq(v0.1.1版本)源码分析 中已经对v0.1.1版本进行了分析。其是一个单机版本,但发布订阅功能是已经实现。

本文对v0.1.5版本进行分析。此版本已经实现了分布式,分成了两个部分你nsqd:用于做订阅发布。nsqlookupd:用于做topic服务注册与发现。

这种模式就类似于微服务中的模式,增加了一个服务注册和发现模块。

每个单机nsqd用于存储topic,并提供订阅与发布功能。增加的nsqlookupd,则是用于多个单机进行注册服务,并对消费者提供查找对应topic所在的nsqd服务器。

这是一种非常简单的分布式架构。

那么看上传代码日志。分别为v0.1.1版本所在位置和v0.1.5所在位置。

feUJBrA.jpg!web

那么看v0.1.5下代码目录:

IJfAfaR.jpg!web

代码量相对比v0.1.1版本多了好多文件。

分两个大模块开始分解:

nsqd

还是从main函数开始

github.com/nsqio/nsq/nsqd/main.go

ZFnQzaM.jpg!web

104:初始化NSQD

105:真正入口nsqd.Main()

github.com/nsqio/nsq/nsqd/nsqd.go

mAJZBnn.jpg!web

初始化,并开启了一个协程idPump,这个协程用于全局生成guid的。

FFJNviQ.jpg!web

这个协程并非主要流程中的,就不做解释。这里对应0.1.1版本中的

uuidFactory

3uEjuey.jpg!web

这里为真正的main入口。

46:相对比0.1.1版本多了一个 LookupRouter 协程处理。

53:tcpserver

60:httpserver

Yf2EraV.jpg!web

在nsqd中提供了一个接口, GetTopic 。少了 topicFactory

下面各个模块分析:

topic

github.com/nsqio/nsq/nsqd/topic.go

A3QJvqA.jpg!web

与0.1.1版本中,最大的差别就是多了 读写锁 。在0.1.1版本中,所有的操作都是在Router中,通过chan的方式,然后在单协程中处理所有请求来实现同步。

在0.1.5版本中,增加了读写锁,提供更大的并发操作。

QbABJfr.jpg!web

初始化,然后开启了Router协程。

这里的操作没什么区别。

那么对于topic来说分为几个操作:

1、GetChannel

IVnmMjA.jpg!web

58-59:读写锁的操作

61:查找channel

63:未查找到的,生成新的channel

67:每个channel开启了一个MessagePump协程操作。这里用了sync.Once操作。

2、MessagePump 信息分发

UJj22eR.jpg!web

89:从命名来看修改成了 memorMsgchan ,从此chan中获取msg

90:从backend缓冲区中获取msg

91:解码

100-108:通过加锁操作实现并发,遍历所有的channel,然后将msg分发给channel中。

3、PutMessage

UjeMbyE.jpg!web

这里的与0.1.1版本中一样,通过incomingMessageChan,发送给Router中处理

4、Router

fQnE3ey.jpg!web

由于一些操作通过提供加锁的函数,使得Router简洁了非常多。

Router中只剩下对信息发布的一个操作

119:获取incomingMessagechan信息

121:写入memoryMsgChan中

123:若memoryMsgChan阻塞,则写入backend中。

小结:

topic的代码逻辑更简洁了。msg写入,分发,也都很清晰了。通过加锁的操作来实现并发的操作。

channel

github.com/nsqio/nsq/nsqd/channel.go

iuU3EbV.jpg!web

channel也同样增加了读写锁,为的是将之前的单协程的Router操作,分解为了可以多并发操作。

EBVBvyn.jpg!web

在NewChannel中,初始化之后,开启了多个协程。

其中最重要的有router,messagePump两个协程。

重要的操作有:

1、AddClient

7BvAjii.jpg!web

addclient,为外部消费者来进行消费channel。这个是一个很重要的操作

这里采用了加锁的操作,来实现并发的同步

2、PutMessage 从topic中分发msg到channel中

b2qqMjZ.jpg!web

3、router 收到topic分发的msg后的操作

QRbyieZ.jpg!web

处理挺简单的,监听incomingMessageChan,将msg发送到memoryMsgChan中,若其阻塞则放到backend缓冲区中。

4、messagePump 将channel中的信息分发给client

INrqIvR.jpg!web

244:从memoryMsgChan中读取msg

245:从backend缓冲区中读取msg

257: 这里是向client提供msg分发的地方,是通过clientMessageChan的方式

小结:

与topic代码一样,channel的代码逻辑也简洁了很多。函数功能也单一了。

tcpserver

github.com/nsqio/nsq/nsqd/tcp.go

IjIBzqb.jpg!web

入口为client.Handle

server_client

github.com/nsqio/nsq/nsq/server_client.go

Eja26nR.jpg!web

server_client是一个基础的类

nEFFJf6.jpg!web

Handle中

83:读取了传入的protocols

90:调用了protocol的IOLoop

server_protocol_v2

github.com/nsqio/nsq/nsqd/server_protocol_v2.go

raI7BbQ.jpg!web

在init中进行protocol的初始化

V77VZvN.jpg!web

IOLoop中真正的处理地方为

43:不断读取line

55: ProtocolExecute ,为真正处理入口

75:client的write,将Response返回

github.com/nsqio/nsq/nsq/protocol.go

faQniuA.jpg!web

这里的操作与0.1.1版本类似,通过反射查找到对应操作。

回到github.com/nsqio/nsq/nsqd/server_protocol_v2.go中

SUB订阅

BzuQnm3.jpg!web

关键操作:

198:获取topic

199:获取channel

200:addclient,将client加入到channel中

205:开启了PushMessages协程。这里的操作和0.1.1版本中不一样。

在0.1.1版本中是要调用get操作获取message,而在这个版本中是直接提供了pushMessage一个协程操作。

PushMessage

ryMb2eF.jpg!web

在channel中,就有说过提供的接口chan,clientMessageChan。每个client都会在PushMessage协程中,监听对应channel的clientMessageChan,来获取msg。

126:监听clientMessageChan

134:msg encode

144:调用client的write将msg发送出去。

小结:

tcp监听,生成client,再通过protocol与channel链接起来。

httpserver

github.com/nsqio/nsq/nsqd/http.go

VviyymN.jpg!web

相对比0.1.1中,增加了很多的handle

pingHandler

MzUVZvY.jpg!web

不解释

putHandler

yI3yMzu.jpg!web

这里就是发布msg的对外接口

72:获取topic

73:生成msg

74:调用topic的接口PutMessage

mputHandler

EzieMrM.jpg!web

另外提供了一个多msg的发布接口

不同之处在:

100-103:msg的分割,再将msg一个一个调用topic的PutMessage。

lookup

github.com/nsqio/nsq/nsqd/lookup.go

lookup模块是nsqd与nsqlookupd通信的模块

J3aiIzq.jpg!web

66-76:解析了所有lookup服务器,并防止lookupPeers中

Jn6Ffur.jpg!web

在此协程中提供了以下操作:

86-91:15秒定时的操作,定对所有lookup服务器进行ping操作

92-98:进行channel的一个announce操作,通知lookup服务器新增channel

99-105:进行topic的一个announce操作,通知lookup服务器新增topic

106-119:进行所有的topic和channel的同步

总结:

nsqd中的模块越来越清晰简洁。

nsqlookupd

新增加的模块,在文章开头已经讲过其模块功能。

直接看源码

github.com/nsqio/nsq/nsqlookupd/nsqlookupd.go

MzU7nay.jpg!web

main函数中,开启了两个模块

76:tcpserver

82:httpserver

tcpserver

github.com/nsqio/nsq/nsqlookupd/tcp.go

2UvayuF.jpg!web

入口为Handle,这个已经成了标准操作了

github.com/nsqio/nsq/nsqlookupd/server_lookup_protocol_v1.go

zmm26vN.jpg!web

协议注册

vaQvQ3F.jpg!web

这个也是标准操作

入口是

32:初始化了一个 safe_map ,用于存储信息

47:protocolExecute

提供的操作有

ping

这里的ping还是很简单

announce

7BVzyyz.jpg!web

86:获取safe_map

88:将announce信息,存放到safe_map中

小结:

tcpserver提供给nsqd用于ping和announce操作

httpserver

github.com/nsqio/nsq/nsqlookupd/http.go

AzURJjU.jpg!web

对消费者提供了两个handle

1、pingHandler

zUneyaa.jpg!web

比较简单,不做解释

2、lookupHandler

RvM3IvJ.jpg!web

提供给消费者,用于查找对应topic的信息,包括channel,所在的nsqd的ip和port

37:读取topicname

43:从safe map中查找到对应信息

50-54:信息封装

63:Response返回

总结:

nsqlookupd的功能定位简单,代码量也少,模块也清晰。

全文总结:

本文通过增加了一个nsqlookupd服务,来实现了一个分布式的消息中间件。

消费者,先从nsqlookupd,查找获取到对应topic所在的nsqd的服务ip和port。再与对应的nsqd服务进行发布与订阅操作。

nsqd则向nsqlookup进行topic与channel信息的announce操作。

此种分布式的操作,简单,直接。 后续版本,待有时间的时候,再做分析。

龚浩华

月牙寂道长

qq:29185807

2019年05月09日

如果你觉得本文对你有帮助,可以转到你的朋友圈,让更多人一起学习。

第一时间获取文章,可以关注本人公众号:月牙寂道长,也可以扫码关注

ruiuIvy.jpg!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK