7

实现Watch的Server Push与Client Poll方式

 1 year ago
source link: https://ninokop.github.io/2018/07/25/watch-push-or-pull/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

ServerPush & ClientPull

前两天总结了一下HTTP2相关的发展过程,其中最重要的两点是:实现了服务端Push和多路复用。本篇结合目前见过的一些实现Push&Pull的实例,总结一下数据交互的实现方式:

  1. etcd v2里的长轮询 long polling的方式
  2. k8s-apiserver的stream的方式
  3. service center和config center的websocket的方式
  4. gRPC也就是http2的server push方式

long polling

由于http1.x没有服务端push的机制,为了watch服务端的数据变化,最简单的办法当然是客户端去pull:客户端每隔定长时间去服务端拉数据同步,无论有没有服务端有没有数据变化。但是必然存在通知不及时和大量无效的轮询的问题。long polling就是在这个polling的基础上的优化,当客户端发起long polling时,如果服务端没有相关数据,会hold住请求,直到服务端有数据要发或者超时才会返回。

client

etcdv2是个比较典型的long polling的例子。下面是客户端keysAPI的代码,它通过Watcher接口返回一个实现了Next方法的实例,客户端通过循环调用Next获得所有服务端事件。

Watcher(key string, opts *WatcherOptions) Watcher

Next方法里client只是发了标记为wait的请求,通过统一的transport发到服务端。nexWait是用来生成请求体的,请求体的方法为GET,只是params带了wait字段,让服务端识别。

func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
for {
httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
if err != nil {
return nil, err
}
resp, err := unmarshalHTTPResponse(httpresp.StatusCode,
httpresp.Header, body)
if err != nil {
if err == ErrEmptyBody {
continue
}
return nil, err
}
hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
return resp, nil
}
}

server

github.com/etcd/etcdserver/api/v2http/client.go

对应etcdv2的服务端keysHandler的处理过程是:调用etcdServer的Do方法,根据v2apistore的Get返回event或者watcher。如果请求中有wait字段,那么会返回一个kvStrore的watcher。

func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp, err := h.server.Do(ctx, rr)
switch {
case resp.Watcher != nil:
ctx, cancel := context.WithTimeout(
context.Background(), defaultWatchTimeout)
defer cancel()
handleKeyWatch(ctx, w, resp, rr.Stream)
...
}
}

在处理watch请求时,通常都是使用context设置超时时间,但是这里defaultWatchTimeout设置的是maxInt64,所以watch的超时是客户端决定的,当超时发生close连接,server通过CloseNotifier得到通知并放弃处理。

CloseNotifier Flusher

服务端首先把header flush到连接上,以免客户端等待header超时。之后等待内部kvstore的chan上有事件准备好,并发送。stream这个参数在etcdv2这个场景下为false,也就是long pollling获得数据即可以返回。

func handleKeyWatch(ctx context.Context, w http.ResponseWriter,
resp etcdserver.Response, stream bool) {
wa := resp.Watcher
defer wa.Remove()
ech := wa.EventChan()

w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
w.Header().Set("X-Raft-Index", fmt.Sprint(resp.Index))
w.Header().Set("X-Raft-Term", fmt.Sprint(resp.Term))
w.WriteHeader(http.StatusOK)
// Ensure headers are flushed early, in case of long polling
w.(http.Flusher).Flush()
for {
select {
case <-nch: // CloseNotifier, Client closed connection. Nothing to do.
return
case <-ctx.Done(): // Timed out.
return
case ev, ok := <-ech:
if !ok {
return
}
ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
if err := json.NewEncoder(w).Encode(ev); err != nil {
plog.Warningf("error writing event (%v)", err)
return
}
if !stream {
return
}
w.(http.Flusher).Flush()
}
}
}

streaming

stream是要在同一个连接上,分多个部分发送HTTP响应。一般HTTP的响应中发送的数据是整个发送,并且通过Content-Length消息头字段表示数据的长度。如果分多块传输,需要另外的编码方式,于是Chunked编码(分块传输编码)引入到了HTTP1.1协议中。它允许HTTP服务端动态生成内容,消息体由数量未定的块组成,并且以最后一个大小为0的块结束。

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
25
This is the data in the first chunk
1C
and this is the second one
3
con
8
sequence
0

server & serveWatch

k8s的服务端watch接口是通过etcd的watch接口实现的长连接方式。最终注册到go-restful的Watch路由,对应GET方法和ListResource这个handlerFunc。

k8s.io/apiserver/pkg/endpoints

func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope,
forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
if opts.Watch || forceWatch {
if rw == nil {
return
}
timeout := time.Duration(0)
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
serveWatch(watcher, scope, req, w, timeout)
return
}
}
}

其中watcher是内部storage通过etcd的watch接口封装的返回事件的chan。serveWatch就是在处理这个内部chan,并把chan上发生的事件通过chunk编码发给客户端。这个循环可能因为客户端close连接或超时而结束。

func serveWatch(watcher watch.Interface, scope RequestScope,
req *http.Request, w http.ResponseWriter, timeout time.Duration) {
// negotiate for the stream serializer ...
server := &WatchServer{
Watching: watcher,
Scope: scope,
UseTextFraming: useTextFraming,
MediaType: mediaType + ";stream=watch",
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) {},
TimeoutFactory: &realTimeoutFactory{timeout},
}
server.ServeHTTP(w, req)
}
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// if isWebSocketRequest
cn, ok1 := w.(http.CloseNotifier)
flusher, ok2 := w.(http.Flusher)
if !ok1 || !ok2 {
return
}

framer := s.Framer.NewFrameWriter(w)
e := streaming.NewEncoder(framer, s.Encoder)
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()

w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

buf := &bytes.Buffer{}
ch := s.Watching.ResultChan()
for {
select {
case <-cn.CloseNotify():
return
case <-timeoutCh:
return
case event, ok := <-ch:
if !ok {
return
}
obj := event.Object
s.EmbeddedEncoder.Encode(obj, buf)

unknown.Raw = buf.Bytes()
event.Object = &unknown
metav1.Convert_versioned_InternalEvent_to_versioned_Event(
metav1.InternalEvent(event),
&metav1.WatchEvent{}, nil)

e.Encode(outEvent)
if len(ch) == 0 {
flusher.Flush()
}
buf.Reset()
}
}
}

registry & storage

这一节本来跟stream没有关系,但它是对etcd的watch的封装所以还是记一下。

上面内部watcher是rest.StandardStorage接口,它是以下所有接口的组合。它的实现registry.Store提供了N个函数挂载点,对所有资源类型提供了统一的实现。比如每种资源都实现了NewFunc和KeyFunc,Store统一实现Creater接口实现对每种资源的创建,并最终调用storage包面向etcd的接口实现到后端数据库的持久化。

type StandardStorage interface {
Getter
Lister
CreaterUpdater
GracefulDeleter
CollectionDeleter
Watcher
}

比如store封装的watch接口最终到storage里面向etcd的watch接口。

func (e *Store) Watch(ctx context.Context, 
options *metainternalversion.ListOptions) (watch.Interface, error) {
predicate := e.PredicateFunc(labels.Everything(), fields.Everything())
return e.WatchPredicate(ctx, predicate, options.ResourceVersion)
}
func (e *Store) WatchPredicate(ctx context.Context,
p storage.SelectionPredicate, resourceVersion string)
(watch.Interface, error) {
...
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
return w, nil
}

etcdHelper这个包封装了etcdv2的接口,最终是通过循环处理Watcher.Next来实现内部事件的产生。这个过程还涉及到storage的watch cache。详细过程下一次写watch cache再写吧。

func (h *etcdHelper) WatchList(ctx context.Context, key string, 
resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
key = path.Join(h.pathPrefix, key)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), pred,
h.codec, h.versioner, nil, h.transformer, h)
go w.etcdWatch(ctx, h.etcdKeysAPI, key, resourceVersion)
return w, nil
}
func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, 
key string, resourceVersion uint64) {
watcher := client.Watcher(key, &opts)
w.ctx, w.cancel = context.WithCancel(ctx)
for {
resp, err := watcher.Next(w.ctx)
w.etcdIncoming <- resp
}
}

client

客户端通过Do获取到服务端的第一个Header响应。最后通过StreamWatcher封装好watch的ResultChan接口,它从连接上decoder反序列化数据由streamWatcher封装好返回。

func (r *Request) Watch() (watch.Interface, error) {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
req.Header = r.headers
client := r.client

resp, err := client.Do(req)
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
return nil, fmt.Errorf("got status: %v", resp.StatusCode)
}
framer := r.serializers.Framer.NewFrameReader(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
return watch.NewStreamWatcher(
restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil
}

这个接口最常见的地方是在reflectorlistWatch当中, 在watch循环中通常客户端会指定超时时间5分钟,好让服务端知道什么时候超时结束。

for {
timeoutSeconds:=int64(minWatchTimeout.Seconds()*(rand.Float64()+1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
}
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
case io.EOF:// watch closed normally
case io.ErrUnexpectedEOF:
}
return nil
}
r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)
}

web-socket

上面的两种方式,其实都非常浪费资源。长轮询必须不停连接,长连接必须保持HTTP连接始终打开。websocket就是另一种解决服务端push的方法。简单来说,它是建立在TCP协议之上的ws协议,它跟HTTP协议有良好的兼容性,数据格式比较轻量,可以发送文本,也可以二进制。

WebSocket复用了HTTP一部分握手过程。客户端通过HTTP请求与WebSocket服务端协商要求升级协议。协议升级完成后,后续的数据交换则遵照WebSocket的协议。以下是客户端发出的请求。

GET / HTTP/1.1
Host: localhost:8080
Origin: http://127.0.0.1:3000
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==

服务端回应101表示切换协议。具体协议参考WebSocket Protocol

HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=

连接建立并协议升级后,双方的通信进入web-socket协议,它有以下特点:

  1. 是真正的全双工方式,可以互相主动请求。
  2. 在已经建立好的TCP连接中,交换数据不需要再发送和解析HTTP header。
  3. 可以利用协议头的sec-websocket-key来进行连接复用,不同的URL可以复用同一个连接。

upgrade

web-socket的服务端首先要完成协商协议升级的事情,且后续Handler的处理不用再经过httpServer的请求解析,然后路由的部分,仍然再已建立的httpconn上完成后续的信息交互。

func echo(w http.ResponseWriter, r *http.Request) {
c, err := websocket.Upgrader{}.Upgrade(w, r, nil)
defer c.Close()
for {
mt, message, err := c.ReadMessage()
err = c.WriteMessage(mt, message)
}
}

协议升级其实只是校验方法是否为GET,请求头是否有对应的升级标记。从http的Hijacker中获取原始netConn,并且回复server端的101协议升级信息。

func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, 
responseHeader http.Header) (*Conn, error) {
...

challengeKey := r.Header.Get("Sec-Websocket-Key")
h, ok := w.(http.Hijacker)
netConn, rw, err = h.Hijack()
br = rw.Reader

c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize)
c.subprotocol = subprotocol

p := c.writeBuf[:0]
p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: \
websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
p = append(p, computeAcceptKey(challengeKey)...)
p = append(p, "\r\n"...)
if c.subprotocol != "" {
p = append(p, "Sec-Websocket-Protocol: "...)
p = append(p, c.subprotocol...)
p = append(p, "\r\n"...)
}

// Clear deadlines set by HTTP server.
netConn.SetDeadline(time.Time{})
if u.HandshakeTimeout > 0 {
netConn.SetWriteDeadline(time.Now().Add(u.HandshakeTimeout))
}

if _, err = netConn.Write(p); err != nil {
netConn.Close()
return nil, err
}
return c, nil
}

keepalive

websocket的连接无法感知对方是否关闭了连接,所以协议层设计了Ping-Pang来做keepalive。每隔7s秒发一次PingMessage,收到PongMessage就更新lastResponseTime,如果超过15s都没有PongMessage的回应,则断开websocket连接。

func (dynHandler *DynamicConfigHandler) startDynamicConfigHandler() error {
if dynHandler != nil && dynHandler.wsDialer != nil {
dynHandler.wsConnection, _, err = dynHandler.wsDialer.Dial(url, nil)
keepAlive(dynHandler.wsConnection, 15*time.Second)
go func() error {
for {
messageType, message, err := dynHandler.wsConnection.ReadMessage()
if err != nil {
break
}
if messageType == websocket.TextMessage {
dynHandler.EventHandler.OnReceive(message)
}
}
return dynHandler.wsConnection.Close()
}()
}
return nil
}
func keepAlive(c *websocket.Conn, timeout time.Duration) {
lastResponse := time.Now()
c.SetPongHandler(func(msg string) error {
lastResponse = time.Now()
return nil
})
go func() {
for {
err := c.WriteMessage(websocket.PingMessage, []byte("keepalive"))
if err != nil {
return
}
time.Sleep(timeout / 2)
if time.Now().Sub(lastResponse) > timeout {
c.Close()
return
}
}
}()
}

http2

http2的writer实现了Pusher接口,通过push可以把消息发给http2conn内部维护的wantStartPushCh,在conn的serve过程中分发这个msg开始startPush。最终是发了pushPromiseFrame,具体的过程下次写http2实现的时候再说吧。

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if push, ok := w.(http.Pusher); ok {
if err := pusher.Push("/app.js", nil); err != nil {
log.Printf("Failed to push: %v", err)
}
}
})

这里只解释一个问题:为什么http2没有改变语义、方法、状态码和URI、以及首部字段,它也没有使用分块传输编码,它如何实现push和多路复用,改进了传输性能。

应用层和传输层之间增加了一个二进制分帧层。

frame

HTTP2将要传输的信息分割成更小的消息和帧,并对它们采用二进制格式的编码。原来的HTTP header和HTTP body都以Frame Payload存在。每个Frame的FrameHeader描述了这个帧的长度,类型等信息。

  1. Length: Frame Payload 的长度, Frame Header 的长度是 9 字节(Length + Type + Flags + R + Stream Identifier = 72 bit)。
  2. Type: Frame Payload 存储的数据是属于 HTTP Header 还是 HTTP Body
  3. Flags: 共 8 位, 每位都起标记作用。每种不同的 Frame Type 都有不同的 Frame Flags。例如发送最后一个 DATA 类型的 Frame 时,就会将 Flags 最后一位设置 1(flags &= 0x01),表示 END_STREAM,说明这个 Frame 是流的最后一个数据包。
  4. Stream Identifier: 流 ID,当客户端和服务端建立 TCP 链接时,就会先发送一个 Stream ID = 0 的流,用来做些初始化工作。之后客户端和服务端从 1 开始发送请求/响应。
+-----------------------------------------------+
| Length (24) |
+---------------+---------------+---------------+
| Type (8) | Flags (8) |
+-+-------------+---------------+-------------------+
|R| Stream Identifier (31) |
+=+=================================================+
| Frame Payload (0...) ...
+---------------------------------------------------+

理解:HTTP1.x客户端解析读取响应是根据响应头的Content-Len读取body体,然后返回。为了让它流式读取,server端要在头里告诉client 现在要变更编码方式为chunked,之后进行分块传输,直到server端发了大小为0的数据。

HTTP2引入Frame了之后完全改变了原来的编解码方式,整个方式类似很多RPC协议。帧由二进制编码,帧头固定位置的字节描述body长度,就可以读取body体,直到flags遇到END_STREAM。这种方式天然支持服务端在stream上发送数据,不需要通知客户端做什么改变。

其次streamID这个是用来做多路复用的,跟许多RPC协议里的msgID是一个意思。

Reference


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK