76

Golang网络:核心API实现剖析(一)

 6 years ago
source link: https://zhuanlan.zhihu.com/p/31644462?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Golang网络:核心API实现剖析(一)

主业写代码,副业拉皮条,兼职当猎头,欢迎骚扰

这一章节我们将详细描述网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。

Listen

func Listen(net, laddr string) (Listener, error) {
   la, err := resolveAddr("listen", net, laddr, noDeadline)
   ......
   switch la := la.toAddr().(type) {
   case *TCPAddr:
       l, err = ListenTCP(net, la)
   case *UnixAddr:
       ......
   }
  ......
}

// 对于tcp协议,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
   ......
   fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
   ......
   return &TCPListener{fd}, nil
}

func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
   ......
   return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}

func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
   // 创建底层socket,设置属性为O_NONBLOCK
   s, err := sysSocket(family, sotype, proto)
   ......
   setDefaultSockopts(s, family, sotype, ipv6only)
   // 创建新netFD结构
   fd, err = newFD(s, family, sotype, net)
   ......
   if laddr != nil && raddr == nil {
       switch sotype {
       case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
           // 调用底层listen监听创建的套接字
           fd.listenStream(laddr, listenerBacklog)
           return fd, nil
       case syscall.SOCK_DGRAM:
           ......
       }
   }   
}

// 最终调用该函数来创建一个socket
// 并且将socket属性设置为O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
   syscall.ForkLock.RLock()
   s, err := syscall.Socket(family, sotype, proto)
   if err == nil {
       syscall.CloseOnExec(s)
   }
   syscall.ForkLock.RUnlock()
   if err != nil {
       return -1, err
   }
   if err = syscall.SetNonblock(s, true); err != nil {
       syscall.Close(s)
       return -1, err
   }
   return s, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
   if err := setDefaultListenerSockopts(fd.sysfd)
   if lsa, err := laddr.sockaddr(fd.family); err != nil {
       return err
   } else if lsa != nil {
       // Bind绑定至该socket
       if err := syscall.Bind(fd.sysfd, lsa); err != nil {
           return os.NewSyscallError("bind", err)
       }
   }
   // 监听该socket
   if err := syscall.Listen(fd.sysfd, backlog); 
   // 这里非常关键:初始化socket与异步IO相关的内容
   if err := fd.init(); err != nil {
       return err
   }
   lsa, _ := syscall.Getsockname(fd.sysfd)
   fd.setAddr(fd.addrFunc()(lsa), nil)
   return nil
}

我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。

对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?

func (fd *netFD) init() error {
   if err := fd.pd.Init(fd); err != nil {
       return err
   }
   return nil
}

func (pd *pollDesc) Init(fd *netFD) error {
   // 利用了Once机制,保证一个进程只会执行一次
   // runtime_pollServerInit: 
   // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
   // JMP runtime·netpollServerInit(SB)
   serverInit.Do(runtime_pollServerInit)
   // runtime_pollOpen:
   // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
   // JMP runtime·netpollOpen(SB)
   ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
   if errno != 0 {
       return syscall.Errno(errno)
   }
   pd.runtimeCtx = ctx
   return nil
}

这里就是socket异步编程的关键:

netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    epfd = epollcreate(1024)
    if epfd >= 0 {
        closeonexec(epfd)
        return
    }
    ......
}

netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen

func netpollopen(fd uintptr, pd *pollDesc) int32 {
   var ev epollevent
   ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
   *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
   return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。

Accept

既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:

func (l *TCPListener) Accept() (Conn, error) {
    c, err := l.AcceptTCP()
    ......
}

func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
    ......
    fd, err := l.fd.accept()
    ......
    // 返回给调用者一个新的TCPConn
    return newTCPConn(fd), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 为什么对该函数加读锁?
    if err := fd.readLock(); err != nil {
        return nil, err
    }
    defer fd.readUnlock()
    ......
    for {
        // 这个accept是golang包装的系统调用
        // 用来处理跨平台
        s, rsa, err = accept(fd.sysfd)
        if err != nil {
            if err == syscall.EAGAIN {
                // 如果没有可用连接,WaitRead()阻塞该协程
                // 后面会详细分析WaitRead.
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            } else if err == syscall.ECONNABORTED {
                // 如果连接在Listen queue时就已经被对端关闭
                continue
            }
        }
        break
    }

    netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
    ......
    // 这个前面已经分析,将该fd添加到epoll队列中
    err = netfd.init()
    ......
    lsa, _ := syscall.Getsockname(netfd.sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

OK,从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。

一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。

Read

func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    return c.fd.Read(b)
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    // 为什么对函数调用加读锁
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    // 这个又是干嘛?
    if err := fd.pd.PrepareRead(); err != nil {
        return 0, &OpError{"read", fd.net, fd.raddr, err}
    }
    for {
        n, err = syscall.Read(int(fd.sysfd), p)
        if err != nil {
            n = 0
            // 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
            if err == syscall.EAGAIN {
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            }
        }
        // 检查错误,封装io.EOF
        err = chkReadErr(n, err, fd)
        break
    }
    if err != nil && err != io.EOF {
        err = &OpError{"read", fd.net, fd.raddr, err}
    }
    return
}

func chkReadErr(n int, err error, fd *netFD) error {
    if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
        return io.EOF
    }
    return err
}

Read的流程与Accept流程极其一致,阅读起来也很简单。相信不用作过多解释,自己看吧。 需要注意的是每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。

Write

func (fd *netFD) Write(p []byte) (nn int, err error) {
    // 为什么这里加写锁
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    // 这个是干什么?
    if err := fd.pd.PrepareWrite(); err != nil {
        return 0, &OpError{"write", fd.net, fd.raddr, err}
    }
    // nn记录总共写入的数据量,每次Write可能只能写入部分数据
    for {
        var n int
        n, err = syscall.Write(int(fd.sysfd), p[nn:])
        if n > 0 {
            nn += n
        }
        // 如果数组数据已经全部写完,函数返回
        if nn == len(p) {
            break
        }
        // 如果写入数据时被block了,阻塞当前协程
        if err == syscall.EAGAIN {
            if err = fd.pd.WaitWrite(); err == nil {
                continue
            }
        }
        if err != nil {
            n = 0
            break
        }
        // 如果返回值为0,代表了什么?
        if n == 0 {
            err = io.ErrUnexpectedEOF
            break
        }
    }
    if err != nil {
        err = &OpError{"write", fd.net, fd.raddr, err}
    }
    return nn, err
}

注意Write语义与Read不一样的地方:

Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。

总结

上面我们基本说完了golang网络编程内的关键API流程,我们遗留了一个关键内容:当系统调用返回EAGAIN时,会 调用WaitRead/WaitWrite来阻塞当前协程,我会在接下来的章节中继续分析。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK