1

InfluxDB集群 -- 节点部署命令的源码分析

 2 years ago
source link: https://segmentfault.com/a/1190000040706233
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

InfluxDB集群 -- 节点部署命令的源码分析

发布于 今天 14:55

上文分析到,InfluxDB集群的部署,涉及到3个命令:

  • influxd进程的启动;
  • 添加集群的data-nodes;
  • 查询集群的节点信息;

本文结合源码,分析每一步具体都是怎么实现的。

influxd进程启动

# influxd -config /etc/influxdb/influxdb.conf -join ops1:8091,ops2:8091,ops3:8091 --hostname ops3

这里重点分析-join参数。
1.读取参数

//cmd/influxd/run/command.go
func (cmd *Command) ParseFlags(args ...string) (Options, error) {
    .......
    fs.StringVar(&options.Join, "join", "", "")
    return options, nil
}
// Run parses the config from args and runs the server.
func (cmd *Command) Run(args ...string) error {
    // Propogate the top-level join options down to the meta config
    if config.Join != "" {
        config.Meta.JoinPeers = strings.Split(options.Join, ",")
    }
    ....
}

2.将peers加入Raft
将joinPeers作为判断条件,找到所有的raftAddr,然后传入Raft-lib作为初始节点:

//services/meta/store.go
// open opens and initializes the raft store.
func (s *store) open(raftln net.Listener) error {
    joinPeers = s.config.JoinPeers
    var initializePeers []string
    for {
        peers := c.peers()
        if !Peers(peers).Contains(s.raftAddr) {
            peers = append(peers, s.raftAddr)
        }
        if len(s.config.JoinPeers)-len(peers) == 0 {
            initializePeers = peers
            break
        }
    }
    // Open the raft store.
    if err := s.openRaft(initializePeers, raftln); err != nil
    ......
}

添加data nodes

# influxd_ctl add-data ops3:8088

比如在node1上,执行添加node3为dataNode的命令:

首先在node1上进行命令解析(实际上跟API一样的流程);

//services/admin_cluster/handler.go
// ServeHTTP responds to HTTP request to the handler.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/add-data":
            h.WrapHandler("add-data", h.addDataNode).ServeHTTP(w, r)
        }
    }    
}

然后向node3发送AddDataNode的tcp消息;

// add new data node to the cluster
func (h *handler) addDataNode(w http.ResponseWriter, r *http.Request) {
    ps := r.URL.Query()
    if len(ps["node"]) != 1 {
        http.Error(w, "", http.StatusBadRequest)
    }
    node := ps["node"][0]
    req := Request{AddDataNode, "", 0}
    err := h.call(node, req)
    h.sendResponse(err, w)
}

发送TCP消息:

func (h *handler) call(node string, req Request) error {
    conn, err := tcp.Dial("tcp", node, MuxHeader)
    if err != nil {
        return fmt.Errorf("tcp dial to node %s failed. %v", node, err)
    }
    defer conn.Close()
    if err = json.NewEncoder(conn).Encode(req); err != nil {
        return fmt.Errorf("encode and send request failed. %v", err)
    }
    // read response
    resp, err := h.readResponse(conn)
    ....
    return nil
}

node3接收处理AddDataNode的消息:

//services/admin_cluster/tcphandler.go
func (h *TCPHandler) handleConn(conn net.Conn) {
    switch r.Type {
    case AddDataNode:
        h.handleAddDataNode(r, conn)
    }
}
func (h *TCPHandler) handleAddDataNode(r *Request, conn net.Conn) {
    err := h.Server.DataServerJoin()
    ...
}

node3使用metaclient,带上自己的httpAddr(8091)和tcpAddr(8088),向meta集群发送添加节点的请求:node3会重试,直到成功为止;

//cmd/influxd/run/server.go
func (s *Server) DataServerJoin() (err error) {
    n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    for err != nil {
        time.Sleep(time.Second)
        n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
    }
}

查询集群节点

# influxd_ctl show-nodes

查询的过程:

  • 首先通过命令行解析,最终定位到cluster的Get /nodes代码;
  • 然后由metaClient查询集群的所有metaNodes和dataNodes信息;
//services/admin_cluster/handler.go
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case "GET":
        switch r.URL.Path {
        case "/nodes":
            h.WrapHandler("nodes", h.serveNodes).ServeHTTP(w, r)
        }
        ......
    }
}
func (h *handler) serveNodes(w http.ResponseWriter, r *http.Request) {
    metaNodes, _ := h.MetaClient.MetaNodes()
    dataNodes, _ := h.MetaClient.DataNodes()

    nodes := make(map[string][]meta.NodeInfo)
    nodes["Meta"] = metaNodes
    nodes["Data"] = dataNodes

    w.Header().Add("Content-Type", "application/json")
    w.Write(MarshalJSON(nodes, true))
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK