KCP-GO源码解析
source link: http://www.10tiao.com/html/415/201806/2649839102/1.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
社区零食大礼包限量发售:
(豆干,牛肉丝,创意福袋等四川特产)
KCP-GO源码解析
概念
ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.
RTO:Retransmission TimeOut
FEC:Forward Error Correction
kcp简介
kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcp
kcp-go是用go实现了kcp协议的一个库,其实kcp类似tcp,协议的实现也很多参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。
kcp和tcp一样,也分客户端和监听端。
1+-+-+-+-+-+ +-+-+-+-+-+
2 | Client | | Server |
3 +-+-+-+-+-+ +-+-+-+-+-+
4 |------ kcp data ------>|
5 |<----- kcp data -------|
kcp协议
layer model
1+----------------------+
2| Session |
3+----------------------+
4| KCP(ARQ) |
5+----------------------+
6| FEC(OPTIONAL) |
7+----------------------+
8| CRYPTO(OPTIONAL)|
9+----------------------+
10| UDP(Packet) |
11+----------------------+
KCP header
KCP Header Format
1 4 1 1 2 (Byte)
2+---+---+---+---+---+---+---+---+
3| conv |cmd|frg| wnd |
4+---+---+---+---+---+---+---+---+
5| ts | sn |
6+---+---+---+---+---+---+---+---+
7| una | len |
8+---+---+---+---+---+---+---+---+
9| |
10+ DATA +
11| |
12+---+---+---+---+---+---+---+---+
13
代码结构
1src/vendor/github.com/xtaci/kcp-go/
2├── LICENSE
3├── README.md
4├── crypt.go 加解密实现
5├── crypt_test.go
6├── donate.png
7├── fec.go 向前纠错实现
8├── frame.png
9├── kcp-go.png
10├── kcp.go kcp协议实现
11├── kcp_test.go
12├── sess.go 会话管理实现
13├── sess_test.go
14├── snmp.go 数据统计实现
15├── updater.go 任务调度实现
16├── xor.go xor封装
17└── xor_test.go
着重研究两个文件kcp.go
和sess.go
kcp浅析
kcp是基于udp实现的,所有udp的实现这里不做介绍,kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。
kcp client整体函数流
和tcp一样,kcp要连接服务端需要先拨号,但是和tcp有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而tcp在这里需要三次握手。
1DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
2 V
3net.DialUDP("udp", nil, udpaddr)
4 V
5NewConn()
6 V
7newUDPSession() {初始化UDPSession}
8 V
9NewKCP() {初始化kcp}
10 V
11updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
12 V
13go sess.readLoop()
14 V
15go s.receiver(chPacket)
16 V
17s.kcpInput(data)
18 V
19s.fecDecoder.decodeBytes(data)
20 V
21s.kcp.Input(data, true, s.ackNoDelay)
22 V
23kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
24 V
25notifyReadEvent()
26
客户端大体的流程如上面所示,先Dial
,建立udp连接,将这个连接封装成一个会话,然后启动一个go程,接收udp的消息。
kcp server整体函数流
1ListenWithOptions()
2 V
3net.ListenUDP()
4 V
5ServerConn()
6 V
7newFECDecoder()
8 V
9go l.monitor() {从chPacket接收udp数据,写入kcp}
10 V
11go l.receiver(chPacket) {从upd接收数据,并入队列}
12 V
13newUDPSession()
14 V
15updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
16 V
17s.kcpInput(data)`
18 V
19s.fecDecoder.decodeBytes(data)
20 V
21s.kcp.Input(data, true, s.ackNoDelay)
22 V
23kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
24 V
25notifyReadEvent()
服务端的大体流程如上图所示,先Listen
,启动udp监听,接着用一个go程监控udp的数据包,负责将不同session的数据写入不同的udp连接,然后解析封装将数据交给上层。
kcp 数据流详细解析
不管是kcp的客户端还是服务端,他们都有io行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。
kcp client 发送消息
1s.Write(b []byte)
2 V
3s.kcp.WaitSnd() {}
4 V
5s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
6 V
7s.kcp.flush(false) [flush data to output] {
8 if writeDelay==true {
9 flush
10 }else{
11 每隔`interval`时间flush一次
12 }
13}
14 V
15kcp.output(buffer, size)
16 V
17s.output(buf)
18 V
19s.conn.WriteTo(ext, s.remote)
20 V
21s.conn..Conn.WriteTo(buf)
读写都是在sess.go
文件中实现的,Write方法:
1// Write implements net.Conn
2func (s *UDPSession) Write(b []byte) (n int, err error) {
3 for {
4 ...
5 // api flow control
6 if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
7 n = len(b)
8 for {
9 if len(b) <= int(s.kcp.mss) {
10 s.kcp.Send(b)
11 break
12 } else {
13 s.kcp.Send(b[:s.kcp.mss])
14 b = b[s.kcp.mss:]
15 }
16 }
17 if !s.writeDelay {
18 s.kcp.flush(false)
19 }
20 s.mu.Unlock()
21 atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
22 return n, nil
23 }
24 ...
25 // wait for write event or timeout
26 select {
27 case <-s.chWriteEvent:
28 case <-c:
29 case <-s.die:
30 }
31 if timeout != nil {
32 timeout.Stop()
33 }
34 }
35}
假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send(“hello”),而Send函数实现根据mss的值对数据分段,当然这里的发送的hello,长度太短,只分了一个段,并把它们插入发送的队列里。
1func (kcp *KCP) Send(buffer []byte) int {
2 ...
3 for i := 0; i < count; i++ {
4 var size int
5 if len(buffer) > int(kcp.mss) {
6 size = int(kcp.mss)
7 } else {
8 size = len(buffer)
9 }
10 seg := kcp.newSegment(size)
11 copy(seg.data, buffer[:size])
12 if kcp.stream == 0 { // message mode
13 seg.frg = uint8(count - i - 1)
14 } else { // stream mode
15 seg.frg = 0
16 }
17 kcp.snd_queue = append(kcp.snd_queue, seg)
18 buffer = buffer[size:]
19 }
20 return 0
21}
接着判断参数writeDelay
,如果参数设置为false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由flush函数实现的。
1// flush pending data
2func (kcp *KCP) flush(ackOnly bool) {
3 var seg Segment
4 seg.conv = kcp.conv
5 seg.cmd = IKCP_CMD_ACK
6 seg.wnd = kcp.wnd_unused()
7 seg.una = kcp.rcv_nxt
8 buffer := kcp.buffer
9 // flush acknowledges
10 ptr := buffer
11 for i, ack := range kcp.acklist {
12 size := len(buffer) - len(ptr)
13 if size+IKCP_OVERHEAD > int(kcp.mtu) {
14 kcp.output(buffer, size)
15 ptr = buffer
16 }
17 // filter jitters caused by bufferbloat
18 if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
19 seg.sn, seg.ts = ack.sn, ack.ts
20 ptr = seg.encode(ptr)
21 }
22 }
23 kcp.acklist = kcp.acklist[0:0]
24 if ackOnly { // flash remain ack segments
25 size := len(buffer) - len(ptr)
26 if size > 0 {
27 kcp.output(buffer, size)
28 }
29 return
30 }
31 // probe window size (if remote window size equals zero)
32 if kcp.rmt_wnd == 0 {
33 current := currentMs()
34 if kcp.probe_wait == 0 {
35 kcp.probe_wait = IKCP_PROBE_INIT
36 kcp.ts_probe = current + kcp.probe_wait
37 } else {
38 if _itimediff(current, kcp.ts_probe) >= 0 {
39 if kcp.probe_wait < IKCP_PROBE_INIT {
40 kcp.probe_wait = IKCP_PROBE_INIT
41 }
42 kcp.probe_wait += kcp.probe_wait / 2
43 if kcp.probe_wait > IKCP_PROBE_LIMIT {
44 kcp.probe_wait = IKCP_PROBE_LIMIT
45 }
46 kcp.ts_probe = current + kcp.probe_wait
47 kcp.probe |= IKCP_ASK_SEND
48 }
49 }
50 } else {
51 kcp.ts_probe = 0
52 kcp.probe_wait = 0
53 }
54 // flush window probing commands
55 if (kcp.probe & IKCP_ASK_SEND) != 0 {
56 seg.cmd = IKCP_CMD_WASK
57 size := len(buffer) - len(ptr)
58 if size+IKCP_OVERHEAD > int(kcp.mtu) {
59 kcp.output(buffer, size)
60 ptr = buffer
61 }
62 ptr = seg.encode(ptr)
63 }
64 // flush window probing commands
65 if (kcp.probe & IKCP_ASK_TELL) != 0 {
66 seg.cmd = IKCP_CMD_WINS
67 size := len(buffer) - len(ptr)
68 if size+IKCP_OVERHEAD > int(kcp.mtu) {
69 kcp.output(buffer, size)
70 ptr = buffer
71 }
72 ptr = seg.encode(ptr)
73 }
74 kcp.probe = 0
75 // calculate window size
76 cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
77 if kcp.nocwnd == 0 {
78 cwnd = _imin_(kcp.cwnd, cwnd)
79 }
80 // sliding window, controlled by snd_nxt && sna_una+cwnd
81 newSegsCount := 0
82 for k := range kcp.snd_queue {
83 if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
84 break
85 }
86 newseg := kcp.snd_queue[k]
87 newseg.conv = kcp.conv
88 newseg.cmd = IKCP_CMD_PUSH
89 newseg.sn = kcp.snd_nxt
90 kcp.snd_buf = append(kcp.snd_buf, newseg)
91 kcp.snd_nxt++
92 newSegsCount++
93 kcp.snd_queue[k].data = nil
94 }
95 if newSegsCount > 0 {
96 kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
97 }
98 // calculate resent
99 resent := uint32(kcp.fastresend)
100 if kcp.fastresend <= 0 {
101 resent = 0xffffffff
102 }
103 // check for retransmissions
104 current := currentMs()
105 var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
106 for k := range kcp.snd_buf {
107 segment := &kcp.snd_buf[k]
108 needsend := false
109 if segment.xmit == 0 { // initial transmit
110 needsend = true
111 segment.rto = kcp.rx_rto
112 segment.resendts = current + segment.rto
113 } else if _itimediff(current, segment.resendts) >= 0 { // RTO
114 needsend = true
115 if kcp.nodelay == 0 {
116 segment.rto += kcp.rx_rto
117 } else {
118 segment.rto += kcp.rx_rto / 2
119 }
120 segment.resendts = current + segment.rto
121 lost++
122 lostSegs++
123 } else if segment.fastack >= resent { // fast retransmit
124 needsend = true
125 segment.fastack = 0
126 segment.rto = kcp.rx_rto
127 segment.resendts = current + segment.rto
128 change++
129 fastRetransSegs++
130 } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
131 needsend = true
132 segment.fastack = 0
133 segment.rto = kcp.rx_rto
134 segment.resendts = current + segment.rto
135 change++
136 earlyRetransSegs++
137 }
138 if needsend {
139 segment.xmit++
140 segment.ts = current
141 segment.wnd = seg.wnd
142 segment.una = seg.una
143 size := len(buffer) - len(ptr)
144 need := IKCP_OVERHEAD + len(segment.data)
145 if size+need > int(kcp.mtu) {
146 kcp.output(buffer, size)
147 current = currentMs() // time update for a blocking call
148 ptr = buffer
149 }
150 ptr = segment.encode(ptr)
151 copy(ptr, segment.data)
152 ptr = ptr[len(segment.data):]
153 if segment.xmit >= kcp.dead_link {
154 kcp.state = 0xFFFFFFFF
155 }
156 }
157 }
158 // flash remain segments
159 size := len(buffer) - len(ptr)
160 if size > 0 {
161 kcp.output(buffer, size)
162 }
163 // counter updates
164 sum := lostSegs
165 if lostSegs > 0 {
166 atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
167 }
168 if fastRetransSegs > 0 {
169 atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
170 sum += fastRetransSegs
171 }
172 if earlyRetransSegs > 0 {
173 atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
174 sum += earlyRetransSegs
175 }
176 if sum > 0 {
177 atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
178 }
179 // update ssthresh
180 // rate halving, https://tools.ietf.org/html/rfc6937
181 if change > 0 {
182 inflight := kcp.snd_nxt - kcp.snd_una
183 kcp.ssthresh = inflight / 2
184 if kcp.ssthresh < IKCP_THRESH_MIN {
185 kcp.ssthresh = IKCP_THRESH_MIN
186 }
187 kcp.cwnd = kcp.ssthresh + resent
188 kcp.incr = kcp.cwnd * kcp.mss
189 }
190 // congestion control, https://tools.ietf.org/html/rfc5681
191 if lost > 0 {
192 kcp.ssthresh = cwnd / 2
193 if kcp.ssthresh < IKCP_THRESH_MIN {
194 kcp.ssthresh = IKCP_THRESH_MIN
195 }
196 kcp.cwnd = 1
197 kcp.incr = kcp.mss
198 }
199 if kcp.cwnd < 1 {
200 kcp.cwnd = 1
201 kcp.incr = kcp.mss
202 }
203}
flush函数非常的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly
,意思就是只发送ack,如果ackOnly
为true的话,该函数只遍历ack列表,然后发送,就完事了。 如果不是,也会发送真实数据。 在发送数据前先进行windSize探测,如果开启了拥塞控制nc=0
,则每次发送前检测服务端的winsize,如果服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的winsize进行数据发送。
接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:
1. 如果这个段数据首次发送,则直接发送数据。 2. 如果这个段数据的当前时间大于它自身重发的时间,也就是RTO,则重传消息。 3. 如果这个段数据的ack丢失累计超过resent次数,则重传,也就是快速重传机制。这个resent参数由resend
参数决定。 4. 如果这个段数据的ack有丢失且没有新的数据段,则触发ER,ER相关信息ER
最后通过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go
的:
1func (s *UDPSession) output(buf []byte) {
2 var ecc [][]byte
3 // extend buf's header space
4 ext := buf
5 if s.headerSize > 0 {
6 ext = s.ext[:s.headerSize+len(buf)]
7 copy(ext[s.headerSize:], buf)
8 }
9 // FEC stage
10 if s.fecEncoder != nil {
11 ecc = s.fecEncoder.Encode(ext)
12 }
13 // encryption stage
14 if s.block != nil {
15 io.ReadFull(rand.Reader, ext[:nonceSize])
16 checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
17 binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
18 s.block.Encrypt(ext, ext)
19 if ecc != nil {
20 for k := range ecc {
21 io.ReadFull(rand.Reader, ecc[k][:nonceSize])
22 checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
23 binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
24 s.block.Encrypt(ecc[k], ecc[k])
25 }
26 }
27 }
28 // WriteTo kernel
29 nbytes := 0
30 npkts := 0
31 // if mrand.Intn(100) < 50 {
32 for i := 0; i < s.dup+1; i++ {
33 if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
34 nbytes += n
35 npkts++
36 }
37 }
38 // }
39 if ecc != nil {
40 for k := range ecc {
41 if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
42 nbytes += n
43 npkts++
44 }
45 }
46 }
47 atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
48 atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
49}
output函数才是真正的将数据写入内核中,在写入之前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的hello就不是和原来的hello一样了,至少多了几个字节。 fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShards
和parityShards
,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。
kcp的任务调度器
其实这里任务调度器是一个很简单的实现,用一个全局变量updater
来管理session,代码文件为updater.go
。其中最主要的函数
1func (h *updateHeap) updateTask() {
2 var timer <-chan time.Time
3 for {
4 select {
5 case <-timer:
6 case <-h.chWakeUp:
7 }
8 h.mu.Lock()
9 hlen := h.Len()
10 now := time.Now()
11 if hlen > 0 && now.After(h.entries[0].ts) {
12 for i := 0; i < hlen; i++ {
13 entry := heap.Pop(h).(entry)
14 if now.After(entry.ts) {
15 entry.ts = now.Add(entry.s.update())
16 heap.Push(h, entry)
17 } else {
18 heap.Push(h, entry)
19 break
20 }
21 }
22 }
23 if hlen > 0 {
24 timer = time.After(h.entries[0].ts.Sub(now))
25 }
26 h.mu.Unlock()
27 }
28}
任务调度器实现了一个堆结构,每当有新的连接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,得到entry
然后执行entry.s.update()
。而entry.s.update()
会执行s.kcp.flush(false)
来发送数据。
总结
这里简单介绍了kcp的整体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是需要返回ack的,而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。
github:https://github.com/Golangltd/kcp-go
社区例子:
github:https://github.com/Golangltd/leafltd/tree/master/src/LollipopGoKCP
总目录:github:https://github.com/Golangltd
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK