2

InfluxDB集群 -- hinted-handoff源码分析(三)--points发送到远端节点

 2 years ago
source link: https://segmentfault.com/a/1190000040738776
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

InfluxDB集群 -- hinted-handoff源码分析(三)--points发送到远端节点

发布于 今天 14:48

在本机节点上,给每个远端节点分配一个NodeProcessor对象,负责数据的写入和数据的读取。

NodeProcessor定期地读取本机队列中的数据,然后将其发送给远端节点,尝试写入远端shard。

// services/hh/node_processor.go
// run is the processor's background loop: each time currInterval elapses it
// repeatedly calls SendWrite to forward queued hinted data to the remote node,
// throttled by a rate limiter built from n.RetryRateLimit.
//
// NOTE(review): this is an abridged excerpt. The "......" and "..." lines stand
// for omitted code, and the `case` below must belong to a `select` that the
// excerpt does not show. How currInterval is computed and how SendWrite errors
// end the inner loop are not visible here.
func (n *NodeProcessor) run() {
    ......
    for {
    case <-time.After(currInterval):
        // Throttle sending; a fresh limiter is created for every round.
        limiter := NewRateLimiter(n.RetryRateLimit) // rate limiting
        for {
            // Send one block; c is the byte count SendWrite reports.
            c, err := n.SendWrite()
            ...
            // Feed the bytes sent into the limiter and sleep for the delay it
            // computes (presumably to keep throughput within RetryRateLimit — confirm).
            limiter.Update(c)
            time.Sleep(limiter.Delay())
        }
    }
}

读取和发送过程

  • 首先,ping远端节点,判断其是否活跃,如果不活跃,则直接返回;
  • 然后,读Head segment中的block,读操作均是从segment中的pos处开始;
  • 然后,将读到的block数据反序列化,发送给远端节点;
  • 最后,更新head segment中的pos;
// services/hh/node_processor.go
// SendWrite attempts to send the current block of hinted data to the target
// node. If successful, it returns the number of bytes it sent and advances the
// queue to the next block.
//
// When the target node is inactive it returns io.EOF without touching the
// queue. Any error from the queue's Current is returned unchanged (presumably
// io.EOF when there is no more data — confirm against queue.Current). A block
// that cannot be unmarshalled is skipped (the queue is advanced past it) and
// the unmarshal error is returned, so one corrupt block cannot stall every
// block queued behind it.
func (n *NodeProcessor) SendWrite() (int, error) {
    // Ping the remote node to check whether it is active.
    active, err := n.Active()
    if err != nil {
        return 0, err
    }
    if !active {
        return 0, io.EOF
    }

    // Get the current block from the queue: the block that starts at the
    // head segment's pos.
    buf, err := n.queue.Current()
    if err != nil {
        return 0, err
    }

    // Unmarshal the byte slice back to shard ID and points.
    shardID, points, err := unmarshalWrite(buf)
    if err != nil {
        // Retrying an undecodable block can never succeed, so skip past it.
        if aerr := n.queue.Advance(); aerr != nil {
            n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(aerr))
        }
        return 0, err
    }

    // Write the points to the remote shard. On failure the queue is not
    // advanced, so the same block is read again on the next attempt.
    if err := n.writer.WriteShard(shardID, n.nodeID, points); err != nil {
        atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
        return 0, err
    }

    // Advance the head segment's pos so the next call reads the next block.
    // A failure here is only logged because the write itself succeeded
    // (presumably the block may then be sent again — confirm).
    if err := n.queue.Advance(); err != nil {
        n.Logger.Info("failed to advance queue for node", zap.Uint64("nodeid", n.nodeID), zap.Error(err))
    }
    return len(buf), nil
}

1. 判断节点是否活跃

先根据nodeID从meta中查询节点信息(包含host);若节点存在,再对其发起ping:

//services/hh/node_processor.go
// Active reports whether the target node of this processor is currently
// reachable. It looks the node up via n.meta.DataNode and pings it.
//
// A node that the meta lookup does not return (nil), or one whose ping fails,
// is reported as inactive: (false, nil). A failure of the meta lookup itself is
// propagated as (false, err) rather than being silently dropped.
func (n *NodeProcessor) Active() (bool, error) {
    nio, err := n.meta.DataNode(n.nodeID)
    if err != nil {
        return false, err
    }
    if nio == nil {
        return false, nil
    }
    return n.Ping(nio) == nil, nil
}
// Ping probes the given data node through its Host address. It returns nil
// when the node answers the ping as expected and a non-nil error otherwise.
func (n *NodeProcessor) Ping(node *meta.NodeInfo) error {
    host := node.Host
    return n.ping(host)
}

ping的动作,实际是发送HTTP GET http://host/ping(host即节点地址,形如ip:8091),超时时间为3秒,仅当远端返回204 No Content时才认为节点活跃:

//services/hh/node_processor.go
// ping sends an HTTP GET to http://<host>/ping with a 3-second timeout.
// Keep-alives are disabled, so each probe uses a fresh connection.
// It returns nil only when the node answers 204 No Content. A host that cannot
// form a valid request, a transport error, or any other status code is
// reported as an error.
func (n *NodeProcessor) ping(host string) error {
    target := "http://" + host + "/ping"
    request, err := http.NewRequest("GET", target, nil)
    if err != nil {
        // On error NewRequest returns a nil request; passing it to client.Do
        // would panic, so report the malformed host instead.
        return err
    }
    client := &http.Client{
        Transport: &http.Transport{
            DisableKeepAlives: true,
        },
        Timeout: 3 * time.Second,
    }
    resp, err := client.Do(request)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode == http.StatusNoContent {
        return nil
    }
    return fmt.Errorf("ping %s fail", target)
}

2. 读head segment中的block

从head segment读取:

  • 先定位到segment的pos位置,pos即下一个要读取的block在segment文件中的偏移量;
  • 按照block格式,先读block len,再读block data;
//services/hh/queue.go
// current returns the record at the segment's current read position l.pos.
//
// A record is stored as a size read with readUint64 followed by that many
// bytes of data. The size is kept in l.currentSize so that advance can step
// past the whole record. Any seek or read error is returned with a nil slice,
// and l.currentSize is left unchanged if the size cannot be read.
func (l *segment) current() ([]byte, error) {
    // Position the file at l.pos.
    if err := l.seekToCurrent(); err != nil {
        return nil, err
    }

    // Read the record size. Without a valid size the data length is unknown,
    // so a failure here must not fall through to the allocation below.
    sz, err := l.readUint64()
    if err != nil {
        return nil, err
    }
    l.currentSize = int64(sz)

    // Read the record body.
    b := make([]byte, sz)
    if err := l.readBytes(b); err != nil {
        return nil, err
    }
    return b, nil
}

如何定位到segment的pos位置?实际上都是文件操作:

//services/hh/queue.go
// seekToCurrent moves the segment file's offset to the read position l.pos.
func (l *segment) seekToCurrent() error {
    offset := int64(l.pos)
    return l.seek(offset)
}
// seek moves the segment file's offset to the absolute position pos, measured
// from the start of the file. It returns the Seek error, or an error if the
// resulting offset differs from pos.
// NOTE(review): os.SEEK_SET is deprecated in favor of io.SeekStart (same value).
func (l *segment) seek(pos int64) error {
    n, err := l.file.Seek(pos, os.SEEK_SET)
    if err != nil {
        return err
    }
    if n != pos {
        // Report the offset that was requested (previously a hard-coded 0).
        return fmt.Errorf("bad seek. exp %v, got %v", pos, n)
    }
    return nil
}

3. 将读到的block数据反序列化,写入远端节点

反序列化得到points,按照与序列化相反的顺序解析:

  • 先读8字节(大端序)的shardID;
  • 然后逐行(\n)解析point;
//services/hh/node_processor.go
// unmarshalWrite decodes one hinted-handoff block. The first 8 bytes hold the
// shard ID as a big-endian uint64; the remaining bytes are handed to
// models.ParsePoints, which parses them as newline-separated points.
// Blocks shorter than 8 bytes are rejected with an error.
func unmarshalWrite(b []byte) (uint64, []models.Point, error) {
    const idLen = 8
    if len(b) >= idLen {
        shardID := binary.BigEndian.Uint64(b[:idLen])
        points, err := models.ParsePoints(b[idLen:])
        return shardID, points, err
    }
    return 0, nil, fmt.Errorf("too short: len = %d", len(b))
}
//models/points.go
// ParsePoints parses newline-separated points from buf with precision "n".
// The current UTC time is passed as the default time (presumably applied to
// points that carry no timestamp — confirm in parsePoint).
func ParsePoints(buf []byte) ([]Point, error) {
    now := time.Now().UTC()
    return ParsePointsWithPrecision(buf, now, "n")
}
// ParsePointsWithPrecision parses buf as newline-separated points, one point
// per line, passing defaultTime and precision to parsePoint for every line.
//
// Empty and whitespace-only lines are skipped. A line that fails to parse is
// left out of the result and parsing continues with the next line. All such
// failures are reported together in the returned error, alongside the points
// that did parse.
func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {
    points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)
    var (
        pos    int
        block  []byte
        failed []string
    )
    for pos < len(buf) {
        pos, block = scanLine(buf, pos)
        // Presumably steps past the '\n' that ended the line — confirm in scanLine.
        pos++

        start := skipWhitespace(block, 0)
        if start >= len(block) {
            // Nothing but whitespace (or nothing at all) on this line.
            continue
        }

        pt, err := parsePoint(block[start:], defaultTime, precision)
        if err != nil {
            // Do not append the (invalid) point; remember why it failed.
            failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", block[start:], err))
            continue
        }
        points = append(points, pt)
    }
    if len(failed) > 0 {
        return points, fmt.Errorf("%s", strings.Join(failed, "\n"))
    }
    return points, nil
}

使用shardWriter将points写入远端节点:

//cluster/shard_writer.go
// WriteShard writes time series points to a shard on the remote node ownerID.
//
// It dials ownerID, looks up the shard's database and retention policy via
// w.MetaClient.ShardOwner, builds and marshals a WriteShardRequest, writes it
// to the connection as a TLV message of type writeShardRequestMessage, and
// reads the TLV response. Both the write and the read are bounded by w.timeout.
// On a write or read error the pooled connection is marked unusable
// (presumably so the pool discards it rather than reusing it — confirm).
//
// NOTE(review): in this excerpt the errors from w.dial and
// request.MarshalBinary, and the `ok` from the *pooledConn type assertion, are
// never checked; a failed assertion would leave conn nil. sgi is also unused
// in the visible code. The "...." line elides the response handling; confirm
// these checks exist in the full source.
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
    c, err := w.dial(ownerID)
    conn, ok := c.(*pooledConn)
    // Determine the location of this shard and whether it still exists
    db, rp, sgi := w.MetaClient.ShardOwner(shardID)    
    // Build write request.
    var request WriteShardRequest
    request.SetShardID(shardID)
    request.SetDatabase(db)
    request.SetRetentionPolicy(rp)
    request.AddPoints(points)
    // Serialize the points.
    // Marshal into protocol buffers.
    buf, err := request.MarshalBinary()
    // Write the points to conn in TLV format.
    // Write request.
    conn.SetWriteDeadline(time.Now().Add(w.timeout))
    if err := WriteTLV(conn, writeShardRequestMessage, buf); err != nil {
        conn.MarkUnusable()
        return err
    }
    // Read the response.
    conn.SetReadDeadline(time.Now().Add(w.timeout))
    _, buf, err = ReadTLV(conn)
    if err != nil {
        conn.MarkUnusable()
        return err
    }    
    ....
    return nil
}

4. 更新head segment的pos

更新head segment的pos,下次继续从pos读取:

  • 先推进head segment内的读取位置pos: head.advance();
  • 若head segment读完,则继续下一个segment:
//services/hh/queue.go
// Advance moves the head pointer to the next byte slice in the queue.
//
// It holds l.mu for the whole operation and returns ErrNotOpen when the queue
// has no head segment. It advances the head segment's read position; when
// that reports io.EOF (the head segment is fully consumed), trimHead drops
// the head segment and makes the next one the head.
// NOTE(review): the "...." lines are elided in this excerpt, including how
// non-EOF errors from l.head.advance() are handled.
func (l *queue) Advance() error {
    l.mu.Lock()
    defer l.mu.Unlock()
    if l.head == nil {
        return ErrNotOpen
    }
    err := l.head.advance()
    if err == io.EOF {
        ....
        // The head segment is exhausted: remove it and continue with the next one.
        if err := l.trimHead(); err != nil {
            return err
        }
    }
    ....
    return nil
}

推进head segment内的读取位置pos:

  • 首先定位到文件末尾的footer,写入新的pos(当前pos + 8字节长度字段 + block长度),同步到磁盘后再更新内存中的pos:
//services/hh/queue.go
// advance moves the segment's read position past the current record.
//
// The new position is l.pos + l.currentSize + 8: the record body plus the
// 8-byte size field that current reads with readUint64. The new position is
// first written to the footer at the end of the file and synced to disk. Only
// then is it stored in l.pos, and the file offset is moved to it.
// NOTE(review): the "...." line is elided in this excerpt (presumably where
// io.EOF is reported once the segment is exhausted — confirm).
func (l *segment) advance() error {
    // Seek to the footer: footerSize bytes before the end of the file.
    if err := l.seekEnd(-footerSize); err != nil {
        return err
    }
    // Write the new position into the footer.
    pos := l.pos + l.currentSize + 8
    if err := l.writeUint64(uint64(pos)); err != nil {
        return err
    }
    // Sync to disk before touching l.pos: if the write or sync fails, l.pos
    // keeps its old value and the same record is read again.
    if err := l.file.Sync(); err != nil {
        return err
    }
    l.pos = pos
    // Move the file offset to the new position.
    if err := l.seekToCurrent(); err != nil {
        return err
    }
    ....
    return nil
}

若head segment读完,则继续下一个segment:

  • 如果head.advance()返回io.EOF(head segment已读完),且队列中还有其他segment,则关闭并删除head segment文件,并将下一个segment作为新的head:
//services/hh/queue.go
// trimHead drops the fully-consumed head segment and promotes the next one.
//
// It does nothing while the head is the only segment. Otherwise the queue is
// switched to the next segment first, and only then is the old head closed and
// its file deleted. Updating the in-memory state first keeps l.head pointing at
// a segment that is still open even if close or remove fails. In that case the
// error is returned and the old segment file may remain on disk.
func (l *queue) trimHead() error {
    if len(l.segments) <= 1 {
        return nil
    }
    old := l.head
    l.segments = l.segments[1:]
    // The next segment becomes the head.
    l.head = l.segments[0]

    if err := old.close(); err != nil {
        return err
    }
    // Delete the consumed segment file.
    return os.Remove(old.path)
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK