5

Java NIO 选择器 Selector

 3 years ago
source link: https://www.daqianduan.com/21703.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

选择器 Selector 是 I/O 多路复用的核心组件,它可以监控实现了 SelectableChannel 的 通道 的就绪情况。有了多路复用(multiplexing) I/O 模型,使得单线程的 Java 程序在极端情况下能够处理数万个连接,极大提高了程序的并发数。

1. 多路复用 I/O 模型

I/O 多路复用模型是操作系统提供给应用程序的一种进行 I/O 操作的模型 。应用程序通过 select/poll 系统调用监控多个 I/O 设备,一旦某个或者多个 I/O 设备的处于就绪状态(例如:可读)则返回,应用程序随后可对就绪的设备进行操作。

应用程序 内核 select 数据未准备好 阻塞等待 等待数据 数据已准备好 复制完成 复制数据到 用户空间 read 处理数据 阻塞等待 系统调用 返回可读 系统调用 返回 OK

大致流程如下:

1)应用程序向内核发起 select 系统调用,该调用会阻塞应用程序。

2)内核等待数据到达。数据可能由 DMA 复制到内核缓冲区,也有可能是 CPU 进行复制。

3)数据准备完毕,select 调用返回。select 返回的是一个集和,可能有多个连接都已经就绪。

4)应用程序发起 read 系统调用。

5)操作系统将数据有内核缓冲区复制到用户缓冲区。

6)read 调用返回。

I/O 多路复用模型本质上是一种阻塞 I/O,进行读操作的 read 系统调用是阻塞的,select 的时候也是阻塞的。不过 I/O 多路复用模型的优势在于阻塞时可以等待多路 I/O 就绪,然后一并处理。与多线程处理多路 I/O 相比,它是单线程的,没有线程切换的开销,单位时间内能够处理多的连接数。

2. 选择器与通道关系

在 Java 中,通道 Channel 可以表示 I/O 连接,而选择器可以监控某些 I/O 事件就绪的通道,选择通道中就绪的 I/O 事件。这里的通道必须是实现了 SelectableChannel 接口的通道,例如:SocketChannel, DatagramChannel 等;而文件通道 FileChannel 没有实现该接口,所以不支持选择器。

Selector 选择器 SelectableChannel 可选择通道 SelectableChannel 可选择通道 SelectableChannel 可选择通道 SelectableChannel 可选择通道

3. 选择键 SelectionKey

选择器 Selector 监控通道时监控的是通道中的事件,选择键 SelectionKey 就代表着 I/O 事件。程序通过调用 Selector.select() 方法来选中选择器所监控的通道中的就绪的 I/O 事件的集合,然后遍历集合,对事件作出相应的处理。

选择键 SelectionKey 可以表示 4 种事件,这 4 种事件使用 int 类型的常量来表示。

1)SelectionKey.OP_ACCEPT 表示 accept 事件就绪。例如:对于 ServerSocketChannel 来说,该事件就绪表示可以调用 accept() 方法来获得与客户端连接的通道 SocketChannel。

2)SelectionKey.OP_CONNECT 表示客户端与服务端连接成功。

3)SelectionKey.OP_READ 表示通道中已经有了可读数据,可以调用 read() 方法从通道中读取数据。

4)SelectionKey.OP_WRITE 表示写事件就绪,可以调用 write() 方法往通道中写入数据。

不同的通道所能够支持的 I/O 事件不同,例如:ServerSocketChannel 只支持 accept 事件,而 DatagramChannel 只支持 read 和 write 事件。要查看通道所支持的事件,可以查看通道的 javadoc 文档,或者调用通道的 validOps() 方法来进行判断。例如:channel.validOps() & SelectionKey.OP_READ > 0 表示 channel 支持读事件。

4. 选择器使用步骤

4.1 获取选择器

与通道和缓冲区的获取类似,选择器的获取也是通过静态工厂方法 open() 来得到的。

Selector selector = Selector.open();    // 获取一个选择器实例

4.2 获取可选择通道

能够被选择器监控的通道必须实现了 SelectableChannel 接口,并且需要将通道配置成非阻塞模式,否则后续的注册步骤会抛出 IllegalBlockingModeException。

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090)); // 打开 SocketChannel 并连接到本机 9090 端口
socketChannel.configureBlocking(false); // 配置通道为非阻塞模式

4.3 将通道注册到选择器

通道在被指定的选择器监控之前,应该先告诉选择器,并且告知监控的事件,即:将通道注册到选择器。

通道的注册通过 SelectableChannel.register(Selector selector, int ops) 来完成,ops 表示关注的事件,如果需要关注该通道的多个 I/O 事件,可以传入这些事件类型或运算之后的结果。这些事件必须是通道所支持的,否则抛出 IllegalArgumentException。

socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);   // 将套接字通过到注册到选择器,关注 read 和 write 事件

4.4 轮询 select 就绪事件

通过调用选择器的 Selector.select() 方法可以获取就绪事件,该方法会将就绪事件放到一个 SelectionKey 集合中,然后返回就绪的事件的个数。这个方法映射多路复用 I/O 模型中的 select 系统调用,它是一个阻塞方法。正常情况下,直到至少有一个就绪事件,或者其它线程调用了当前 Selector 对象的 wakeup() 方法,或者当前线程被中断时返回。

while (selector.select() > 0){ // 轮询,且返回时有就绪事件
    Set<SelectionKey> keys = selector.selectedKeys(); // 获取就绪事件集合
    .......
}

有 3 种方式可以 select 就绪事件:

1)select() 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup() 或者当前线程被中断时返回。

2)select(long timeout) 阻塞方法,有一个就绪事件,或者其它线程调用了 wakeup(),或者当前线程被中断,或者阻塞时长达到了 timeout 时返回。不抛出超时异常。

3)selectNode() 不阻塞,如果无就绪事件,则返回 0;如果有就绪事件,则将就绪事件放到一个集合,返回就绪事件的数量。

4.5 处理就绪事件

每次可以 select 出一批就绪的事件,所以需要对这些事件进行迭代。从一个 SelectionKey 对象可以得到:1)就绪事件的对应的通道;2)就绪的事件。通过这些信息,就可以很方便地进行 I/O 操作。

for(SelectionKey key : keys){
    if(key.isWritable()){ // 可写事件
        if("Bye".equals( (line = scanner.nextLine()) )){
            socketChannel.shutdownOutput();
            socketChannel.close();
            break;
        }
        buf.put(line.getBytes());
        buf.flip();
        socketChannel.write(buf);
        buf.compact();
    }
}
keys.clear(); // 清除选择键(事件)集,避免下次循环的时候重复处理。

需要注意的是,处理完 I/O 事件之后,需要清除选择键集和,避免下一轮循环的时候对同一事件重复处理。

5. 完整示例

下面给出一个完整的实例,实例中包含 TCP 客户端 TcpClient, UDP 客户端 UdpClient 和服务端 EchoServer。服务端 EchoServer 可以同时处理 UDP 请求和 TCP 请求,用户可以在客户端控制台输入内容,按回车发送给服务端,服务端打印客户端发送过来的内容。完整代码: https://github.com/Robothy/java-experiments/tree/main/nio/Selector

5.1 服务端

public class EchoServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();    // 获取选择器

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开服务器通道
        serverSocketChannel.configureBlocking(false);                         // 服务器通道配置为非阻塞模式
        serverSocketChannel.bind(new InetSocketAddress(9090));           // 绑定 TCP 端口 9090
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);       // 将服务器通道注册到选择器 selector 中,注册事件为 ACCEPT

        DatagramChannel datagramChannel = DatagramChannel.open();             // 打开套接字通道
        datagramChannel.configureBlocking(false);                             // 配置通道为非阻塞模式
        datagramChannel.bind(new InetSocketAddress(9090));               // 绑定 UDP 端口 9090
        datagramChannel.register(selector, SelectionKey.OP_READ);             // 将通道注册到选择器 selector 中,注册事件为读取数据

        ByteBuffer buf = ByteBuffer.allocate(1024);                           // 分配一个 1024 字节的堆字节缓冲区

        while (selector.select() > 0){                                        // 轮询已经就绪的注册的通道的 I/O 事件
            Set<SelectionKey> keys = selector.selectedKeys();                 // 获取就绪的 I/O 事件,即选择器键集合
            for (SelectionKey key : keys){                                    // 遍历选择键,处理就绪事件
                if(key.isAcceptable()){                                       // 选择键的事件的是 I/O 连接事件
                    SocketChannel socketChannel = serverSocketChannel.accept(); // 执行 I/O 操作,获取套接字连接通道
                    socketChannel.configureBlocking(false);                   // 配置为套接字通道为非阻塞模式
                    socketChannel.register(selector, SelectionKey.OP_READ);   // 将套接字通过到注册到选择器,关注 READ 事件
                }else if(key.isReadable()){                        // 选择键的事件是 READ
                    StringBuilder sb = new StringBuilder();
                    if(key.channel() instanceof DatagramChannel){  // 选择的通道为数据报通道,客户端是通过 UDP 连接过来的
                        sb.append("UDP Client: ");
                        datagramChannel.receive(buf);              // 最多读取 1024 字节,数据报多出的部分自动丢弃
                        buf.flip();
                        while(buf.position() < buf.limit()) {
                            sb.append((char)buf.get());
                        }
                        buf.clear();
                    }else{                                          // 选择的通道为套接字通道,客户端时通过 TCP 连接过来的
                        sb.append("TCP Client: ");
                        ReadableByteChannel channel = (ReadableByteChannel) key.channel(); // 获取通道
                        int size;
                        while ( (size = channel.read(buf))>0){
                            buf.flip();
                            while (buf.position() < buf.limit()) {
                                sb.append((char)buf.get());
                            }
                            buf.clear();
                        }

                        if (size == -1) {
                            sb.append("Exit");
                            channel.close();
                        }
                    }
                    System.out.println(sb);
                }
            }
            keys.clear();  // 将选择键清空,防止下次循环时被重复处理
        }
    }
}

5.2 TCP 客户端

public class TcpClient {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9090));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_WRITE);

        Scanner scanner = new Scanner(System.in);
        String line;
        ByteBuffer buf = ByteBuffer.allocate(1024);

        while (selector.select() > 0){
            Set<SelectionKey> keys = selector.selectedKeys();
            for(SelectionKey key : keys){
                if(key.isWritable()){
                    if("Bye".equals( (line = scanner.nextLine()) )){
                        socketChannel.shutdownOutput();
                        socketChannel.close();
                        break;
                    }
                    buf.put(line.getBytes());
                    buf.flip();
                    socketChannel.write(buf);
                    buf.compact();
                }
            }
            keys.clear();
            if(!socketChannel.isOpen()) break;
        }
    }
}

5.3 UDP 客户端

public class UdpClient {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();                        // 获取选择器
        DatagramChannel datagramChannel = DatagramChannel.open();   // 打开一个数据报通道
        datagramChannel.configureBlocking(false);                   // 配置通道为非阻塞模式
        datagramChannel.register(selector, SelectionKey.OP_WRITE);  // 将通道的写事件注册到选择器
        ByteBuffer buff = ByteBuffer.allocate(1024);                // 分配字节缓冲区
        Scanner scanner = new Scanner(System.in);                   // 创建扫描器,扫描控制台输入流
        InetSocketAddress server = new InetSocketAddress("localhost", 9090);
        while (selector.select() > 0){                              // 有就绪事件
            Set<SelectionKey> keys = selector.selectedKeys();       // 获取选择键,即就绪的事件
            for(SelectionKey key : keys){                           // 遍历选择键
                if(key.isWritable()){                               // 如果当前选择键是读就绪
                    String line;
                    if("Bye".equals( line = scanner.nextLine() )) { // 从控制台获取 1 行输入,并检查输入的是不是 Bye
                        System.exit(0);                 // 正常退出
                    }
                    buff.put(line.getBytes());          // 放入缓冲区
                    buff.flip();                        // 将缓冲区置为读状态
                    datagramChannel.send(buff, server); // 往 I/O 写数据
                    buff.compact();                     // 压缩缓冲区,保留没发送完的数据
                }
            }
            keys.clear();
        }
    }
}

6. 小结

Selector 作为多路复用 I/O 模型的核心组件,能够同时监控多路 I/O 通道。选择器在 select 就绪事件地时候会阻塞,在处理 I/O 事件的时候也会阻塞,它的优势在于在阻塞的时候可以等待多路 I/O 就绪,是一种异步阻塞 I/O 模型。与多线程处理多路 I/O 相比,多路复用模型只需要单个线程即可处理万级连接,没有线程切换的开销。

#感谢您访问本站#
#本文转载自互联网,若侵权,请联系删除,谢谢!657271#qq.com#

Recommend

  • 147
    • webuilddesign.com 6 years ago
    • Cache

    Java NIO Programming Cookbook - WBD

    Delve into the world of Java NIO Programming with this compact cookbook that introduces common recipes for the Java NIO operations! Java.nio (NIO stands for non-blocking I/O) is a collection of Java programming language APIs that offer features...

  • 58
    • www.tuicool.com 4 years ago
    • Cache

    Java NIO:浅析 I/O 模型

    也许很多朋友在学习NIO的时候都会感觉有点吃力,对里面的很多概念都感觉不是那么明朗。在进入Java NIO编程之前,我们今天先来讨论一些比较基础的知识:I/O模型。下面本文先从同步和异步的概念 说起,然后接着阐述了阻塞和非阻塞的区别,...

  • 22

    README.md PictureSelector 2.0 一款针对android平台下的图片选择器,支持从相册或拍照选择图片或视频、音频,支持动态权限获取、裁剪(单图or多图裁剪)、压缩、主...

  • 14

  • 17
    • www.cnblogs.com 3 years ago
    • Cache

    Java NIO:选择器

    最近打算把Java网络编程相关的知识深入一下(IO、NIO、Socket编程、Netty) Java NIO主要需要理解缓冲区、通道、选择器三个核心概念,作为对Java I/O的补充, 以提升大批量数据传输的效率。 学习NIO之前最好能有基础的网络...

  • 14
    • itimetraveler.github.io 3 years ago
    • Cache

    【java】nio的原理与浅析 | iTimeTraveler

    【Java】NIO的原理与浅析 用户空间与内核空间现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保...

  • 17
    • segmentfault.com 3 years ago
    • Cache

    【对线面试官】Java NIO

    ...

  • 5
    • toweave.github.io 3 years ago
    • Cache

    CSS Selector 选择器

    CSS Selector 选择器Tuesday, April 2nd 2019, 10:15:53 am ( ISO 8601 )了解并且熟用 CSS3 为我们提供的强大并且优雅的选择器,就可以简化我们的代码。 ** 一、基本选择器 ** A. 通配选择器 「*」

  • 1
    • benpaodewoniu.github.io 1 year ago
    • Cache

    java | nio-selector 写入事件

    java | nio-selector 写入事件 可写事件。向客户端发送数据。 引入自写的方法类。

  • 3
    • benpaodewoniu.github.io 1 year ago
    • Cache

    java | nio-selector read 事件 消息边界

    java | nio-selector read 事件 消息边界 如果,ByteBuffer 的长度小于客户端发送数据长度,就会出现消息边界问题。 引入自...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK