
One Million Concurrent Connections on a Single Machine, in 50 Lines of Go

source link: https://www.tuicool.com/articles/FfMVfqr

This article first describes how one million concurrent connections were tested on a single machine and presents the results, then analyzes the secret behind a million-connection network server implemented in 50 lines of Go.

Network setup

Clients: six cloud VMs, each with 2 cores and 8 GB of RAM

Server: one cloud VM with 4 cores and 16 GB of RAM

[Figure: network topology]

Client-side settings

Raise the maximum number of open files to 200,000:

ulimit -n 200000

Widen the local port range to 1024-65535:

echo 1024 65535 > /proc/sys/net/ipv4/ip_local_port_range

Each client VM establishes 180,000 connections

Configure multiple IPs on the single NIC: each NIC gets three IPs, and three client processes are started, each binding a different local IP and establishing 60,000 connections, for 180,000 in total. (With the port range widened to 1024-65535, each local IP has roughly 64K usable client ports, so 60,000 connections per IP fits comfortably.) A sketch of the setup follows.
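
One possible realization of that per-VM setup (the NIC name, addresses, and client binary name here are assumptions, not taken from the article):

ip addr add 192.168.0.11/24 dev eth0
ip addr add 192.168.0.12/24 dev eth0
./client --remote-ip=<server-ip> --local-ip=192.168.0.10 --concurrent-num=60000 &
./client --remote-ip=<server-ip> --local-ip=192.168.0.11 --concurrent-num=60000 &
./client --remote-ip=<server-ip> --local-ip=192.168.0.12 --concurrent-num=60000 &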


Server-side settings

Raise the maximum number of open files to 1,000,000:

ulimit -n 1000000

Size the SYN (half-open) queue and the accept (fully-established) queue

During testing we hit a symptom: the clients had established 30,000 connections, but the server side showed only 28,570.

Investigation turned up two causes:

1. The accept queue overflowed: as the figure below shows, the overflowed counter kept increasing.

[Figure: the listen-queue overflowed counter increasing]

2. tcp_abort_on_overflow was 0, meaning that when the accept queue is full at step three of the three-way handshake, the server simply drops the client's ACK (from the server's point of view the connection has not been established yet).

With tcp_abort_on_overflow set to 1, the server instead replies with a RST when the accept queue is full at step three, aborting the handshake and the connection (which, on the server side, had never been established anyway).
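
Both conditions can be checked directly from the shell (a sketch; the exact counter wording varies across kernel versions):

netstat -s | grep -i overflow
cat /proc/sys/net/ipv4/tcp_abort_on_overflow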


The fix:

Set the SYN (half-open) queue length to 10,000:

echo 10000 >/proc/sys/net/ipv4/tcp_max_syn_backlog

Set the accept queue length to 10,000. (The effective accept queue length of a listener is min(somaxconn, the backlog passed to listen()); Go's listener uses the somaxconn value as its backlog, the listenerBacklog seen in the source below, so raising somaxconn is enough here.)

echo 10000 >/proc/sys/net/core/somaxconn

See: 【转】关于TCP 半连接队列和全连接队列 - sidesky - 博客园 (on the TCP SYN queue and accept queue)

linux内核调优tcp_max_syn_backlog和somaxconn的区别-10931853-51CTO博客 (on the difference between tcp_max_syn_backlog and somaxconn)

Raise the conntrack table size

The default net.nf_conntrack_max is 262144; raise it to 1,000,000:

sysctl -w net.nf_conntrack_max=1000000
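
When the conntrack table fills up, the kernel logs "nf_conntrack: table full, dropping packet". The current entry count can be compared against the limit with (the /proc path varies across kernel versions):

cat /proc/sys/net/netfilter/nf_conntrack_count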

For more on tuning the maximum number of TCP connections, see: Linux 内核优化-调大TCP最大连接数 - 简书

Final results

The server established 960,000 connections.

The most commonly used form of the ss command is ss -anp, but note that when the connection count is very large, the p flag makes the command so slow it is nearly unusable, so only -an is passed here.
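
For example, counting established connections stays fast at this scale without the p flag (an illustrative command, not from the article):

ss -ant state established | wc -l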

ss performs much better than netstat; see https://blog.csdn.net/hustsselbj/article/details/47438781


CPU and memory usage

CPU usage is roughly 2 cores and memory about 3 GB. At 960,000 connections that works out to on the order of 3 KB per connection, consistent with small goroutine stacks plus per-socket buffers.


Inspecting the CPU hardware info shows a clock frequency of 2.4 GHz.

For commands to inspect CPU hardware info, see: linux(centos)查看cpu硬件信息命令图解教程 - 电脑维修技术网


Client and server implementation

Client

package main

import (
    "flag"
    "fmt"
    "net"
    "os"
    "time"
)

var RemoteAddr *string
var ConcurNum *int
var LocalAddr *string

func init() {
    RemoteAddr = flag.String("remote-ip", "127.0.0.1", "ip addr of remote server")
    ConcurNum = flag.Int("concurrent-num", 100, "concurrent number of client")
    LocalAddr = flag.String("local-ip", "0.0.0.0", "local ip addr to bind")
}

// consume opens one TCP connection bound to the given local IP and
// keeps draining whatever the server sends on it.
func consume() {

    laddr := &net.TCPAddr{IP: net.ParseIP(*LocalAddr)}

    var dialer net.Dialer
    dialer.LocalAddr = laddr

    conn, err := dialer.Dial("tcp", *RemoteAddr+":8888")
    if err != nil {
        fmt.Println("dial failed:", err)
        os.Exit(1)
    }
    defer conn.Close()

    buffer := make([]byte, 512)

    for {
        _, err2 := conn.Read(buffer)
        if err2 != nil {
            fmt.Println("Read failed:", err2)
            return
        }

        //  fmt.Println("count:", n, "msg:", string(buffer))

    }

}

func main() {
    flag.Parse()
    for i := 0; i < *ConcurNum; i++ {
        go consume()
    }
    time.Sleep(3600 * time.Second) // keep the process alive while the goroutines hold their connections
}

Server

package main

import (
    "fmt"
    "net"
    "os"
    "time"
)

var array []byte = make([]byte, 10)

func checkError(err error, info string) (res bool) {

    if err != nil {
        fmt.Println(info + "  " + err.Error())
        return false
    }
    return true
}

// Handler writes a small payload every 10 seconds to keep the
// connection active.
func Handler(conn net.Conn) {
    for {
        _, err := conn.Write(array)
        if err != nil {
            return
        }
        time.Sleep(10 * time.Second)
    }
}

func main() {

    for i := 0; i < 10; i += 1 {
        array[i] = 'a'
    }

    service := ":8888"
    tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
    if !checkError(err, "ResolveTCPAddr") {
        os.Exit(1)
    }
    l, err := net.ListenTCP("tcp", tcpAddr)
    if !checkError(err, "ListenTCP") {
        os.Exit(1)
    }

    for {
        conn, err := l.Accept()
        if err != nil {
            fmt.Printf("accept error, err=%s\n", err.Error())
            os.Exit(1)
        }
        go Handler(conn)
    }

}
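
To try the pair locally (the file names here are assumptions):

go build -o server server.go
./server &
go build -o client client.go
./client --remote-ip=127.0.0.1 --concurrent-num=100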

Threading models for high-performance network programming

TPC

TPC stands for Thread Per Connection: each new connection gets a newly created thread dedicated to handling it.


Model characteristics:

  • Blocking I/O is used to read input
  • Each connection needs a dedicated thread to perform the complete read, process, respond cycle

Problems:

  • Under high concurrency a large number of threads must be created, consuming substantial system resources

reactor

The reactor pattern has two core parts: the reactor and a thread pool. The reactor watches connections for I/O readability and writability; the thread pool handles the actual business logic. Under high concurrency, a reactor built on epoll is very efficient. A minimal epoll sketch follows the list below.


Model characteristics:

  • Non-blocking I/O with I/O multiplexing
  • A thread pool processes the business logic
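
The article itself contains no reactor example; the following minimal Go sketch (Linux-only, using golang.org/x/sys/unix, with error handling mostly elided) shows the core reactor loop: epoll watches the fds, and ready events are dispatched. A real implementation would hand ready connections to a worker pool instead of reading inline.

package main

import (
    "fmt"

    "golang.org/x/sys/unix"
)

func main() {
    // Non-blocking listening socket on :8888.
    lfd, _ := unix.Socket(unix.AF_INET, unix.SOCK_STREAM|unix.SOCK_NONBLOCK, 0)
    unix.Bind(lfd, &unix.SockaddrInet4{Port: 8888})
    unix.Listen(lfd, 128)

    // The reactor: one epoll instance watching all fds.
    epfd, _ := unix.EpollCreate1(0)
    unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, lfd,
        &unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(lfd)})

    events := make([]unix.EpollEvent, 128)
    for {
        n, _ := unix.EpollWait(epfd, events, -1)
        for i := 0; i < n; i++ {
            fd := int(events[i].Fd)
            if fd == lfd {
                // New connection: set it non-blocking and register it
                // with the same epoll instance.
                cfd, _, err := unix.Accept(lfd)
                if err != nil {
                    continue
                }
                unix.SetNonblock(cfd, true)
                unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, cfd,
                    &unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(cfd)})
            } else {
                // Readable connection: a real reactor would dispatch
                // this to a worker pool rather than reading inline.
                buf := make([]byte, 512)
                if n, _ := unix.Read(fd, buf); n <= 0 {
                    unix.Close(fd)
                } else {
                    fmt.Printf("read %d bytes from fd %d\n", n, fd)
                }
            }
        }
    }
}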

The golang GPC model

GPC stands for Goroutine Per Connection: each new connection gets a newly started goroutine dedicated to handling it.


Model characteristics:

  • Code can be written in a blocking-I/O style
  • Each connection gets a dedicated goroutine to perform the complete read, process, respond cycle

Why GPC supports a million concurrent connections on one machine

The GPC model looks very similar to the TPC model, so why can GPC sustain a million concurrent connections on a single machine?

Comparing the GPC and TPC models

  1. Stack size: in the GPC model a goroutine's stack starts small (2 KB as of Go 1.4+) and grows and shrinks on demand, while an OS thread's stack is fixed at creation, with defaults on the order of megabytes (commonly 1 MB to 8 MB depending on the platform). The sketch after this list illustrates the per-goroutine footprint.
  2. I/O model: both GPC and TPC are programmed in a blocking style, but underneath, GPC uses non-blocking I/O; Go wraps non-blocking I/O to look blocking at the language level (when the underlying non-blocking read is not ready it returns EAGAIN, and the runtime moves the goroutine into the waiting state and switches to another goroutine).
  3. Switching cost: switching between goroutines is far cheaper than switching between threads; see linux操作系统笔记(进程).
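
A quick way to see the small per-goroutine footprint (a sketch, not from the original article; the number varies with Go version and workload):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    const n = 100000

    var before, after runtime.MemStats
    runtime.GC()
    runtime.ReadMemStats(&before)

    // Park n goroutines on a channel; each one costs roughly its
    // initial stack plus scheduler bookkeeping.
    block := make(chan struct{})
    for i := 0; i < n; i++ {
        go func() { <-block }()
    }

    runtime.GC()
    runtime.ReadMemStats(&after)
    fmt.Printf("approx %d KB per goroutine\n",
        (after.Sys-before.Sys)/n/1024)

    close(block)
}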

The secret behind the GPC model

Under the hood, the GPC model is in fact a reactor: Go wraps the reactor at the language level so that code can be written in a blocking style.


GPC model source walkthrough

The Go source analyzed below is version 1.9.4.


The I/O thread

The runtime starts a thread running the sysmon function.

runtime/proc.go

// The main goroutine.
func main() {          
        g := getg()
                       
        // Racectx of m0->g0 is used only as the parent of the main goroutine.
        // It must not be used for anything else.
        g.m.g0.racectx = 0
                       
        // Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
        // Using decimal instead of binary GB and MB because
        // they look nicer in the stack overflow failure message.
        if sys.PtrSize == 8 {
                maxstacksize = 1000000000
        } else {    
                maxstacksize = 250000000
        }              
                       
        // Allow newproc to start new Ms.
        mainStarted = true 
                       
        systemstack(func() {
                //start a new thread (M) that runs the sysmon function
                newm(sysmon, nil) 
        })             
        // ...

Inside sysmon

sysmon calls netpoll to collect the fds that have become readable or writable, and marks the goroutines associated with those fds as runnable.

runtime/proc.go

func sysmon() {                                                                                                                                                                                
        // If a heap span goes unused for 5 minutes after a garbage collection,
        // we hand it back to the operating system.
        scavengelimit := int64(5 * 60 * 1e9)
                     
        if debug.scavenge > 0 {        
                // Scavenge-a-lot for testing.
                forcegcperiod = 10 * 1e6
                scavengelimit = 20 * 1e6
        }            
                     
        lastscavenge := nanotime()     
        nscavenge := 0                 
                     
        lasttrace := int64(0)          
        idle := 0 // how many cycles in succession we had not wokeup somebody
        delay := uint32(0)             
        for {        
                if idle == 0 { // start with 20us sleep...
                        delay = 20     
                } else if idle > 50 { // start doubling the sleep after 1ms...
                        delay *= 2     
                }    
                if delay > 10*1000 { // up to 10ms
                        delay = 10 * 1000
                }    
                usleep(delay)
                // ...

                // poll network if not polled for more than 10ms
                lastpoll := int64(atomic.Load64(&sched.lastpoll))
                now := nanotime()   
                if lastpoll != 0 && lastpoll+10*1000*1000 < now {
                        atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
                        //netpoll calls epollwait, which returns the readable/writable fds;
                        //netpoll then returns the goroutines associated with those fds
                        gp := netpoll(false) // non-blocking - returns list of goroutines
                        if gp != nil { 
                                // Need to decrement number of idle locked M's
                                // (pretending that one more is running) before injectglist.
                                // Otherwise it can lead to the following situation:
                                // injectglist grabs all P's but before it starts M's to run the P's,
                                // another M returns from syscall, finishes running its G,
                                // observes that there is no work to do and no other running M's
                                // and reports deadlock.
                                incidlelocked(-1)
                                //mark the goroutines tied to the ready fds as runnable
                                injectglist(gp)
                                incidlelocked(1)
                        }           
                } 
                // ...
}

Inside netpoll

netpoll calls epollwait to obtain the readable/writable fds and returns the goroutines associated with them.

runtime/netpoll_epoll.go

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) *g {
        if epfd == -1 {
                return nil
        }
        waitms := int32(-1)
        if !block {
                waitms = 0
        }
        var events [128]epollevent
retry:  
        n := epollwait(epfd, &events[0], int32(len(events)), waitms)
        //      print("epoll wait\n")
        if n < 0 {
                if n != -_EINTR {
                        println("runtime: epollwait on fd", epfd, "failed with", -n)
                        throw("runtime: netpoll failed")
                }
                goto retry
        }
        var gp guintptr
        for i := int32(0); i < n; i++ {
                ev := &events[i]
                if ev.events == 0 {
                        continue
                }
                var mode int32
                if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
                        mode += 'r'
                }
                if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
                        mode += 'w'
                }
                if mode != 0 {
                        pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
                        //add the goroutine associated with pd to the gp list
                        netpollready(&gp, pd, mode)
                }
        }
        if block && gp == 0 {
                goto retry
        }
        return gp.ptr()
}

Inside injectglist

injectglist moves the goroutines to the runnable state.

runtime/proc.go

// Injects the list of runnable G's into the scheduler.
// Can run concurrently with GC.
func injectglist(glist *g) {
        if glist == nil {
                return  
        }               
        if trace.enabled {
                for gp := glist; gp != nil; gp = gp.schedlink.ptr() {
                        traceGoUnpark(gp, 0)
                }       
        }               
        lock(&sched.lock)
        var n int       
        for n = 0; glist != nil; n++ {
                gp := glist
                glist = gp.schedlink.ptr()
                //move the goroutine from waiting to runnable
                casgstatus(gp, _Gwaiting, _Grunnable)
                globrunqput(gp)
        }               
        unlock(&sched.lock)
        for ; n != 0 && sched.npidle != 0; n-- {
                startm(nil, false)
        }               
}

Server-side socket implementation

Inside net.ListenTCP

ListenTCP calls the socket function, which uses system calls to create the socket, set it non-blocking, bind it, and listen on it.

net/sock_posix.go

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
        //sysSocket creates the socket via a system call and sets it non-blocking via another
        s, err := sysSocket(family, sotype, proto)
        if err != nil {
                return nil, err 
        }   
        if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
                poll.CloseFunc(s)
                return nil, err 
        }   
        //allocate a netFD for the socket
        if fd, err = newFD(s, family, sotype, net); err != nil {
                poll.CloseFunc(s)
                return nil, err 
        }   
 
        // This function makes a network file descriptor for the
        // following applications:
        //  
        // - An endpoint holder that opens a passive stream
        //   connection, known as a stream listener
        //  
        // - An endpoint holder that opens a destination-unspecific
        //   datagram connection, known as a datagram listener
        //  
        // - An endpoint holder that opens an active stream or a
        //   destination-specific datagram connection, known as a
        //   dialer
        // - An endpoint holder that opens the other connection, such
        //   as talking to the protocol stack inside the kernel
        //
        // For stream and datagram listeners, they will only require
        // named sockets, so we can assume that it's just a request
        // from stream or datagram listeners when laddr is not nil but
        // raddr is nil. Otherwise we assume it's just for dialers or
        // the other connection holders.
        
        if laddr != nil && raddr == nil {
                switch sotype {
                case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
                        //listenStream binds the socket address via the bind system call,
                        //listens via the listen system call, and adds the fd to epoll via fd.init()
                        if err := fd.listenStream(laddr, listenerBacklog); err != nil {
                                fd.Close()
                                return nil, err
                        }
                        return fd, nil
                case syscall.SOCK_DGRAM:
                        if err := fd.listenDatagram(laddr); err != nil {
                                fd.Close()
                                return nil, err
                        }
                        return fd, nil
                }
        }
        if err := fd.dial(ctx, laddr, raddr); err != nil {
                fd.Close()
                return nil, err
        }
        return fd, nil
}

Inside Accept

net/fd_unix.go

func (fd *netFD) accept() (netfd *netFD, err error) {
        //pfd.Accept performs the accept system call, returning the new connection,
        //and sets the new socket non-blocking
        d, rsa, errcall, err := fd.pfd.Accept()
        if err != nil {
                if errcall != "" {
                        err = wrapSyscallError(errcall, err)
                }   
                return nil, err 
        }   
        //allocate a netFD for the new connection
        if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
                poll.CloseFunc(d)
                return nil, err 
        }   
        //netfd.init() adds the newly accepted socket fd to epoll
        if err = netfd.init(); err != nil {
                fd.Close()
                return nil, err 
        }                                                                                                                                                                                      
        lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
        netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
        return netfd, nil 
}

internal/poll/fd_unix.go

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
        if err := fd.readLock(); err != nil {
                return -1, nil, "", err 
        }   
        defer fd.readUnlock()
 
        if err := fd.pd.prepareRead(fd.isFile); err != nil {
                return -1, nil, "", err 
        }   
        for {
                //accept performs the accept system call and sets the
                //returned socket fd non-blocking
                s, rsa, errcall, err := accept(fd.Sysfd)
                if err == nil {
                        return s, rsa, "", err 
                }   
                switch err {
                //the socket's accept queue is empty
                case syscall.EAGAIN:
                        if fd.pd.pollable() {
                                //park the goroutine (set it to the waiting state)
                                if err = fd.pd.waitRead(fd.isFile); err == nil {
                                        continue
                                }   
                        }   
                case syscall.ECONNABORTED:
                        // This means that a socket on the listen
                        // queue was closed before we Accept()ed it;
                        // it's a silly error, so try again.
                        continue
                }   
                return -1, nil, errcall, err 
        }   
}

Inside Read

internal/poll/fd_unix.go

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
        if err := fd.readLock(); err != nil {
                return 0, err 
        }   
        defer fd.readUnlock()
        if len(p) == 0 { 
                // If the caller wanted a zero byte read, return immediately
                // without trying (but after acquiring the readLock).
                // Otherwise syscall.Read returns 0, nil which looks like
                // io.EOF.
                // TODO(bradfitz): make it wait for readability? (Issue 15735)
                return 0, nil 
        }   
        if err := fd.pd.prepareRead(fd.isFile); err != nil {
                return 0, err 
        }   
        if fd.IsStream && len(p) > maxRW {
                p = p[:maxRW]
        }   
        for {
                //perform the read system call
                n, err := syscall.Read(fd.Sysfd, p)
 
                if err != nil {
                        n = 0 
                        if err == syscall.EAGAIN && fd.pd.pollable() {
                                //no data to read on the socket fd; park the goroutine
                                if err = fd.pd.waitRead(fd.isFile); err == nil {
                                        continue
                                }
                        }
                }
        
                err = fd.eofError(n, err)
                return n, err
        }
}

Inside Write

internal/poll/fd_unix.go

// Write implements io.Writer.
func (fd *FD) Write(p []byte) (int, error) {
        if err := fd.writeLock(); err != nil {
                return 0, err
        }       
        defer fd.writeUnlock()
        if err := fd.pd.prepareWrite(fd.isFile); err != nil {
                return 0, err
        }       
        var nn int
        for {   
                max := len(p)
                if fd.IsStream && max-nn > maxRW {
                        max = nn + maxRW
                }
                //perform the write system call
                n, err := syscall.Write(fd.Sysfd, p[nn:max])
                if n > 0 {
                        nn += n
                }
                if nn == len(p) {
                        return nn, err
                }
                if err == syscall.EAGAIN && fd.pd.pollable() {
                        //the socket fd is not writable; park the goroutine
                        if err = fd.pd.waitWrite(fd.isFile); err == nil {
                                continue
                        }
                }
                if err != nil {
                        return nn, err
                }
                if n == 0 {
                        return nn, io.ErrUnexpectedEOF
                }
        }    
}

GPC model summary

1. Newly created sockets and accepted sockets are all set non-blocking.

2. The fds of both newly created and accepted sockets are added to epoll.

3. Read and Write loop around the system call; when it returns EAGAIN, the goroutine is put into the waiting state.

4. The I/O thread periodically runs sysmon, which calls epollwait to find the readable/writable fds and marks the goroutines associated with them runnable.
