71
GitHub - Allenxuxu/gev: gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络...
source link: https://github.com/Allenxuxu/gev
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
README.md
gev
gev
是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库。
特点
- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- SO_REUSEPORT 端口重用支持
网络模型
gev
只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。
性能测试
测试环境 Ubuntu18.04
吞吐量测试
限制 GOMAXPROCS=1(单线程),1 个 work 协程
限制 GOMAXPROCS=4,4 个 work 协程
其他测试
速度测试和同类库的简单性能比较, 压测方式与 evio 项目相同。
- gnet
- eviop
- evio
- net (标准库)
限制 GOMAXPROCS=1,1 个 work 协程
限制 GOMAXPROCS=1,4 个 work 协程
限制 GOMAXPROCS=4,4 个 work 协程
安装
go get -u github.com/Allenxuxu/gev
示例
echo serverpackage main import ( "flag" "strconv" "log" "github.com/Allenxuxu/gev" "github.com/Allenxuxu/gev/connection" "github.com/Allenxuxu/ringbuffer" ) type example struct{} func (s *example) OnConnect(c *connection.Connection) { log.Println(" OnConnect : ", c.PeerAddr()) } func (s *example) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) { //log.Println("OnMessage") first, end := buffer.PeekAll() out = first if len(end) > 0 { out = append(out, end...) } buffer.RetrieveAll() return } func (s *example) OnClose() { log.Println("OnClose") } func main() { handler := new(example) var port int var loops int flag.IntVar(&port, "port", 1833, "server port") flag.IntVar(&loops, "loops", -1, "num loops") flag.Parse() s, err := gev.NewServer(handler, gev.Network("tcp"), gev.Address(":"+strconv.Itoa(port)), gev.NumLoops(loops)) if err != nil { panic(err) } s.Start() }
package main import ( "log" "github.com/Allenxuxu/gev" "github.com/Allenxuxu/gev/connection" "github.com/Allenxuxu/ringbuffer" "github.com/Allenxuxu/toolkit/sync/atomic" ) type Server struct { clientNum atomic.Int64 maxConnection int64 server *gev.Server } func New(ip, port string, maxConnection int64) (*Server, error) { var err error s := new(Server) s.maxConnection = maxConnection s.server, err = gev.NewServer(s, gev.Address(ip+":"+port)) if err != nil { return nil, err } return s, nil } func (s *Server) Start() { s.server.Start() } func (s *Server) Stop() { s.server.Stop() } func (s *Server) OnConnect(c *connection.Connection) { s.clientNum.Add(1) log.Println(" OnConnect : ", c.PeerAddr()) if s.clientNum.Get() > s.maxConnection { _ = c.ShutdownWrite() log.Println("Refused connection") return } } func (s *Server) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) { log.Println("OnMessage") first, end := buffer.PeekAll() out = first if len(end) > 0 { out = append(out, end...) } buffer.RetrieveAll() return } func (s *Server) OnClose() { s.clientNum.Add(-1) log.Println("OnClose") } func main() { s, err := New("", "1833", 1) if err != nil { panic(err) } defer s.Stop() s.Start() }
package main import ( "container/list" "github.com/Allenxuxu/gev" "github.com/Allenxuxu/gev/connection" "github.com/Allenxuxu/ringbuffer" "log" "sync" "time" ) type Server struct { conn *list.List mu sync.RWMutex server *gev.Server } func New(ip, port string) (*Server, error) { var err error s := new(Server) s.conn = list.New() s.server, err = gev.NewServer(s, gev.Address(ip+":"+port)) if err != nil { return nil, err } return s, nil } func (s *Server) Start() { s.server.RunEvery(1*time.Second, s.RunPush) s.server.Start() } func (s *Server) Stop() { s.server.Stop() } func (s *Server) RunPush() { var next *list.Element s.mu.RLock() defer s.mu.RUnlock() for e := s.conn.Front(); e != nil; e = next { next = e.Next() c := e.Value.(*connection.Connection) _ = c.Send([]byte("hello\n")) } } func (s *Server) OnConnect(c *connection.Connection) { log.Println(" OnConnect : ", c.PeerAddr()) s.mu.Lock() e := s.conn.PushBack(c) s.mu.Unlock() c.SetContext(e) } func (s *Server) OnMessage(c *connection.Connection, buffer *ringbuffer.RingBuffer) (out []byte) { log.Println("OnMessage") first, end := buffer.PeekAll() out = first if len(end) > 0 { out = append(out, end...) } buffer.RetrieveAll() return } func (s *Server) OnClose(c *connection.Connection) { log.Println("OnClose") e := c.Context().(*list.Element) s.mu.Lock() s.conn.Remove(e) s.mu.Unlock() } func main() { s, err := New("", "1833") if err != nil { panic(err) } defer s.Stop() s.Start() }
参考
本项目受 evio 启发,参考 muduo 实现。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK