33

KCP-GO源码解析

 5 years ago
source link: http://www.10tiao.com/html/415/201806/2649839102/1.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

社区零食大礼包限量发售:

(豆干牛肉丝创意福袋等四川特产)


KCP-GO源码解析

概念

ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.
RTO:Retransmission TimeOut
FEC:Forward Error Correction

kcp简介

kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcp

kcp-go是用go实现了kcp协议的一个库,其实kcp类似tcp,协议的实现也很多参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。
kcp和tcp一样,也分客户端和监听端。

1+-+-+-+-+-+            +-+-+-+-+-+
2    |  Client |            |  Server |
3    +-+-+-+-+-+            +-+-+-+-+-+
4        |------ kcp data ------>|    
5        |<----- kcp data -------|    

kcp协议

layer model

 1+----------------------+
2|      Session         |
3+----------------------+
4|      KCP(ARQ)        |
5+----------------------+
6|      FEC(OPTIONAL)   |
7+----------------------+
8|      CRYPTO(OPTIONAL)|
9+----------------------+
10|      UDP(Packet)     |
11+----------------------+

KCP header

KCP Header Format

 1   4           1   1     2 (Byte)
2+---+---+---+---+---+---+---+---+
3|     conv      |cmd|frg|  wnd  |
4+---+---+---+---+---+---+---+---+
5|
    ts        |     sn        |
6+---+---+---+---+---+---+---+---+
7|     una       |     len       |
8+---+---+---+---+---+---+---+---+
9|
                              |
10+             DATA              +
11|
                              |
12+---+---+---+---+---+---+---+---+
13

代码结构

 1src/vendor/github.com/xtaci/kcp-go/
2├── LICENSE
3├── README.md
4├── crypt.go    加解密实现
5├── crypt_test.go
6├── donate.png
7├── fec.go      向前纠错实现
8├── frame.png
9├── kcp-go.png
10├── kcp.go      kcp协议实现
11├── kcp_test.go
12├── sess.go     会话管理实现
13├── sess_test.go
14├── snmp.go     数据统计实现
15├── updater.go  任务调度实现
16├── xor.go      xor封装
17└── xor_test.go

着重研究两个文件kcp.gosess.go

kcp浅析

kcp是基于udp实现的,所有udp的实现这里不做介绍,kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。

kcp client整体函数流

和tcp一样,kcp要连接服务端需要先拨号,但是和tcp有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而tcp在这里需要三次握手。

 1DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
2    V
3net.DialUDP("udp", nil, udpaddr)
4    V
5NewConn()
6    V
7newUDPSession()
{初始化UDPSession}
8    V
9NewKCP()
{初始化kcp}
10    V
11updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
12    V
13go sess.readLoop()
14    V
15go s.receiver(chPacket)
16    V
17s.kcpInput(data)
18    V
19s.fecDecoder.decodeBytes(data)
20    V
21s.kcp.Input(data, true, s.ackNoDelay)
22    V
23kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
24    V
25notifyReadEvent()
26

客户端大体的流程如上面所示,先Dial,建立udp连接,将这个连接封装成一个会话,然后启动一个go程,接收udp的消息。

kcp server整体函数流

 1ListenWithOptions() 
2    V
3net.ListenUDP()
4    V
5ServerConn()
6    V
7newFECDecoder()
8    V
9go l.monitor() {从chPacket接收udp数据,写入kcp}
10    V
11go l.receiver(chPacket) {从upd接收数据,并入队列}
12    V
13newUDPSession()
14    V
15updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
16    V
17s.kcpInput(data)`
18    V
19s.fecDecoder.decodeBytes(data)
20    V
21s.kcp.Input(data, true, s.ackNoDelay)
22    V
23kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
24    V
25notifyReadEvent()

服务端的大体流程如上图所示,先Listen,启动udp监听,接着用一个go程监控udp的数据包,负责将不同session的数据写入不同的udp连接,然后解析封装将数据交给上层。

kcp 数据流详细解析

不管是kcp的客户端还是服务端,他们都有io行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。

kcp client 发送消息

 1s.Write(b []byte) 
2    V
3s.kcp.WaitSnd() {}
4    V
5s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
6     V
7s.kcp.flush(false) [flush data to output] {
8    if writeDelay==true {
9        flush
10    }else{
11        每隔`interval`时间flush一次
12    }
13}
14     V
15kcp.output(buffer, size)
16     V
17s.output(buf)
18     V
19s.conn.WriteTo(ext, s.remote)
20     V
21s.conn..Conn.WriteTo(buf)

读写都是在sess.go文件中实现的,Write方法:

 1// Write implements net.Conn
2func (s *UDPSession) Write(b []byte) (n int, err error) {
3    for {
4        ...
5        // api flow control
6        if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
7            n = len(b)
8            for {
9                if len(b) <= int(s.kcp.mss) {
10                    s.kcp.Send(b)
11                    break
12                } else {
13                    s.kcp.Send(b[:s.kcp.mss])
14                    b = b[s.kcp.mss:]
15                }
16            }
17            if !s.writeDelay {
18                s.kcp.flush(false)
19            }
20            s.mu.Unlock()
21            atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
22            return n, nil
23        }
24        ...
25        // wait for write event or timeout
26        select {
27        case <-s.chWriteEvent:
28        case <-c:
29        case <-s.die:
30        }
31        if timeout != nil {
32            timeout.Stop()
33        }
34    }
35}

假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send(“hello”),而Send函数实现根据mss的值对数据分段,当然这里的发送的hello,长度太短,只分了一个段,并把它们插入发送的队列里。

 1func (kcp *KCP) Send(buffer []byte) int {
2    ...
3    for i := 0; i < count; i++ {
4        var size int
5        if len(buffer) > int(kcp.mss) {
6            size = int(kcp.mss)
7        } else {
8            size = len(buffer)
9        }
10        seg := kcp.newSegment(size)
11        copy(seg.data, buffer[:size])
12        if kcp.stream == 0 { // message mode
13            seg.frg = uint8(count - i - 1)
14        } else { // stream mode
15            seg.frg = 0
16        }
17        kcp.snd_queue = append(kcp.snd_queue, seg)
18        buffer = buffer[size:]
19    }
20    return 0
21}

接着判断参数writeDelay,如果参数设置为false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由flush函数实现的。

  1// flush pending data
 2func (kcp *KCP) flush(ackOnly bool) {
 3    var seg Segment
 4    seg.conv = kcp.conv
 5    seg.cmd = IKCP_CMD_ACK
 6    seg.wnd = kcp.wnd_unused()
 7    seg.una = kcp.rcv_nxt
 8    buffer := kcp.buffer
 9    // flush acknowledges
10    ptr := buffer
11    for i, ack := range kcp.acklist {
12        size := len(buffer) - len(ptr)
13        if size+IKCP_OVERHEAD > int(kcp.mtu) {
14            kcp.output(buffer, size)
15            ptr = buffer
16        }
17        // filter jitters caused by bufferbloat
18        if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
19            seg.sn, seg.ts = ack.sn, ack.ts
20            ptr = seg.encode(ptr)
21        }
22    }
23    kcp.acklist = kcp.acklist[0:0]
24    if ackOnly { // flash remain ack segments
25        size := len(buffer) - len(ptr)
26        if size > 0 {
27            kcp.output(buffer, size)
28        }
29        return
30    }
31    // probe window size (if remote window size equals zero)
32    if kcp.rmt_wnd == 0 {
33        current := currentMs()
34        if kcp.probe_wait == 0 {
35            kcp.probe_wait = IKCP_PROBE_INIT
36            kcp.ts_probe = current + kcp.probe_wait
37        } else {
38            if _itimediff(current, kcp.ts_probe) >= 0 {
39                if kcp.probe_wait < IKCP_PROBE_INIT {
40                    kcp.probe_wait = IKCP_PROBE_INIT
41                }
42                kcp.probe_wait += kcp.probe_wait / 2
43                if kcp.probe_wait > IKCP_PROBE_LIMIT {
44                    kcp.probe_wait = IKCP_PROBE_LIMIT
45                }
46                kcp.ts_probe = current + kcp.probe_wait
47                kcp.probe |= IKCP_ASK_SEND
48            }
49        }
50    } else {
51        kcp.ts_probe = 0
52        kcp.probe_wait = 0
53    }
54    // flush window probing commands
55    if (kcp.probe & IKCP_ASK_SEND) != 0 {
56        seg.cmd = IKCP_CMD_WASK
57        size := len(buffer) - len(ptr)
58        if size+IKCP_OVERHEAD > int(kcp.mtu) {
59            kcp.output(buffer, size)
60            ptr = buffer
61        }
62        ptr = seg.encode(ptr)
63    }
64    // flush window probing commands
65    if (kcp.probe & IKCP_ASK_TELL) != 0 {
66        seg.cmd = IKCP_CMD_WINS
67        size := len(buffer) - len(ptr)
68        if size+IKCP_OVERHEAD > int(kcp.mtu) {
69            kcp.output(buffer, size)
70            ptr = buffer
71        }
72        ptr = seg.encode(ptr)
73    }
74    kcp.probe = 0
75    // calculate window size
76    cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
77    if kcp.nocwnd == 0 {
78        cwnd = _imin_(kcp.cwnd, cwnd)
79    }
80    // sliding window, controlled by snd_nxt && sna_una+cwnd
81    newSegsCount := 0
82    for k := range kcp.snd_queue {
83        if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
84            break
85        }
86        newseg := kcp.snd_queue[k]
87        newseg.conv = kcp.conv
88        newseg.cmd = IKCP_CMD_PUSH
89        newseg.sn = kcp.snd_nxt
90        kcp.snd_buf = append(kcp.snd_buf, newseg)
91        kcp.snd_nxt++
92        newSegsCount++
93        kcp.snd_queue[k].data = nil
94    }
95    if newSegsCount > 0 {
96        kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
97    }
98    // calculate resent
99    resent := uint32(kcp.fastresend)
100    if kcp.fastresend <= 0 {
101        resent = 0xffffffff
102    }
103    // check for retransmissions
104    current := currentMs()
105    var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
106    for k := range kcp.snd_buf {
107        segment := &kcp.snd_buf[k]
108        needsend := false
109        if segment.xmit == 0 { // initial transmit
110            needsend = true
111            segment.rto = kcp.rx_rto
112            segment.resendts = current + segment.rto
113        } else if _itimediff(current, segment.resendts) >= 0 { // RTO
114            needsend = true
115            if kcp.nodelay == 0 {
116                segment.rto += kcp.rx_rto
117            } else {
118                segment.rto += kcp.rx_rto / 2
119            }
120            segment.resendts = current + segment.rto
121            lost++
122            lostSegs++
123        } else if segment.fastack >= resent { // fast retransmit
124            needsend = true
125            segment.fastack = 0
126            segment.rto = kcp.rx_rto
127            segment.resendts = current + segment.rto
128            change++
129            fastRetransSegs++
130        } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
131            needsend = true
132            segment.fastack = 0
133            segment.rto = kcp.rx_rto
134            segment.resendts = current + segment.rto
135            change++
136            earlyRetransSegs++
137        }
138        if needsend {
139            segment.xmit++
140            segment.ts = current
141            segment.wnd = seg.wnd
142            segment.una = seg.una
143            size := len(buffer) - len(ptr)
144            need := IKCP_OVERHEAD + len(segment.data)
145            if size+need > int(kcp.mtu) {
146                kcp.output(buffer, size)
147                current = currentMs() // time update for a blocking call
148                ptr = buffer
149            }
150            ptr = segment.encode(ptr)
151            copy(ptr, segment.data)
152            ptr = ptr[len(segment.data):]
153            if segment.xmit >= kcp.dead_link {
154                kcp.state = 0xFFFFFFFF
155            }
156        }
157    }
158    // flash remain segments
159    size := len(buffer) - len(ptr)
160    if size > 0 {
161        kcp.output(buffer, size)
162    }
163    // counter updates
164    sum := lostSegs
165    if lostSegs > 0 {
166        atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
167    }
168    if fastRetransSegs > 0 {
169        atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
170        sum += fastRetransSegs
171    }
172    if earlyRetransSegs > 0 {
173        atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
174        sum += earlyRetransSegs
175    }
176    if sum > 0 {
177        atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
178    }
179    // update ssthresh
180    // rate halving, https://tools.ietf.org/html/rfc6937
181    if change > 0 {
182        inflight := kcp.snd_nxt - kcp.snd_una
183        kcp.ssthresh = inflight / 2
184        if kcp.ssthresh < IKCP_THRESH_MIN {
185            kcp.ssthresh = IKCP_THRESH_MIN
186        }
187        kcp.cwnd = kcp.ssthresh + resent
188        kcp.incr = kcp.cwnd * kcp.mss
189    }
190    // congestion control, https://tools.ietf.org/html/rfc5681
191    if lost > 0 {
192        kcp.ssthresh = cwnd / 2
193        if kcp.ssthresh < IKCP_THRESH_MIN {
194            kcp.ssthresh = IKCP_THRESH_MIN
195        }
196        kcp.cwnd = 1
197        kcp.incr = kcp.mss
198    }
199    if kcp.cwnd < 1 {
200        kcp.cwnd = 1
201        kcp.incr = kcp.mss
202    }
203}

flush函数非常的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly,意思就是只发送ack,如果ackOnly为true的话,该函数只遍历ack列表,然后发送,就完事了。 如果不是,也会发送真实数据。 在发送数据前先进行windSize探测,如果开启了拥塞控制nc=0,则每次发送前检测服务端的winsize,如果服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的winsize进行数据发送。
接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:
1. 如果这个段数据首次发送,则直接发送数据。 2. 如果这个段数据的当前时间大于它自身重发的时间,也就是RTO,则重传消息。 3. 如果这个段数据的ack丢失累计超过resent次数,则重传,也就是快速重传机制。这个resent参数由resend参数决定。 4. 如果这个段数据的ack有丢失且没有新的数据段,则触发ER,ER相关信息ER

最后通过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go的:

 1func (s *UDPSession) output(buf []byte) {
2    var ecc [][]byte
3    // extend buf's header space
4    ext := buf
5    if s.headerSize > 0 {
6        ext = s.ext[:s.headerSize+len(buf)]
7        copy(ext[s.headerSize:], buf)
8    }
9    // FEC stage
10    if s.fecEncoder != nil {
11        ecc = s.fecEncoder.Encode(ext)
12    }
13    // encryption stage
14    if s.block != nil {
15        io.ReadFull(rand.Reader, ext[:nonceSize])
16        checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
17        binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
18        s.block.Encrypt(ext, ext)
19        if ecc != nil {
20            for k := range ecc {
21                io.ReadFull(rand.Reader, ecc[k][:nonceSize])
22                checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
23                binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
24                s.block.Encrypt(ecc[k], ecc[k])
25            }
26        }
27    }
28    // WriteTo kernel
29    nbytes := 0
30    npkts := 0
31    // if mrand.Intn(100) < 50 {
32    for i := 0; i < s.dup+1; i++ {
33        if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
34            nbytes += n
35            npkts++
36        }
37    }
38    // }
39    if ecc != nil {
40        for k := range ecc {
41            if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
42                nbytes += n
43                npkts++
44            }
45        }
46    }
47    atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
48    atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
49}

output函数才是真正的将数据写入内核中,在写入之前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的hello就不是和原来的hello一样了,至少多了几个字节。 fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。

kcp的任务调度器

其实这里任务调度器是一个很简单的实现,用一个全局变量updater来管理session,代码文件为updater.go。其中最主要的函数

 1func (h *updateHeap) updateTask() {
2    var timer <-chan time.Time
3    for {
4        select {
5        case <-timer:
6        case <-h.chWakeUp:
7        }
8        h.mu.Lock()
9        hlen := h.Len()
10        now := time.Now()
11        if hlen > 0 && now.After(h.entries[0].ts) {
12            for i := 0; i < hlen; i++ {
13                entry := heap.Pop(h).(entry)
14                if now.After(entry.ts) {
15                    entry.ts = now.Add(entry.s.update())
16                    heap.Push(h, entry)
17                } else {
18                    heap.Push(h, entry)
19                    break
20                }
21            }
22        }
23        if hlen > 0 {
24            timer = time.After(h.entries[0].ts.Sub(now))
25        }
26        h.mu.Unlock()
27    }
28}

任务调度器实现了一个堆结构,每当有新的连接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。

总结

这里简单介绍了kcp的整体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是需要返回ack的,而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。

github:https://github.com/Golangltd/kcp-go


社区例子:

github:https://github.com/Golangltd/leafltd/tree/master/src/LollipopGoKCP


总目录:github:https://github.com/Golangltd



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK