25

看我如何把NIO拉下神坛

 4 years ago
source link: https://juejin.im/post/5dfae986518825122671c846
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1. 传统的阻塞式I/O

c6fadd2b3b9b4eadb5382c598dd58fe6~tplv-k3u1fbpfcp-zoom-1.image

阻塞式I/O的阻塞指的是,socket的read函数、write函数是阻塞的。

1.2 阻塞式I/O编程模型

public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8091);
        System.out.println("step1: bind 8091");
        while (true) {
            // 阻塞
            Socket socket = serverSocket.accept();
            System.out.println("step2: accept " + socket.getPort());
            new Thread(() -> {
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                     PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
                    String line;
                    // 阻塞
                    while ((line = reader.readLine()) != null) {
                        System.out.println(line);
                        out.println("Server recv:" + line);
                    }
                } catch (Exception e) {

                }
            }).start();
        }

    }
复制代码

因为socket的accept函数,read函数,write函数是同步阻塞的,所以主线程不断调用socket的accept函数,轮询状态是established的TCP连接。

read函数会从内核缓冲区中读取已经准备好的数据,复制到用户进程,如果内核缓冲区中没有数据,那么这个线程就的就会被挂起,相应的cpu的使用权被释放出来。当内核缓冲中准备好数据后,cpu会响应I/O的中断信号,唤醒被阻塞的线程处理数据。

当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。

阻塞式I/O模型

aa022e48184f44d3a3efc55eab73fb13~tplv-k3u1fbpfcp-zoom-1.image

阻塞式I/O的缺点

缺乏扩展性,严重依赖线程。Java的线程占用内存在512K-1M,线程数量过多会导致JVM内存溢出。大量的线程上下文切换严重消耗CPU性能。大量的I/O线程被激活会导致系统锯齿状负载。

2. NIO编程

同步非阻塞I/O模型

93da08a74260439bbefd5334cd3cd28e~tplv-k3u1fbpfcp-zoom-1.image

对于NIO来说,如果内核缓冲区中没有数据就直接返回一个EWOULDBLOCK错误,一般来说进程可以轮询调用read函数,当缓冲区中有数据的时候将数据复制到用户空间,而不用挂起线程。

所以同步非阻塞中的非阻塞指的是socket的读写函数不是阻塞的,但是用户进程依然需要轮询读写函数,所以是同步的。但是NIO给我们提供了不需要新起线程就可以利用CPU的可能,也就是I/O多路复用技术

2.1 I/O多路复用技术

在linux系统中,可以使用select/poll/epoll使用一个线程监控多个socket,只要有一个socket的读缓存有数据了,方法就立即返回,然后你就可以去读这个可读的socket了,如果所有的socket读缓存都是空的,则会阻塞,也就是将线程挂起。

一开始用的linux用的是select,但是selct比较慢,最终使用了epoll。

2.1.1 epoll的优点

  1. 支持打开的socket描述符(FD)仅受限于操作系统最大文件句柄数,而select最大支持1024。
  2. selcet每次都会扫描所有的socket,而epoll只扫描活跃的socket。
  3. 使用mmap加速数据在内核空间到用户空间的拷贝。

2.2 NIO的工作机制

NIO实际上是一个事件驱动的模型,NIO中最重要的就是多路复用器(Selector)。在NIO中它提供了选择就绪事件的能力,我们只需要把通道(Channel) 注册到Selector上,Selector就会通过select方法(实际上操作系统是通过epoll)不断轮询注册在其上的Channel,如果某个Channel上发生了读就绪、写就绪或者连接到来就会被Selector轮询出来,然后通过SelectionKey(Channel注册到Selector上时会返回和其绑定的SelectionKey)可以获取到已经就绪的Channel集合,否则Selector就会阻塞在select方法上。

Selector调用select方法,并不是一个线程通过for循环去选择就绪的Channel,而是操作系统通过epoll以事件的方式的通知JVM的线程,哪个通道发生了读就绪或者写就绪的事件。所以select方法更像是一个监听器。

多路复用的核心目的就是使用最少的线程去操作更多的通道,在其内部并不是只有一个线程。创建线程的个数是根据通道的数量来决定的,每注册1023个通道就创建1个新的线程。

NIO的核心是多路复用器和事件模型,搞清楚了这两点其实就能搞清楚NIO的基本工作原理。原来在学习NIO的时候感觉很复杂,随着对TCP理解的深入,发现NIO其实并不难。在使用NIO的时候,最核心的代就是把Channel和要监听的事件注册到Selector上。

不同类型通道支持的事件

382ab60c278443cf9f363f26869bf769~tplv-k3u1fbpfcp-zoom-1.image

NIO事件模型示意图

b4ead676bb714205a011c27fdd81a885~tplv-k3u1fbpfcp-zoom-1.image

2.2.1 代码示例

ServerReactor

@Slf4j
public class ServerReactor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;
    private volatile boolean stop = false;

    public ServerReactor(int port, int backlog) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(port), backlog);
        serverSocket.setReuseAddress(true);
        serverSocketChannel.configureBlocking(false);
        // 将channel注册到多路复用器上,并监听ACCEPT事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    public void setStop(boolean stop) {
        this.stop = stop;
    }

    @Override
    public void run() {
        try {
            // 无限的接收客户端连接
            while (!stop && !Thread.interrupted()) {
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // 移除key,否则会导致事件重复消费
                    it.remove();
                    try {
                        handle(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

    private void handle(SelectionKey key) throws Exception {
        if (key.isValid()) {
            // 如果是ACCEPT事件,代表是一个新的连接请求
            if (key.isAcceptable()) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                // 相当于三次握手后,从全连接队列中获取可用的连接
                // 必须使用accept方法消费ACCEPT事件,否则将导致多路复用器死循环
                SocketChannel socketChannel = serverSocketChannel.accept();
                // 设置为非阻塞模式,当没有可用的连接时直接返回null,而不是阻塞。
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }

            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String content = new String(bytes);
                    System.out.println("recv client content: " + content);
                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                    writeBuffer.put(("服务端已收到: " + content).getBytes());
                    writeBuffer.flip();
                    socketChannel.write(writeBuffer);

                } else if (readBytes < 0) {
                    key.cancel();
                    socketChannel.close();
                }
            }

        }
    }
}
复制代码

ClientReactor

public class ClientReactor implements Runnable {
    final String host;
    final int port;
    final SocketChannel socketChannel;
    final Selector selector;
    private volatile boolean stop = false;

    public ClientReactor(String host, int port) throws IOException {
        this.socketChannel = SocketChannel.open();
        this.socketChannel.configureBlocking(false);
        Socket socket = this.socketChannel.socket();
        socket.setTcpNoDelay(true);
        this.selector = Selector.open();
        this.host = host;
        this.port = port;

    }

    @Override
    public void run() {

        try {
            // 如果通道呈阻塞模式,则立即发起连接;
            // 如果呈非阻塞模式,则不是立即发起连接,而是在随后的某个时间才发起连接。

            // 如果连接是立即建立的,说明通道是阻塞模式,当连接成功时,则此方法返回true,连接失败出现异常。
            // 如果此通道处于阻塞模式,则此方法的调用将会阻塞,直到建立连接或发生I/O错误。

            // 如果连接不是立即建立的,说明通道是非阻塞模式,则此方法返回false,
            // 并且以后必须通过调用finishConnect()方法来验证连接是否完成
            // socketChannel.isConnectionPending()判断此通道是否正在进行连接
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);

            }
            while (!stop && !Thread.interrupted()) {
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // 移除key,否则会导致事件重复消费
                    it.remove();
                    try {
                        handle(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }

        }


    }

    private void handle(SelectionKey key) throws IOException {

        if (key.isValid()) {

            SocketChannel socketChannel = (SocketChannel) key.channel();

            if (key.isConnectable()) {
                if (socketChannel.finishConnect()) {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    doWrite(socketChannel);
                }
            }

            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = socketChannel.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    System.out.println("recv server content: " + new String(bytes));
                } else if (readBytes < 0) {
                    key.cancel();
                    socketChannel.close();
                }
            }

        }
    }

    private void doWrite(SocketChannel socketChannel) {
        Scanner scanner = new Scanner(System.in);
        new Thread(() -> {
            while (scanner.hasNext()) {
                try {

                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                    writeBuffer.put(scanner.nextLine().getBytes());
                    writeBuffer.flip();
                    socketChannel.write(writeBuffer);
                } catch (Exception e) {

                }
            }
        }).start();
    }
}

复制代码

参考文章:

  1. [闲话高并发的那些神话,看京东架构师如何把它拉下神坛

](mp.weixin.qq.com/s/lAqn8CfSR…) 2. Java NIO浅析 3. TCP协议中常见伯克利API函数用法详解 4. 《NIO和Socket技术编程指南》


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK