2

InfluxDB集群 -- 移除MetaNode源码分析

 2 years ago
source link: https://segmentfault.com/a/1190000040724304
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

InfluxDB集群 -- 移除MetaNode源码分析

发布于 今天 14:11

influxdb集群中,client在node1上执行remove metaNode node3的命令:

influxd_ctl remove-meta node3:8091

整体流程如下:

  • node1收到CLI命令,向自己8084端口发送GET /remove-meta的请求,query参数: metahttp=node3:8091(以及force=true/false);
  • node1向node3发送GET http://node3:8091/ping,探测node3是否存活;
  • node1执行metaclient.DeleteMetaNode():Raft中删除该节点信息;
  • node1向node3发送GET http://node3:8091/remove-meta,node3做一些清理操作;

CLI命令处理

命令行代码入口:

// cmd/influxd_ctl/cli/cli.go
// Run dispatches the influxd_ctl sub-command held in cmd to its handler.
// This is an excerpt: the code that derives cmd and the other sub-commands
// are elided.
func (c *CommandLine) Run() error {
    ....
    switch cmd {
    // "remove-meta" is handled by do_remove_meta.
    case "remove-meta":
        return do_remove_meta(c)
    }
}

向自己的8084发送GET /remove-meta:

// do_remove_meta implements the "remove-meta" sub-command. It parses the
// sub-command's flags, takes the last positional argument as the HTTP address
// of the meta node to remove, and issues a GET request to the URL produced by
// c.getURL("remove-meta", ...). Handling of the response is elided in this
// excerpt.
func do_remove_meta(c *CommandLine) error {
    // Parse the command-line arguments with the flag package.
    fs := flag.NewFlagSet("", flag.ExitOnError)
    o := RemoveMetaOptions{}
    fs.BoolVar(&o.Force, "force", false, "force remove meta node")
    fs.Parse(c.CMD[1:])
    // NOTE(review): if no positional argument is given, fs.Args() is empty and
    // this index expression panics — confirm whether the arguments are
    // validated before this point.
    httpAddr := fs.Args()[len(fs.Args())-1]
    // The force flag is forwarded as the string "true" or "false".
    force := "false"
    if o.Force {
        force = "true"
    }
    // Send remove-meta to the local admin endpoint, passing the HTTP address
    // of the node to delete. Presumably getURL encodes the map as URL query
    // parameters (the server side reads them via r.URL.Query()) — TODO confirm.
    url := c.getURL("remove-meta", map[string]string{"metahttp": httpAddr, "force": force})
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    .....
}

admin_cluster监听8084端口,负责集群的管理功能,/remove-meta的handler:

// services/admin_cluster/handler.go
// ServeHTTP routes incoming admin_cluster requests. In this excerpt the only
// route shown is GET /remove-meta, which is served by removeMetaNode wrapped
// via WrapHandler; any other method or path is left unhandled here.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != "GET" {
        return
    }
    if r.URL.Path == "/remove-meta" {
        wrapped := h.WrapHandler("remove-meta", h.removeMetaNode)
        wrapped.ServeHTTP(w, r)
    }
}

具体的处理:

  • ping一下node3,看是否存活,若已不存活且未指定force,则返回错误;
  • metaClient.DeleteMetaNode()删除集群中该节点的信息;
  • 向node3发送/remove-meta;
// removeMetaNode handles the remove-meta request on the admin_cluster service.
//
// Query parameters:
//   - metahttp: HTTP address (host:port) of the meta node to remove; required.
//   - force:    "true" to continue even when the node does not answer a ping.
//
// Steps:
//  1. Ping the target node; if it is unreachable and force is not set, report
//     an error and stop.
//  2. Look the node up by address and delete it through the meta client.
//  3. Ask the target node (GET /remove-meta) to clean up its own meta data.
//
// The outcome (nil or an error) is reported through h.sendResponse.
func (h *handler) removeMetaNode(w http.ResponseWriter, r *http.Request) {
    ps := r.URL.Query()

    // Get returns "" for a missing parameter, so a request without metahttp is
    // rejected cleanly instead of panicking on an out-of-range index.
    metahttp := ps.Get("metahttp")
    if metahttp == "" {
        h.sendResponse(fmt.Errorf("Missing required parameter: metahttp"), w)
        return
    }

    var force bool
    if len(ps["force"]) == 1 && ps["force"][0] == "true" {
        force = true
    }

    // Ping the meta server first to see whether the node being removed is alive.
    metaLive := true
    url := "http://" + metahttp + "/ping"
    resp, err := http.Get(url)
    if err != nil {
        metaLive = false
        h.logger.Info("ping meta server failed", zap.Error(err))
    } else {
        // Only liveness matters; release the connection right away.
        resp.Body.Close()
    }
    if !force && !metaLive {
        h.sendResponse(fmt.Errorf("Meta node %s could not be connected", metahttp), w)
        return
    }

    err = func() error {
        // Remove the meta node and make it leave the raft cluster.
        nodeInfo := h.MetaClient.MetaNodeByAddr(metahttp)
        if nodeInfo == nil {
            return fmt.Errorf("Meta node %s does not exist in cluster", metahttp)
        }
        // Delete the node's meta information (goes through Raft).
        if err := h.MetaClient.DeleteMetaNode(nodeInfo.ID); err != nil {
            return err
        }

        // Ask the removed node itself to delete its meta directory.
        url := "http://" + metahttp + "/remove-meta"
        resp, err := http.Get(url)
        if err != nil {
            return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %v", err)
        }
        defer resp.Body.Close()

        // Read the reply so that a failure on the remote side is not silently
        // dropped.
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %v", err)
        }
        if resp.StatusCode != http.StatusOK {
            return fmt.Errorf("Removed meta node from raft cluster, but remove meta data dir failed. %s", body)
        }
        return nil
    }()
    h.sendResponse(err, w)
}

node1向node3发送/ping

node1发送GET http://node3:8091/ping
node3的处理:

//services/httpd/handler.go
// servePing answers a liveness probe: it bumps the ping request counter and
// replies with 204 No Content so the caller knows the server is up.
func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
    // The counter is updated atomically.
    pingCounter := &h.stats.PingRequests
    atomic.AddInt64(pingCounter, 1)

    h.writeHeader(w, http.StatusNoContent)
}

node1删除raft中该节点信息

通过metaClient.DeleteMetaNode()删除集群中该节点的信息;

将删除节点命令封装成1个Command_DeleteMetaNodeCommand,然后通过retryUntilExec()执行:

// services/meta/client.go
// DeleteMetaNode wraps the given meta node id in a DeleteMetaNodeCommand and
// submits it through retryUntilExec, returning whatever error that call yields.
func (c *Client) DeleteMetaNode(id uint64) error {
    deleteCmd := &internal.DeleteMetaNodeCommand{ID: proto.Uint64(id)}

    return c.retryUntilExec(
        internal.Command_DeleteMetaNodeCommand,
        internal.E_DeleteMetaNodeCommand_Command,
        deleteCmd,
    )
}

retryUntilExec()函数执行逻辑:

  • 向metaServer发送POST /execute执行Command;
  • 若该metaServer是Follower,则将command转发给Leader;
  • 上述过程重试10次,直到成功;

Leader处理POST /execute请求,处理DeleteMetaNodeCommand:

// services/meta/handler.go
// ServeHTTP routes incoming meta service requests. In this excerpt every POST
// request is served by serveExec wrapped under the name "execute"; other
// methods are left unhandled here.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method == "POST" {
        exec := h.WrapHandler("execute", h.serveExec)
        exec.ServeHTTP(w, r)
    }
}
// serveExec applies a command to the store and writes a protobuf-encoded
// internal.Response back to the client.
//
// On failure the response carries OK=false and the error text; on success it
// carries OK=true and the store index after the apply. If the response cannot
// be marshalled, an HTTP 500 is returned through h.httpError.
func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
    // NOTE(review): body is not defined in this excerpt — presumably the raw
    // request payload read from r.Body; confirm against the full source.
    var resp *internal.Response
    if err := h.store.apply(body); err != nil {
        resp = &internal.Response{
            OK:    proto.Bool(false),
            Error: proto.String(err.Error()),
        }
    } else {
        // Apply succeeded: report success together with the new store index.
        resp = &internal.Response{
            OK:    proto.Bool(true),
            Index: proto.Uint64(h.store.index()),
        }
    }

    b, err := proto.Marshal(resp)
    if err != nil {
        // Do not send an empty payload with a 200 status when encoding fails.
        h.httpError(err, w, http.StatusInternalServerError)
        return
    }
    w.Header().Add("Content-Type", "application/octet-stream")
    w.Write(b)
}

数据更新通过store.apply()执行:

// Apply decodes the data of a raft log entry into an internal.Command and
// dispatches on the command type. It panics if the entry cannot be decoded.
// This is an excerpt: only the DeleteMetaNodeCommand case is shown and the
// code after the dispatch is elided.
func (fsm *storeFSM) Apply(l *raft.Log) interface{} {
    var cmd internal.Command
    // NOTE(review): the panic message says "marshal" although the failing call
    // is proto.Unmarshal — confirm whether the wording is intentional.
    if err := proto.Unmarshal(l.Data, &cmd); err != nil {
        panic(fmt.Errorf("cannot marshal command: %x", l.Data))
    }
    // Run the handler that matches the command type and capture its result.
    err := func() interface{} {
        switch cmd.GetType() {
        case internal.Command_DeleteMetaNodeCommand:
            // NOTE(review): s is not defined in this excerpt — presumably the
            // *store owning this FSM; confirm against the full source.
            return fsm.applyDeleteMetaNodeCommand(&cmd, s)
        }
    }()
    ...
}

具体到fsm.applyDeleteMetaNodeCommand():更新fsm中metaNodes的信息:

// applyDeleteMetaNodeCommand removes a meta node from the FSM data.
//
// It works on a clone of fsm.data: the node named by the command's ID is
// looked up (ErrNodeNotFound if absent), s.leave is called for it, the node is
// deleted from the clone, and only then does the clone replace fsm.data.
// Returns nil on success, otherwise the error that stopped the update.
func (fsm *storeFSM) applyDeleteMetaNodeCommand(cmd *internal.Command, s *store) interface{} {
    // The extension lookup error is discarded here.
    payload, _ := proto.GetExtension(cmd, internal.E_DeleteMetaNodeCommand_Command)
    del := payload.(*internal.DeleteMetaNodeCommand)
    nodeID := del.GetID()

    // Mutate a copy so fsm.data is untouched if any step fails.
    updated := fsm.data.Clone()
    target := updated.MetaNode(nodeID)
    if target == nil {
        return ErrNodeNotFound
    }

    // Make the node leave; raft.ErrNotLeader is tolerated.
    switch err := s.leave(target); err {
    case nil, raft.ErrNotLeader:
        // continue with the deletion
    default:
        return err
    }

    if err := updated.DeleteMetaNode(nodeID); err != nil {
        return err
    }

    fsm.data = updated
    return nil
}

node1向node3发送/remove-meta

node3接收/remove-meta做一些资源清理操作:

// services/meta/handler.go
// ServeHTTP routes incoming meta service requests. In this excerpt the only
// route shown is GET /remove-meta, which is served by serveRemoveMeta wrapped
// via WrapHandler; any other method or path is left unhandled here.
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != "GET" {
        return
    }
    if r.URL.Path == "/remove-meta" {
        wrapped := h.WrapHandler("remove-meta", h.serveRemoveMeta)
        wrapped.ServeHTTP(w, r)
    }
}

资源清理操作:

  • 删除本机的meta目录;
  • 新建本机的meta目录;
  • 保存本机的节点信息node.json文件;
// serveRemoveMeta handles GET /remove-meta on the node being removed and
// resets its local meta directory.
//
// It deletes the configured meta directory, recreates it empty, and saves the
// node file again. On success it writes "OK"; on failure it replies with
// HTTP 500 and the error.
func (h *handler) serveRemoveMeta(w http.ResponseWriter, r *http.Request) {
    // admin_cluster has already removed this meta node from the cluster data;
    // what remains is the local meta directory.
    err := func() error {
        // Remove the whole meta directory, including the node file, which is
        // written back below.
        if err := os.RemoveAll(h.s.config.Dir); err != nil {
            return fmt.Errorf("remove meta dir failed: %v", err)
        }
        // Recreate an empty meta directory. The mode must be the octal literal
        // 0755 (rwxr-xr-x); decimal 755 yields unrelated permission bits.
        if err := os.Mkdir(h.s.config.Dir, 0755); err != nil {
            return fmt.Errorf("create empty meta dir failed: %v", err)
        }
        // Save node.json again: the node file should be kept because the data
        // node still needs it.
        if err := h.s.Node.Save(); err != nil {
            return fmt.Errorf("save node file failed: %v", err)
        }
        return nil
    }()
    if err != nil {
        h.httpError(err, w, http.StatusInternalServerError)
        return
    }
    if _, err := w.Write([]byte("OK")); err != nil {
        h.logger.Info("Write response error", zap.Error(err))
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK