63

Golang网络:核心API实现剖析二)

 6 years ago
source link: https://zhuanlan.zhihu.com/p/31879717?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Golang网络:核心API实现剖析二)

主业写代码,副业拉皮条,兼职当猎头,欢迎骚扰

说明

前面的章节我们基本聊完了golang网络编程的关键API流程,但遗留了一个关键内容:当系统调用返回EAGAIN时,会调用WaitRead/WaitWrite来阻塞当前协程,现在我们接着聊。

WaitRead/WaitWrite

func (pd *pollDesc) Wait(mode int) error {
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res)
}

func (pd *pollDesc) WaitRead() error {
    return pd.Wait('r')
}

func (pd *pollDesc) WaitWrite() error {
    return pd.Wait('w')
}

最终runtime_pollWait走到下面去了:

TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 
   JMP runtime·netpollWait(SB)

我们仔细考虑应该明白:netpollWait的主要作用是:等待关心的socket是否有事件(其实后面我们知道只是等待一个标记位是否发生改变),如果没有事件,那么就将当前的协程挂起,直到有通知事件发生,我们接下来看看到底如何实现:

func netpollWait(pd *pollDesc, mode int) int {
    // 先检查该socket是否有error发生(如关闭、超时等) 
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }

    // As for now only Solaris uses level-triggered IO. 
    if GOOS == "solaris" {
        onM(func() {
            netpollarm(pd, mode)
        })
    }
    // 循环等待netpollblock返回值为true 
    // 如果返回值为false且该socket未出现任何错误 
    // 那该协程可能被意外唤醒,需要重新被挂起 
    // 还有一种可能:该socket由于超时而被唤醒 
    // 此时netpollcheckerr就是用来检测超时错误的 
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0 
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to WAIT 
    // 首先将轮询状态设置为pdWait 
    // 为什么要使用for呢?因为casuintptr使用了自旋锁 
    // 为什么使用自旋锁就要加for循环呢? 
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0 
            return true 
        }
        if old != 0 {
            gothrow("netpollblock: double wait")
        }
        // 将socket轮询相关的状态设置为pdWait 
        if casuintptr(gpp, 0, pdWait) {
            break 
        }
    }
    // 如果未出错将该协程挂起,解锁函数是netpollblockcommit 
    if waitio || netpollcheckerr(pd, mode) == 0 {
        f := netpollblockcommit
        gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait")
    }
    // 可能是被挂起的协程被唤醒 
    // 或者由于某些原因该协程压根未被挂起 
    // 获取其当前状态记录在old中 
    old := xchguintptr(gpp, 0)
    if old > pdWait {
        gothrow("netpollblock: corrupted state")
    }
    return old == pdReady
}

从上面的分析我们看到,如果无法读写,golang会将当前协程挂起,在协程被唤醒的时候,该标记位应该会被置位。 我们接下来看看这些挂起的协程何时会被唤醒。

事件通知

golang运行库在系统运行过程中存在socket事件检查点,目前,该检查点主要位于以下几个地方:

runtime·startTheWorldWithSema(void):在完成gc后;
findrunnable():这个暂时不知道何时会触发?
sysmon:golang中的监控协程,会周期性检查就绪socket

TODO: 为什么是在这些地方检查socket就绪事件呢?

接下来我们看看如何检查socket就绪事件,在socket就绪后又是如何唤醒被挂起的协程?主要调用函数runtime-netpoll()

我们只关注epoll的实现,对于epoll,上面的方法具体实现是netpoll_epoll.go中的netpoll

func netpoll(block bool) (gp *g) {
    if epfd == -1 {
        return 
    }
    waitms := int32(-1)
    if !block {
        // 如果调用者不希望block 
        // 设置waitsm为0 
        waitms = 0 
    }

    var events [128]epollevent
retry:
    // 调用epoll_wait获取就绪事件 
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        ...
    }
    goto retry
   }

    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue 
        }
        var mode int32 
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r' 
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w' 
        }

        // 对每个事件,调用了netpollready 
        // pd主要记录了与该socket关联的等待协程 
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
        }
    }
    // 如果调用者同步等待且本次未获取到就绪socket 
    // 继续重试 
    if block && gp == nil {
        goto retry
    }
    return gp
}

这个函数主要调用epoll_wait(当然,golang封装了系统调用)来获取就绪socket fd,对每个就绪的fd,调用netpollready()作进一步处理。这个函数的最终返回值就是一个已经就绪的协程(g)链表。

netpollready主要是将该socket fd标记为IOReady,并唤醒等待在该fd上的协程g,将其添加到传入的g链表中。

// make pd ready, newly runnable goroutines (if any) are returned in rg/wg 
func netpollready(gpp **g, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }

    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // 将就绪协程添加至链表中 
    if rg != nil {
        rg.schedlink = *gpp
        *gpp = rg
    }
    if wg != nil {
        wg.schedlink = *gpp
        *gpp = wg
    }
}

// 将pollDesc的状态置为pdReady并返回就绪协程 
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    for {
        old := *gpp
        if old == pdReady {
            return nil 
        }
        if old == 0 && !ioready {
            return nil 
        }
        var new uintptr 
        if ioready {
            new = pdReady
        }
        if casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0 
            }
            return (*g)(unsafe.Pointer(old))
        }
    }
}

疑问:一个fd会被多个协程同时进行IO么?比如一个协程读,另外一个协程写?或者多个协程同时读?此时返回的是哪个协程就绪呢?

一个socket fd可支持并发读写,因为对于tcp协议来说,是全双工。读写操作的是不同缓冲区,但是不支持并发读和并发写,因为这样会错乱的。所以上面的netFD.RWLock()就是干这个作用的。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK