37

Go实现简单负载均衡

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzUxMTcwOTM4Mg%3D%3D&%3Bmid=2247486144&%3Bidx=1&%3Bsn=44db0717e590ca07735b42c37f974d37
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

奇技指南

今天小编为大家分享一篇关于Go实现简单的负载均衡器的文章,只是对负载均衡进行了基础的功能实现,有助于对负载均衡的理解。如果有兴趣,也可以以此为基础进行功能扩展,希望能对大家有所帮助。

本文转载自360云计算

负载均衡在Web体系架构中一直是扮演着非常关键的角色。它可以实现在一组后端服务之间进行负载分配,从而增加服务的可扩展性。由于配置了多个后端服务,因此可以提高服务的可用性。在发生故障的时,负载均衡器可以帮我们把请求转发到正常运行的节点上。

在使用过像 Nginx 之类的专业的负载均衡之后,为了加深对负载均衡的原理理解,本次我们使用 Golang 也来实现一个简单的负载均衡。

1

工作原理

均衡器可以使用不同的分配策略来分发请求到后端服务。

例如:

  • 轮询 - 平均的分配负载,假定所有后端服务具有相同的处理能力

  • 加权轮询 - 根据后端服务的处理能力,可以赋予相应权重

  • 最少连接 - 负载分配到活跃连接最少的服务器上

对于我们将要实现的简单负载均衡,我们首先尝试实现这些方式中最简单的一种,即轮询方式。

niqUVzi.png!web

轮询在实现方面非常简单,它以均等的机会让后端服务轮流执行请求任务。

aQBzyiB.png!web

如图所示,请求周期性的轮流转发给后端服务。但是我们不能直接这样简单来实现,需要考虑其他因素。

如果后端服务宕机了怎么办?我们肯定不想把流量转发到这台挂掉的节点上。因此,除非有附加条件,否则不能直接轮流转发负载。我们需要将流量仅路由到已启动并正常运行的后端服务节点上。

2

数据结构

思路梳理完成,现在我们需要一种方法,来跟踪关于后端的所有详细信息。我们需要知道它是否存活,同时还需要跟踪 URL。

可以简单地定义下面的结构来保存后端服务信息。

type Backend struct {

URL *url.URL

Alive bool

mux sync.RWMutex

ReverseProxy *httputil.ReverseProxy

}

同时还需要另外一种方法,来跟踪我们负载均衡中所有的后端服务。为此,我们可以直接使用切片及一个计数器来实现。

type ServerPool struct {

backends []*Backend

current uint64

}

3

ReverseProxy

如我们上面确定,负载均衡的唯一目的就是将流量路由到不同的后端服务,并将结果返回给原始请求端。

Go 的文档中有相关的描述:

ReverseProxy 是一个HTTP处理程序,它接收传入的请求并将其发送到另一台服务器,并将响应代理回客户端。

这正是我们想要的。所以不需要重复造轮子了。我们可以简单的使用它来转发我们的请求。

u, _ := url.Parse("http://localhost:8080")

rp := httputil.NewSingleHostReverseProxy(u)

// initialize your server and add this as handler

http.HandlerFunc(rp.ServeHTTP)

使用 httputil.NewSingleHostReverseProxy(url) 我们可以初始化一个反向代理,它将请求转发给传入的 url。在上面的示例中,所有请求都将转发到 localhost:8080 上,并将结果返回给原始请求端。

如果我们看一下 ServeHTTP 方法签名,会发现它具有 HTTP 处理程序的签名,这就是为什么我们可以将其传递给 http 中的 HandlerFunc 的原因。

对于我们简单的负载均衡实现,我们可以使用后端服务的 URL 来初始化 ReverseProxy,以便 ReverseProxy 将请求发送到URL。

4

选择过程

在下一个轮询选择中,我们需要跳过挂掉的后端服务。这里我们需要一种计数方法。

多个客户端将连接到负载均衡中,并且当每个客户端请求下一个连接时,都有可能会发生竞争情况。所以为了防止这种情况,我们可以使用 mutex 来锁定 ServerPool。但这将是一个过大的杀伤力,我们原本不想锁定 ServerPool。而只想将计数器增加一。

为了满足该要求,理想的解决方案是原子的增加。可以通过 Go 的原子包来实现。

func (s *ServerPool) NextIndex() int {

return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))

}

这里,我们以原子方式将当前值进行加一,并通过修改切片的长度来返回索引。这意味着该值将始终在 0 到切片的长度之间。但是,我们只关心索引,而不是总数。

5

取回一个存活后端

GetNext() 始终返回一个介于 0 和切片长度之间的值。在任何时候,我们都会得到下一个端点,如果它不存在,我们将循环搜索整个切片。

zqEVFzi.png!web

如上图所示,我们想从整个列表的下一个元素开始遍历,这可以简单地通过遍历 next + length 来完成。但是要选择一个索引,我们希望将其限制在切片长度之间。 通过修改可以轻松完成此操作。

通过搜索找到可用后端,将其标记为当前后端。

// GetNextPeer returns next active peer to take a connection

func (s *ServerPool) GetNextPeer() *Backend {

// loop entire backends to find out an Alive backend

next := s.NextIndex()

l := len(s.backends) + next // start from next and move a full cycle

for i := next; i < l; i++ {

idx := i % len(s.backends) // take an index by modding with length

// if we have an alive backend, use it and store if its not the original one

if s.backends[idx].IsAlive() {

if i != next {

atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one

}

return s.backends[idx]

}

}

return nil

}

6

处理竞争

有一个严重的问题,我们的 Backend 数据结构中有个变量 Alive,可以由不同的 goroutine 同时修改或访问。

我们知道,将会有更多的 goroutine 从中读取而不是写入。因此,我们使用RWMutex 来序列化对变量 Alive 的访问。

// SetAlive for this backend

func (b *Backend) SetAlive(alive bool) {

b.mux.Lock()

b.Alive = alive

b.mux.Unlock()

}


// IsAlive returns true when backend is alive

func (b *Backend) IsAlive() (alive bool) {

b.mux.RLock()

alive = b.Alive

b.mux.RUnlock()

return

}

7

请求处理

我们可以实现下面简单的方法来负载均衡请求。

// lb load balances the incoming request

func lb(w http.ResponseWriter, r *http.Request) {

peer := serverPool.GetNextPeer()

if peer != nil {

peer.ReverseProxy.ServeHTTP(w, r)

return

}

http.Error(w, "Service not available", http.StatusServiceUnavailable)

}

方法可以简单地作为 HandlerFunc 传递给 http 服务器。

server := http.Server{

Addr: fmt.Sprintf(":%d", port),

Handler: http.HandlerFunc(lb),

}

8

只路由到健康节点

现在 lb 有一个很严重的问题,我们不知道哪一个后端节点是健康的。所以还需要一个后端服务健康检查。

proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {

log.Printf("[%s] %s\n", serverUrl.Host, e.Error())

retries := GetRetryFromContext(request)

if retries < 3 {

select {

case <-time.After(10 * time.Millisecond):

ctx := context.WithValue(request.Context(), Retry, retries+1)

proxy.ServeHTTP(writer, request.WithContext(ctx))

}

return

}


// after 3 retries, mark this backend as down

serverPool.MarkBackendStatus(serverUrl, false)


// if the same request routing for few attempts with different backends, increase the count

attempts := GetAttemptsFromContext(request)

log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts)

ctx := context.WithValue(request.Context(), Attempts, attempts+1)

lb(writer, request.WithContext(ctx))

}

我们利用闭包设计错误处理程序。它允许我们将外部变量捕获到方法中。它将检查现有的重试计数,如果小于3,我们将再次向相同的后端发送相同的请求。

每次重试失败后,我们将此后端标记为下线。

我们可以简单地从请求中获取尝试次数,如果超过最大次数,则消除请求。

// lb load balances the incoming request

func lb(w http.ResponseWriter, r *http.Request) {

attempts := GetAttemptsFromContext(r)

if attempts > 3 {

log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)

http.Error(w, "Service not available", http.StatusServiceUnavailable)

return

}


peer := serverPool.GetNextPeer()

if peer != nil {

peer.ReverseProxy.ServeHTTP(w, r)

return

}

http.Error(w, "Service not available", http.StatusServiceUnavailable)

}

使用 context

context 包允许将有用的数据存储在 Http 请求中。我们利用它来跟踪请求的特定数据,例如尝试次数和重试次数。

const (

Attempts int = iota

Retry

)

然后,我们可以像使用 HashMap 一样来检索值。

// GetAttemptsFromContext returns the attempts for request

func GetRetryFromContext(r *http.Request) int {

if retry, ok := r.Context().Value(Retry).(int); ok {

return retry

}

return 0

}

9

被动健康检查

被动健康检查可以恢复挂掉的节点。以固定的间隔 ping 后端节点来检查状态。

// isAlive checks whether a backend is Alive by establishing a TCP connection

func isBackendAlive(u *url.URL) bool {

timeout := 2 * time.Second

conn, err := net.DialTimeout("tcp", u.Host, timeout)

if err != nil {

log.Println("Site unreachable, error: ", err)

return false

}

_ = conn.Close() // close it, we dont need to maintain this connection

return true

}

现在,我们可以递归检查节点并标记状态。

// HealthCheck pings the backends and update the status

func (s *ServerPool) HealthCheck() {

for _, b := range s.backends {

status := "up"

alive := isBackendAlive(b.URL)

b.SetAlive(alive)

if !alive {

status = "down"

}

log.Printf("%s [%s]\n", b.URL, status)

}

}

定期执行,可以在 Go 中启动一个计时器,然后使用通道来监听事件。

// healthCheck runs a routine for check status of the backends every 20 secs

func healthCheck() {

t := time.NewTicker(time.Second * 20)

for {

select {

case <-t.C:

log.Println("Starting health check...")

serverPool.HealthCheck()

log.Println("Health check completed")

}

}

}

最后,我们单独启一个 goroutine 来运行它。

go healthCheck()

总结

以上我们只是实现了一个简单的负载均衡,当然我们还可以对它做很多的改善提升。

例如:

  • 使用堆来整理存活的后端节点可以减少搜索范围

  • 收集统计数据

  • 实现加权轮询/最少连接

  • 添加配置文件的支持

  • ...

如果对负载均衡有兴趣,可以做相应的功能扩展开发。

关注我们

界世的你当不

只做你的肩膀

fIvqquI.jpg!web

6rAZruZ.jpg!web

360官方技术公众号 

技术干货|一手资讯|精彩活动

空·


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK