

python异步socket-server
source link: https://www.tuicool.com/articles/eqQZvi2
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

同步示例代码
import socket

HOST = 'localhost'
PORT = 8888

# Blocking echo server: serves exactly one client at a time and only goes
# back to accept() once that client has closed its side of the connection.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((HOST, PORT))
    listener.listen(128)
    while True:
        conn, addr = listener.accept()
        print('connected by', addr)
        with conn:
            # recv() returns b'' once the peer closes; the sentinel form of
            # iter() stops the loop at that point.
            for chunk in iter(lambda: conn.recv(1024), b''):
                conn.sendall(chunk)
IO复用的方式进行改造
这里使用 python3 提供的 selectors 来改造它,这个模块封装了操作系统底层提供的 I/O 复用机制(同步非阻塞),比如 linux 上使用了 epoll。通过 I/O 复用机制我们可以监听多个文件描述符的可读写事件并且注册回调函数,拥有更好的并发性能。
import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
    """Accept a pending connection and watch it for readability."""
    client, peer = sock.accept()  # Should be ready
    print('accepted', client, 'from', peer)
    client.setblocking(False)
    sel.register(client, selectors.EVENT_READ, read)


def read(conn, mask):
    """Echo one chunk back to the client, or drop the client on EOF."""
    payload = conn.recv(1000)  # Should be ready
    if not payload:
        # An empty read means the peer closed the connection.
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
        return
    print('echoing', repr(payload), 'to', conn)
    conn.send(payload)  # Hope it won't block


sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

# Event loop: each ready key carries the callback it was registered with
# (accept or read) in key.data and the socket itself in key.fileobj.
while True:
    for key, mask in sel.select():
        key.data(key.fileobj, mask)
实现EventLoop类的改造方案
import selectors
import socket


class EventLoop:
    """Selector-driven loop that dispatches the callback stored on each key.

    Keys registered for EVENT_READ store a bare callable as ``data``; keys
    registered for EVENT_WRITE store a ``(callable, msg)`` tuple.
    """

    def __init__(self, selector=None):
        if selector is None:
            selector = selectors.DefaultSelector()
        self.selector = selector

    def run_once(self, timeout=None):
        """Wait for ready sockets (up to *timeout* seconds) and dispatch them once."""
        for key, mask in self.selector.select(timeout):
            if mask & selectors.EVENT_READ:
                # Read event: data is _accept or _on_read.
                key.data(key.fileobj)
            else:
                # Write event: data is (_on_write, pending bytes).
                callback, msg = key.data
                callback(key.fileobj, msg)

    def run_forever(self):
        """Dispatch events until the process is interrupted."""
        while True:
            self.run_once()


class TCPEchoServer:
    """Echo server whose sockets are driven by callbacks on an EventLoop."""

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        """Bind, listen, register the accept callback and run the loop forever."""
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)
        self._loop.selector.register(self.s, selectors.EVENT_READ, self._accept)
        self._loop.run_forever()

    def _accept(self, sock):
        """Accept a new client and start watching it for incoming data."""
        conn, addr = sock.accept()
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        self._loop.selector.register(conn, selectors.EVENT_READ, self._on_read)

    def _close(self, conn):
        """Stop watching *conn* and close it."""
        print('closing', conn)
        self._loop.selector.unregister(conn)
        conn.close()

    def _on_read(self, conn):
        """Read one chunk and arm the socket for writing it back."""
        try:
            msg = conn.recv(1024)
        except BlockingIOError:
            return  # spurious wake-up; stay registered for reading
        except ConnectionError:
            msg = b''  # a reset peer is handled like a closed one
        if msg:
            print('echoing', repr(msg), 'to', conn)
            self._loop.selector.modify(
                conn, selectors.EVENT_WRITE, (self._on_write, msg))
        else:
            self._close(conn)

    def _on_write(self, conn, msg):
        """Send as much of *msg* as the socket accepts, re-arming for the rest."""
        try:
            # send() instead of sendall(): on a non-blocking socket a partial
            # write must be resumed on the next writability event.
            sent = conn.send(msg)
        except BlockingIOError:
            return  # still registered for EVENT_WRITE with the same msg
        except ConnectionError:
            self._close(conn)
            return
        remaining = msg[sent:]
        if remaining:
            self._loop.selector.modify(
                conn, selectors.EVENT_WRITE, (self._on_write, remaining))
        else:
            self._loop.selector.modify(
                conn, selectors.EVENT_READ, self._on_read)


if __name__ == '__main__':
    event_loop = EventLoop()
    echo_server = TCPEchoServer('localhost', 8888, event_loop)
    echo_server.run()
基于future对象的异步模式
上述的 EventLoop 模型都是采用监听、回调的方式来处理事件,那么如何获取到异步调用的结果呢?python 提供了一个叫做 Future 的对象,当异步调用执行完的时候,用来保存它的结果。Future 对象的 result 用来保存未来的执行结果,set_result 用来设置 result 并且运行给 future 对象添加的回调。
这里使用了原生的协程实现方式——yield from。
import selectors
import socket


class Future:
    """Hand-rolled future: holds a result and runs callbacks when it is set."""

    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        """Queue *fn* to be called with this future when set_result() runs."""
        self._callbacks.append(fn)

    def set_result(self, result):
        """Store *result*, then call every registered callback with this future."""
        self.result = result
        for callback in self._callbacks:
            callback(self)

    def __iter__(self):
        # Yield ourselves to whoever drives the generator (Task.step); once
        # resumed, hand the stored result back as the value of `yield from`.
        yield self
        return self.result


class Task:
    """Drive a generator-based coroutine from one yielded Future to the next."""

    def __init__(self, coro):
        self.coro = coro
        # Prime the coroutine with an already-resolved future.
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        """Send *future*'s result into the coroutine and wait on the next future."""
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return
        next_future.add_done_callback(self.step)


class TCPEchoServer:
    """Echo server written as generators that `yield from` Future objects."""

    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self._loop = loop
        self.s = socket.socket()

    def run(self):
        """Generator: bind and listen, then accept and serve clients one by one."""
        self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.s.bind((self.host, self.port))
        self.s.listen(128)
        self.s.setblocking(False)
        while True:
            conn, addr = yield from self.accept()
            yield from self.handle(conn)

    def handle(self, conn):
        """Generator: echo a single message from *conn*, then close it."""
        msg = yield from self.read(conn)
        if msg:
            yield from self.sendall(conn, msg)
        else:
            # read() left conn registered; it must be unregistered before
            # closing or the selector keeps a stale key for a dead fd.
            self._close(conn)

    def _close(self, conn):
        """Unregister *conn* from the selector and close it."""
        self._loop.selector.unregister(conn)
        conn.close()

    def accept(self):
        """Generator: wait for the listening socket and return (conn, addr)."""
        f = Future()

        def on_accept():
            conn, addr = self.s.accept()
            print('accepted', conn, 'from', addr)
            conn.setblocking(False)
            f.set_result((conn, addr))

        self._loop.selector.register(self.s, selectors.EVENT_READ, on_accept)
        # Suspended here until on_accept() resolves the future.
        conn, addr = yield from f
        self._loop.selector.unregister(self.s)
        return conn, addr

    def read(self, conn):
        """Generator: register *conn* for reading and return the received bytes."""
        f = Future()

        def on_read():
            f.set_result(conn.recv(1024))

        self._loop.selector.register(conn, selectors.EVENT_READ, on_read)
        msg = yield from f
        return msg

    def sendall(self, conn, msg):
        """Generator: send *msg* once *conn* is writable, then close *conn*."""
        f = Future()

        def on_write():
            conn.sendall(msg)
            # Finish with the connection before resuming the coroutine.
            self._close(conn)
            f.set_result(None)

        self._loop.selector.modify(conn, selectors.EVENT_WRITE, on_write)
        yield from f


class EventLoop:
    """Selector loop whose keys each store a zero-argument callback."""

    def __init__(self, selector=None):
        if selector is None:
            selector = selectors.DefaultSelector()
        self.selector = selector

    def create_task(self, coro):
        """Wrap *coro* in a Task, which starts running it immediately."""
        return Task(coro)

    def run_once(self, timeout=None):
        """Wait for ready sockets (up to *timeout* seconds) and call their callbacks."""
        for event_key, event_mask in self.selector.select(timeout):
            event_key.data()

    def run_forever(self):
        """Dispatch events until the process is interrupted."""
        while True:
            self.run_once()


if __name__ == '__main__':
    event_loop = EventLoop()
    echo_server = TCPEchoServer('localhost', 8888, event_loop)
    task = event_loop.create_task(echo_server.run())
    event_loop.run_forever()
性能测试
建立10000个连接的tcp客户端
# Echo client program
import concurrent.futures
import functools
import socket
import sys
import time

HOST = '127.0.0.1'  # The remote host
PORT = 8888  # The same port as used by the server
time_list = []  # elapsed seconds of every call that completed


def run_time(func):
    """Decorator: append the wall-clock duration of each successful call to time_list."""
    @functools.wraps(func)
    def wrapper(i, *args, **kwargs):
        start_time = time.perf_counter()
        # Forward every argument and keep the result instead of dropping them.
        result = func(i, *args, **kwargs)
        time_list.append(time.perf_counter() - start_time)
        return result
    return wrapper


@run_time
def send_message(i):
    """Connect, send one numbered greeting and return the reply bytes."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, PORT))
        s.sendall(('hello world {}\n'.format(i)).encode('utf-8'))
        return s.recv(1024)


def main(total=10000, max_workers=128):
    """Issue *total* requests from a thread pool and print the summed latency."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(send_message, i) for i in range(total)]
    # The pool has been joined, so every future is finished. Failed requests
    # are missing from time_list, so report them rather than hide them.
    failures = sum(1 for future in futures if future.exception() is not None)
    if failures:
        print('{} of {} requests failed'.format(failures, total), file=sys.stderr)
    print(sum(time_list))


if __name__ == '__main__':
    main()
时间结果
同步测试时间: 368.0034764000006 select IO复用:154.46833750000008 异步+IO复用:69.28810920000004
Recommend
-
53
My private reason is to understand how does it work, because Signal uses it. So, to understand it better, I would like to write one simple-basic-just-work example. I’ll write it down with embedded…
-
47
Opal implementation of a simple server with Express and Socket.io Demo https://opal-realtime-socket-express.herokuapp.com/ Run...
-
16
When connecting to a local MySQL instance, you have two commonly used methods: use TCP/IP protocol to connect to local address – “localhost” o...
-
29
点击上方蓝字可直接关注!方便下次阅读。如果对你有帮助,麻烦点点击下文末的广告,感谢~ 之前文章写过Linux C Socket 收发 Json 数据,最近用 Qt Server 实现了一遍。给我自己的感觉就是 cJSON 接口与...
-
12
Socket层实现系列 — 信号驱动的异步等待 2015-06-12 17:13:00 http://blog.csdn.net/zhangskd/article/details/45932775 主要内容:S...
-
6
Yosemite - MAMP - Unable to connect to local MySQL server via socket '/Applications/MAMP/tmp/mysql/mysql.sock' (2) advertisements I have probl...
-
6
安全传输层协议 TLS,以前称为 SSL(Secure Sockets Layer) ,由于HTTPS的推出受到了很多人的欢迎。但是正如TLS的名称 Transport Layer Security 所示的那样,它实际上是独立于 HTTP,一个更深入的安全协议,我们可以将 TLS 视为 TCP 的安全版本,其提供了对 soc...
-
7
CLIENT -SERVER SOCKET PROGRAMMING advertisements I have written client-server message sending program using sockets in Java, which is working...
-
6
Imagine that you are building an app with chat rooms and it will have thousands of users how do you think a server could handle this load ?! With Two concepts: Reverse Proxy A reverse p...
-
6
Creating a Node.js server/client with Socket.IO & MySQLNovember 07, 2013 · 6 min read ·
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK