6

traefik 设计实现之 http、http2 和 grpc 代理

 1 year ago
source link: https://xiaorui.cc/archives/7300
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

traefik 设计实现之 http、http2 和 grpc 代理

https://doc.traefik.io/traefik/assets/img/traefik-architecture.png

traefik 对于 http、http2、http3 的反向代理,没有专门去造轮子解决拆包及请求转发,而是引用 golang 标准库中 httputil.ReveryProxy 实现反向代理.

另外,还分析社区中其他 golang 实现网关的源码,也多使用 httputil.ReveryProxy 实现.

连接池管理器 RoundTripperManager

RoundTripperManager 作用

RoundTripper 可以理解为一个连接池管理器,在连接池里可以配置连接的读写超时,空闲超时,池子大小,缓冲大小等,一个连接池是可以管理多个后端主机. 像 golang net/http client 标准库,如不专门配置 transport, 那么就使用 http 库包中定义的默认 Transport.

源码位置 net/http/transport.go

func (c *Client) transport() RoundTripper {
    if c.Transport != nil {
        return c.Transport
    }
    return DefaultTransport
}

var DefaultTransport RoundTripper = &Transport{
    DialContext: defaultTransportDialContext(&net.Dialer{
        Timeout:   30 * time.Second,
        KeepAlive: 30 * time.Second,
    }),
    ForceAttemptHTTP2:     true,
    MaxIdleConns:          100,
    IdleConnTimeout:       90 * time.Second,
    TLSHandshakeTimeout:   10 * time.Second,
    ExpectContinueTimeout: 1 * time.Second,
}

那么 RoundTripperManager 顾名思义就是用来管理多个 http.RoundTripper 对象, 通过 createRoundTripper() 创建 http 和 http2 transport 连接池对象,http.Transport 不仅可以配置连接池,读写超时等常规配置,还可以配置 TLS 证书.

另外 http.Transport 可以在 entrypoints 里配置,然后在 services 里使用对应的 entrypoints 的连接池,另外也可以配置专门的 serversTransport 配置项.

https://doc.traefik.io/traefik/routing/services/#serverstransport_1

连接池配置

entrypoints 配置

entryPoints:
  xiaorui-http:
    address: ":8888"
    transport:
      lifeCycle:
        requestAcceptGraceTimeout: 42
        graceTimeOut: 42
      respondingTimeouts:
        readTimeout: 42
        writeTimeout: 42
        idleTimeout: 42

serversTransports 配置

http:
  serversTransports:
    mytransport:
      maxIdleConnsPerHost: 7
      certificates:
        - certFile: foo.crt
          keyFile: bar.crt
      forwardingTimeouts:
        dialTimeout: "1s"
        idleConnTimeout: "1s"

分析 RoundTripperManager 主要代码:

Get

根据 name 获取 http.RoundTripper 连接池, traefik 会把 roundTripperManager 传递给 servcies 的代理层,当进行请求转发时,通过 Get() 获取对应的 roundTripper 进行代理转发.

createRoundTripper

根据配置创建 http.RoundTripper 对象

Update

更新连接池配置,这里可以跟 provider 动态配置发现关联,在启动阶段进行监听,当配置更新时进行 Update 更新配置,更新时为了避免影响当前的请求,所以直接加锁替换原来的连接池,旧的连接池当不再引用时,等待空闲超时来回收连接.

源代码位置: pkg/server/service/roundtripper.go

type RoundTripperManager struct {
    rtLock        sync.RWMutex
    roundTrippers map[string]http.RoundTripper
    configs       map[string]*dynamic.ServersTransport

    spiffeX509Source SpiffeX509Source
}

// 根据 name 获取 http.RundTriper 配置
func (r *RoundTripperManager) Get(name string) (http.RoundTripper, error) {
    r.rtLock.RLock()
    defer r.rtLock.RUnlock()

    if rt, ok := r.roundTrippers[name]; ok {
        return rt, nil
    }

    return nil, fmt.Errorf("servers transport not found %s", name)
}

// 根据配置创建 http.RoundTripper 对象
func (r *RoundTripperManager) createRoundTripper(cfg *dynamic.ServersTransport) (http.RoundTripper, error) {
    dialer := &net.Dialer{
        Timeout:   30 * time.Second,
        KeepAlive: 30 * time.Second,
    }

    transport := &http.Transport{
        DialContext:           dialer.DialContext,
        MaxIdleConnsPerHost:   cfg.MaxIdleConnsPerHost,
        IdleConnTimeout:       90 * time.Second,
        TLSHandshakeTimeout:   10 * time.Second,
        ExpectContinueTimeout: 1 * time.Second,
        ReadBufferSize:        64 * 1024,
        WriteBufferSize:       64 * 1024,
    }

    if cfg.InsecureSkipVerify || len(cfg.RootCAs) > 0 || len(cfg.ServerName) > 0 || len(cfg.Certificates) > 0 || cfg.PeerCertURI != "" {
        transport.TLSClientConfig = &tls.Config{
            ServerName:         cfg.ServerName,
            InsecureSkipVerify: cfg.InsecureSkipVerify,
            RootCAs:            createRootCACertPool(cfg.RootCAs),
            Certificates:       cfg.Certificates.GetCertificates(),
        }
        }
    }

    if cfg.DisableHTTP2 {
        return transport, nil
    }

    return newSmartRoundTripper(transport, cfg.ForwardingTimeouts)
}

// 更新连接池配置,这里可以跟 provider 动态配置发现关联,在启动阶段进行监听,当配置更新时进行 Update 更新配置,更新时为了避免影响当前的请求,所以直接加锁替换原来的连接池,旧的连接池当不再引用时,等待空闲超时来回收连接.
func (r *RoundTripperManager) Update(newConfigs map[string]*dynamic.ServersTransport) {
    r.rtLock.Lock()
    defer r.rtLock.Unlock()

    for configName, config := range r.configs {
        newConfig, ok := newConfigs[configName]
        if !ok {
            delete(r.configs, configName)
            delete(r.roundTrippers, configName)
            continue
        }

        if reflect.DeepEqual(newConfig, config) {
            continue
        }

        var err error
        r.roundTrippers[configName], err = r.createRoundTripper(newConfig)
        if err != nil {
            log.Error().Err(err).Msgf("Could not configure HTTP Transport %s, fallback on default transport", configName)
            r.roundTrippers[configName] = http.DefaultTransport
        }
    }

    for newConfigName, newConfig := range newConfigs {
        if _, ok := r.configs[newConfigName]; ok {
            continue
        }

        var err error
        r.roundTrippers[newConfigName], err = r.createRoundTripper(newConfig)
        if err != nil {
            log.Error().Err(err).Msgf("Could not configure HTTP Transport %s, fallback on default transport", newConfigName)
            r.roundTrippers[newConfigName] = http.DefaultTransport
        }
    }

    r.configs = newConfigs
}

那么 grpc 和 http2 是如何进行转发的 ?

202212041134793.png

grpc 的网络协议也是 http2,http2 的 transport 是通过 newSmartRoundTripper 创建. 本质是 golang 扩展包 golang.org/x/net/http2 里有 http2 的支持.

代码位置: pkg/server/service/smart_roundtripper.go

func newSmartRoundTripper(transport *http.Transport, forwardingTimeouts *dynamic.ForwardingTimeouts) (http.RoundTripper, error) {
    transportHTTP1 := transport.Clone()

    transportHTTP2, err := http2.ConfigureTransports(transport)
    if err != nil {
        return nil, err
    }

    transportH2C := &h2cTransportWrapper{
        Transport: &http2.Transport{
            DialTLS: func(network, addr string, cfg *tls.Config) (net.Conn, error) {
                return net.Dial(network, addr)
            },
            AllowHTTP: true,
        },
    }

    if forwardingTimeouts != nil {
        transportH2C.ReadIdleTimeout = time.Duration(forwardingTimeouts.ReadIdleTimeout)
        transportH2C.PingTimeout = time.Duration(forwardingTimeouts.PingTimeout)
    }

    transport.RegisterProtocol("h2c", transportH2C)

    return &smartRoundTripper{
        http2: transport,
        http:  transportHTTP1,
    }, nil
}

HTTP 和 HTTP2代理

为 services 的主机列表遍历实例化反向代理对象,并生成负载均衡器, 值得注意的是,不管是 loadbalancer 和 reverseproxy 都是 http.Handler 接口,而且 router 也实现了对应的 http.Handler 接口,这样关联到 entryPoints 入口层,会先调用 routers 的 rules ,继而调用 servcies 对应的 loadbalancer,最后根据调度算法找到对应的 proxy 对象完成请求转发.

202212031722503.png

Servcies Manager 管理器

实例化 Manager 对象时,需传入 servcies 配置和 roundTripper 管理器,由 routers 层调用其 BuildHTTP 来进行初始化 service 对应的 loadBalaner 均衡器, 这里 BuildHTTP() 返回的对象是 http.Handler.

pkg/server/service/service.go

func NewManager(configs map[string]*runtime.ServiceInfo, metricsRegistry metrics.Registry, routinePool *safe.Pool, roundTripperManager RoundTripperGetter) *Manager {
    return &Manager{
        routinePool:         routinePool,
        roundTripperManager: roundTripperManager,
        services:            make(map[string]http.Handler),
        configs:             configs,
        healthCheckers:      make(map[string]*healthcheck.ServiceHealthChecker),
    }
}

// BuildHTTP Creates a http.Handler for a service configuration.
func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.Handler, error) {
    serviceName = provider.GetQualifiedName(ctx, serviceName)
    handler, ok := m.services[serviceName]
    if ok {
        return handler, nil
    }

    conf, ok := m.configs[serviceName]
    if !ok {
        return nil, fmt.Errorf("the service %q does not exist", serviceName)
    }

    ...

    var lb http.Handler

    switch {
    case conf.LoadBalancer != nil:
        lb, err = m.getLoadBalancerServiceHandler(ctx, serviceName, conf)
    case conf.Weighted != nil:
        lb, err = m.getWRRServiceHandler(ctx, serviceName, conf.Weighted)
    case conf.Mirroring != nil:
        lb, err = m.getMirrorServiceHandler(ctx, conf.Mirroring)
    ...
    }

    m.services[serviceName] = lb

    return lb, nil
}

构建loadbalancer及反向代理

RoundTripperManager里获取对应的 roundTripper 连接池,遍历后端主机列表,并用原生库 buildSingleHostProxy -> httputil.ReverseProxy 对每个主机生成代理对象,最后使用这些代理对象构建负载均衡器. 不管是负载均衡器及内部的反向代理对象都实现 http.Handler 接口.

pkg/server/service/service.go

func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName string, info *runtime.ServiceInfo) (http.Handler, error) {
    service := info.LoadBalancer

    // 设置 header
    passHostHeader := dynamic.DefaultPassHostHeader
    if service.PassHostHeader != nil {
        passHostHeader = *service.PassHostHeader
    }

    // 从 roundTripper 获取连接池
    roundTripper, err := m.roundTripperManager.Get(service.ServersTransport)
    if err != nil {
        return nil, err
    }

  // 实例化负载均衡器
    lb := wrr.New(service.Sticky, service.HealthCheck != nil)

  // 随机洗牌
    for _, server := range shuffle(service.Servers, m.rand) {
        target, err := url.Parse(server.URL)
        if err != nil {
            return nil, fmt.Errorf("error parsing server URL %s: %w", server.URL, err)
        }

    // 创建反向代理对象
        proxy := buildSingleHostProxy(target, passHostHeader, time.Duration(flushInterval), roundTripper, m.bufferPool)

    // 把对象加入到 lb 里
        lb.Add(proxyName, proxy, nil)
    }

    return lb, nil
}

借助 httputil.ReverseProxy 实现反向代理,只需传入转发目标的 url 和连接池就可以创建代理. 看似简单粗暴,深思又是大道至简. 以前写过 httputil.ReverseProxy 的源码解析,这里就不再复述了.

func buildSingleHostProxy(target *url.URL, passHostHeader bool, flushInterval time.Duration, roundTripper http.RoundTripper, bufferPool httputil.BufferPool) http.Handler {
    return &httputil.ReverseProxy{
        Director:      directorBuilder(target, passHostHeader),
        Transport:     roundTripper,
        FlushInterval: flushInterval,
        BufferPool:    bufferPool,
        ErrorHandler:  errorHandler,
    }
}

在构建 loadbalancer 的时候, 不仅会实例化后端主机的反向代理, 也会对该 lb 创建一个 healthChecker 健康检查器.

type Manager struct {
    ...
    healthCheckers map[string]*healthcheck.ServiceHealthChecker
}

// LaunchHealthCheck launches the health checks.
func (m *Manager) LaunchHealthCheck(ctx context.Context) {
    for serviceName, hc := range m.healthCheckers {
        logger := log.Ctx(ctx).With().Str(logs.ServiceName, serviceName).Logger()
        go hc.Launch(logger.WithContext(ctx))
    }
}

ServiceHealthChecker 代码位置: pkg/healthcheck/healthcheck.go

func (shc *ServiceHealthChecker) Launch(ctx context.Context) {
    ticker := time.NewTicker(shc.interval) // 默认 30s
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return

        case <-ticker.C:
            // 遍历后端主机实例,当发现健康失败时, 更新 lb 状态.
            for proxyName, target := range shc.targets {
                up := true
                if err := shc.executeHealthCheck(ctx, shc.config, target); err != nil {
                    up = false
                }

                shc.balancer.SetStatus(ctx, proxyName, up)

                statusStr := runtime.StatusDown
                if up {
                    statusStr = runtime.StatusUp
                }
                shc.info.UpdateServerStatus(target.String(), statusStr)
            }
        }
    }
}

健康检查支持两种模式:

  • http 模式,判断返回的 code 是否是 200
  • grpc 模式,需要 grpc server 引入 health 包,这样 grpc.client 才可根据 resp.Status 子弹判断是否正常.
func (shc *ServiceHealthChecker) executeHealthCheck(ctx context.Context, config *dynamic.ServerHealthCheck, target *url.URL) error {
    if config.Mode == modeGRPC {
        return shc.checkHealthGRPC(ctx, target)
    }
    return shc.checkHealthHTTP(ctx, target)
}

checkHealthHTTP 和 checkHealthGRPC 实现没什么好说的.


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc
weixin_new.jpg

Golang traefik grpc, traefik http 代理, traefik http2, traefik 健康检查, traefik 源码 Bookmark


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK