24

golang调用原生epoll引起event loop阻塞问题

 3 years ago
source link: http://xiaorui.cc/archives/6758
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
golang调用原生epoll引起event loop阻塞问题 – 峰云就她了
专注于Golang、Python、DB、cluster

golang调用原生epoll引起event loop阻塞问题

golang标准库net很优秀,可以让开发者轻易构建非阻塞网络服务,但开发爽快带来的问题协程数加大,比如在net/http里一个连接两个协程,grpc算是业务和keepalive心跳是四个协程,数据的进出是通过channel传输。

golang netpoll抽象了epoll事件的调用,借助runtime的gopark&goready实现就绪协程的调度,让应用层用同步方法构建io异步的网络应用。

该文章原文地址 http://xiaorui.cc/archives/6758

golang gnet

那么如何规避netpoll的协程太多的问题? 业界通用的方案是通过原syscall epoll实现网络应用,比如evio、gnet库。我先前使用过evio构建过不少服务,但当你的业务调用含有阻塞逻辑时会使event loop陷入同步阻塞。当然在evio里可以开多个event loop来加大并发,且减少同步的可能,但问题要开多少个event loop?

假设同一个event loop中有两个fd被唤醒,第一个fd完成了协议解析开始执行业务逻辑,业务逻辑是访问另一个三方的http api,但对端的api处理很慢,这就意味着你需要等待api的返回,而不能处理第二个fd事件。

那么为什么不把阻塞逻辑用go func异步出去?同一个fd先来后来好几个事件,都异步?那么又要考虑socket并发安全问题。

如不能解决阻塞问题,那么自定义epoll会特别的低效。业务逻辑的阻塞行为压根是没法避免的,除非像redis服务那样,它的业务逻辑就是访问内存的数据结构,无阻塞逻辑。

evio的另外几个问题

evio封装的epoll各类事件看起来很完美,但依旧发现了他几个问题。

第一,多个event loop引起epoll listen fd惊群问题。

虽然2.6早已解决accept惊群,但对于错误性的同时绑定listen fd依然带来惊群。nginx对于epoll listen fd是通过accept mutex实现的,简单说同一时间只有一个进程在监听listen fd,谁拿锁谁去监听。高内核4.5后的EPOLLEXCLUSIVE和内核3.6 reuseport可在内核层面实现负载均衡。

除了内核方法外,还可通过reactor的架构模型也是可以解决listen fd惊群问题。

第二, 用来唤醒epoll的eventfd写入数据没有读出。

第三, loopWrite在内核缓冲区已满无法一次写入时出现写入数据丢失。

为了规避event loop的阻塞问题,我曾经在2019年时跟evio的作者沟通过方案,可以为他提交一个协程池的方案来规避阻塞问题,他的反应很冷淡…

😅 最后还是没给他提交pr。跟同事一起在公司内部基于evio封装了一个叫goepoll网络库,重要的是加入了协程池的支持。可惜的是公司没考虑开源。好在2019年有个gnet库横空出世,不仅解决了evio的种种问题,还设计了ractor网络模型,且加入了协程池的支持。

gnet的协程池设计?

项目地址:https://github.com/panjf2000/gnet ,通过benchmark得出gnet要比我们自己封装的goepoll要更稳定,接口更友好 😅。

下面是gnet官方给出协程池的用法样例。react业务逻辑中把阻塞逻辑扔到ants构建的协程池里,最后通过asyncWrite()来唤醒event loop写入返回。

// xiaorui.cc

func (es *echoServer) React(c gnet.Conn) (out []byte, action gnet.Action) {
        data := append([]byte{}, c.Read()...)
        c.ResetBuffer()

        // Use ants pool to unblock the event-loop.
        _ = es.pool.Submit(func() {
                time.Sleep(1 * time.Second)
                c.AsyncWrite(data)
        })

        return
}

对于阻塞逻辑的处理可以直接go func,也可以使用协程池,主要的关键点在于调用AsyncWrite方法。

// xiaorui.cc

func (c *conn) AsyncWrite(buf []byte) {
    if encodedBuf, err := c.codec.Encode(c, buf); err == nil {
        _ = c.loop.poller.Trigger(func() error {  // 尝试唤醒eventfd
            if c.opened {
                c.write(encodedBuf)  // 写到ringbuffer里并加入事件
                pool.PutBytes(buf)
            }
            return nil
        })
    }
}

AsyncWrite会唤醒fd所在的eventLoop.Poller,唤醒的方法是激活eventloop里的event fd。不仅是唤醒,而且会把异步执行的func放在一个队列里,这个函数队列是slice实现的。AsyncWrite中的write把resp raw写入到ringbuffer数据结构里,然后进行socket写入,当出现eagain时加入读写事件,直到ringbuffer为空。

// xiaorui.cc

func (c *conn) write(buf []byte) {
	if !c.outboundBuffer.IsEmpty() {
		_, _ = c.outboundBuffer.Write(buf)
		return
	}
	n, err := unix.Write(c.fd, buf)
	if err != nil {
		if err == unix.EAGAIN {
			_, _ = c.outboundBuffer.Write(buf)
			_ = c.loop.poller.ModReadWrite(c.fd)
			return
		}
		_ = c.loop.loopCloseConn(c, err)
		return
	}
	if n < len(buf) {
		_, _ = c.outboundBuffer.Write(buf[n:])
		_ = c.loop.poller.ModReadWrite(c.fd)
	}
}

当event loop的poller在epoll_wait拿到wakeFD时,遍历执行函数队列里的函数。为什么不在异步协程里直接write fd,主要是为了保证fd写入安全,所有的write fd都统一由于epoll poller来操作。

// xiaorui.cc

type Poller struct {
	fd            int    // epoll fd
	wfd           int    // wake fd
	wfdBuf        []byte // wfd buffer to read packet
	asyncJobQueue internal.AsyncJobQueue
}

// 存放异步协程最后的resp写入func
type AsyncJobQueue struct {
	lock sync.Locker
	jobs []func() error
}

// 把func放到队列中,并且通过写wfd事件来唤醒event loop的poller
func (p *Poller) Trigger(job internal.Job) error {
	if p.asyncJobQueue.Push(job) == 1 {
		_, err := unix.Write(p.wfd, b)
		return err
	}
	return nil
}

// 阻塞调用epoll wait,直到有新的事件
func (p *Poller) Polling(callback func(fd int, ev uint32) error) (err error) {
	el := newEventList(InitEvents)
	var wakenUp bool
	for {
		n, err0 := unix.EpollWait(p.fd, el.events, -1)
                ...
                // 遍历执行所有的事件
		for i := 0; i < n; i++ {
			if fd := int(el.events[i].Fd); fd != p.wfd {
                ...
			} else {
                // 唤醒的fd是wfd
				wakenUp = true
				_, _ = unix.Read(p.wfd, p.wfdBuf)
			}
		}
		if wakenUp {
                        // 执行asyncWrite的的收尾动作
			if err = p.asyncJobQueue.ForEach(); err != nil {
				return
			}
		}
        ...
	}
}

func (q *AsyncJobQueue) ForEach() (err error) {
	for i := range jobs {
        // 遍历执行该poller里所有异步协程收尾工作
		if err = jobs[i](); err != nil {
			return err
		}
	}
}

为什么要使用协程池?

我们在二次开发evio时加入了协程池,gnet同样也使用了协程池。原因在于一方面规避了more stack的发生,另一方面避免系统抖动时协程大量的产生,退出的协程依旧在allg结构中造成gc的扫描问题。

协程调度延迟加大问题? (很重要!!!)

event loop通过epoll_wait发现无事件时会syscall阻塞。如果event loop数目超过gomaxprocs,那么其他业务协程很大程度是拿不到调度,依赖sysmon来抢占并handoffp解绑pmg才能拿到调度。

当然epoll_wait可以非阻塞的调用,就是timeout = 0,这样其他协程虽然拿到调度,但对于poller来说无脑调用epoll_wait会增加系统消耗。所以event loop的数目要好好斟酌。我们的经验值是3/5。😁

使用自定义epoll封装网络服务,性能相当的友好。我们已经在多个项目中实践了自定义epoll方案,qps普遍能提高最少20%左右。蚂蚁金服出品的高性能sidecar mosn同样实现了自定义epoll。

前公司的领导是广发证券的架构师,他设计的推送服务就是通过原生epoll来构建的,但没有使用gnet/evio那种syscall epoll方法,而是使用c封装epoll系统调用常驻监听,使用mmap开辟一个无锁队列,这样c和go共享这个队列地址来通信。

go epoll

下个章节专门源码分析gnet reactor实现原理。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc
weixin_new.jpg

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK