

InfluxDB集群 -- 节点部署命令的源码分析
source link: https://segmentfault.com/a/1190000040706233
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

InfluxDB集群 -- 节点部署命令的源码分析
上文分析到,InfluxDB集群的部署,涉及到3个命令:
- influxd进程的启动;
- 添加集群的data-nodes;
- 查询集群的节点信息;
本文结合源码,分析每一步具体都是怎么实现的。
influxd进程启动
# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3
这里重点分析-join参数。
1.读取参数
//cmd/influxd/run/command.go func (cmd *Command) ParseFlags(args ...string) (Options, error) { ....... fs.StringVar(&options.Join, "join", "", "") return options, nil } // Run parses the config from args and runs the server. func (cmd *Command) Run(args ...string) error { // Propogate the top-level join options down to the meta config if config.Join != "" { config.Meta.JoinPeers = strings.Split(options.Join, ",") } .... }
2.将peers加入Raft
将joinPeers作为判断条件,找到所有的raftAddr,然后传入Raft-lib作为初始节点:
//services/meta/store.go // open opens and initializes the raft store. func (s *store) open(raftln net.Listener) error { joinPeers = s.config.JoinPeers var initializePeers []string for { peers := c.peers() if !Peers(peers).Contains(s.raftAddr) { peers = append(peers, s.raftAddr) } if len(s.config.JoinPeers)-len(peers) == 0 { initializePeers = peers break } } // Open the raft store. if err := s.openRaft(initializePeers, raftln); err != nil ...... }
添加data nodes
# influxd_ctl add-data ops3:8088
比如在node1上,执行添加node3为dataNode的命令:
首先在node1上进行命令解析(实际上跟API一样的流程);
//services/admin_cluster/handler.go // ServeHTTP responds to HTTP request to the handler. func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": switch r.URL.Path { case "/add-data": h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r) } } }
然后向node3发送AddDataNode的tcp消息;
// add new data node to the cluster func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) { ps := r.URL.Query() if len(ps["node"]) != 1 { http.Error(w, "", http.StatusBadRequest) } node := ps["node"][0] req := Request{AddDataNode, "", 0} err := h.call(node, req) h.sendResponse(err, w) }
发送TCP消息:
func (h *handler) call(node string, req Request) error { conn, err := tcp.Dial("tcp", node, MuxHeader) if err != nil { return fmt.Errorf("tcp dial to node %s failed. %v", node, err) } defer conn.Close() if err = json.NewEncoder(conn).Encode(req); err != nil { return fmt.Errorf("encode and send request failed. %v", err) } // read response resp, err := h.readResponse(conn) .... return nil }
node3接收处理AddDataNode的消息:
//services/admin_cluster/tcphandler.go func (h *TCPHandler) handleConn(conn net.Conn) { switch r.Type { case AddDataNode: h.handleAddDataNode(r, conn) } } func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) { err := h.Server.DataServerJoin() ... }
node3使用metaclient,带上自己的httpAddr(8091)和tcpAddr(8088),向meta集群发送添加节点的请求:node3会重试,直到成功为止;
//cmd/influxd/run/server.go func (s *Server) DataServerJoin() (err error) { n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr()) for err != nil { time.Sleep(time.Second) n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr()) } }
查询集群节点
# influxd_ctl show-nodes
查询的过程:
- 首先通过命令行解析,最终定位到cluster的Get /nodes代码;
- 然后由metaClient查询集群的所有metaNodes和dataNodes信息;
//services/admin_cluster/handler.go func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": switch r.URL.Path { case "/nodes": h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r) } ...... } } func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) { metaNodes, _ := h.MetaClient.MetaNodes() dataNodes, _ := h.MetaClient.DataNodes() nodes := make(map[string][]meta.NodeInfo) nodes["Meta"] = metaNodes nodes["Data"] = dataNodes w.Header().Add("Content-Type", "application/json") w.Write(MarshalJSON(nodes, true)) }
Recommend
-
10
一、基础概念 TSDB与传统DB比较 1、传统数据库多用于记录数据的当前值。 2、时序数据库记录基于时间的一系列数据。angelscriptTSDB应用场景
-
18
除了 influxdb,还有哪些可以集群部署的时序数据库? V2EX › 数据库 除了 influxdb,还有哪些可以集群部署的时序数据库? ...
-
7
InfluxDB集群 -- 移除MetaNode源码分析发布于 今天 14:11 influxdb集群中,client在node1上执行remove metaNode node3的命令:influxd_ctl remove-meta no...
-
5
InfluxDB集群 -- hinted-handoff源码分析(三)--points发送到远端节点发布于 今天 14:48 在本机节点上,给每个远端节点分配一个NodeProcessor对象,负责数据的写入和...
-
13
1、安装Docker 在21、22、200三台机器上安装Docker。安装命令: 在21、22、200三台主机上部署Docker。 ~]# curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 1.1 配置Docker
-
8
Kubernetes二进制单节点集群部署 常见的K8S按照部署方式 ●Mini kube Minikube是一个工具,可以在本地快速运行一个单节点微型K8S,仅用于学习、预览K8S的一些特性使用部署地址:
-
5
这几天在忙着搭建多节点的 TenderMint 节点, 中间遇到了一些坑, 会通过这篇博客记录下整个过程. 欢迎感兴趣的同学多多交流!基于 docker-compose 搭建本来是想直接在本机启动多个 tendermint node 节点来实现的, 后来觉得每个 node...
-
3
原文链接:全网最新的nacos 2.1.0集群多节点部署教程-语雀 进度整理中 版本 2.1.0 版本发布日期 2022-0...
-
7
InfluxDB Cluster - InfluxDB Enterprise 集群的开源替代方案 InfluxDB Cluster - 一个开源分布式时间序列数据库,InfluxDB Enterprise 集群的开源替代方案 InfluxDB Cluster 是一个开源的 ...
-
6
Redis 集群偶数节点跨地域部署之高可用测试 笔者目前...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK