IO多路复用
Selector可以说是Java NIO中最重要的东西,一般称为选择器,也称为多路复用器,因为它是基于IO多路复用机制,这个机制也叫做事件驱动模型。Linux下IO多路复用机制通过调用select,poll,epoll等系统函数来实现,其中epoll是比较先进的一种方式。这些函数都可以同时监视多个文件描述符(File Description)的读写就绪状况,一旦某个文件描述符就绪,能够通知程序进行相应的读写操作。这样,多个描述符的I/O操作都能在一个线程内并发交替地顺序完成,这就叫I/O多路复用,这里的“复用”指的是复用同一个线程。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
具体到Java中的 Selector,它的IO多路复用是什么样的呢?在Java NIO中我们可以将 Channel 注册到 Selector 上,注册后 Selector 就可以监测Channel的状态(或称为事件,包括是否可读、是否可写、是否连接成功等),然后根据对应的状态进行我们想要的操作。因为多个 Channel 可以注册到同一个 Selector 上,我们就可以只用一个线程来处理多个通道,这样就可以大大减少线程之间上下文切换的开销,这就是 Selector 的优势所在。

Selector
Selector的创建
使用Selector的静态方法open()就可以创建一个Selector实例。1
Selector selector = Selector.open();
Channel注册到Selector
可选择通道(SelectableChannel)
并不是所有的 Channel 都可以注册到 Selector 上。比如 FileChannel 就不行。为什么呢?因为只有继承了抽象类SelectableChannel的 Channel 才可以注册,SelectableChannel的子类调用这个父类中的register方法来实现注册功能,所有socket通道类,就都继承了SelectableChannel。
注册
1 | //通道必须处于非阻塞模式才能与选择器一起使用 |
Channel 必须是非阻塞的才能注册到 Selector,像 FileChannel 就不能切换成非阻塞所以不能和 Selector 配合使用。configureBlocking()这个方法位于AbstractSelectableChannel类中。
register()方法第二个参数,是传入一个想监控的事件的集合,可以监听四种不同类型的事件:
- Connect:表示socket连接成功,使用
SelectionKey.OP_CONNECT表示。 - Accept:表示准备接收新的连接了,使用
SelectionKey.OP_ACCEPT表示。 - Read:表示通道有数据可读了,使用
SelectionKey.OP_READ表示。 - Write:表示可以写入数据到通道里了,使用
SelectionKey.OP_WRITE表示。
如果你想监控不止一种试讲,使用或运算符即可,如下:1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
注意:一个Channel它可以注册到多个 Selector 上。
SelectionKey
register()方法会返回一个SelectionKey对象,每当有一个 Channel 注册了都会有一个SelectionKey对象,通过选择器的selectedKeys()方法可以获取到所有的这些SelectionKey。1
Set<SelectionKey> selectionKeys = selector.selectedKeys();
SelectionKey表示的事一个特定的通道对象和一个特定的选择器对象之间的注册关系。1
2
3
4
5key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。
key.channel(); // 返回该SelectionKey对应的channel。
key.selector(); // 返回该SelectionKey对应的Selector。
key.interestOps(); //返回代表需要Selector监控的IO操作的bit
key.readyOps(); // 返回一个bit,代表在相应channel上可以进行的IO操作。
key.interestOps()
我们可以通过以下方法来判断Selector是否对Channel的某种事件是否想监听:1
2
3
4
5int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
key.readyOps()
ready 集合是通道已经准备就绪的操作的集合。我们可以通过以下几个方法用来检查这些操作是否就绪。1
2
3
4
5//检查这些操作是否就绪的方法
boolean isReadable();//是否可读,是返回 true
boolean isWritable()://是否可写,是返回 true
boolean isConnectable()://是否完成socket连接,是返回 true
boolean isAcceptable()://是否可接收连接,是返回 true
attachment
可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:1
2key.attach(theObject);
Object attachedObj = key.attachment();
还可以在用register()方法向Selector注册Channel的时候附加对象。如:1
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
Selector监听Channel
Selector对象调用select()方法后开始监听,这个方法会阻塞,当有通道就绪后方法返回,返回代表通道的 SelectionKey 的数量,这个返回值是有可能为0的(比如这个阻塞的线程被wakeup或者interrupted)。1
int select = selector.select();
除了这个方法,还有两个重载方法,第一个是1
public abstract int select(long timeout) throws IOException;
select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。注意,这个方法并不能提供精确时间的保证,和当执行wait(long timeout)方法时并不能保证会延时timeout道理一样。
另一个是1
public abstract int selectNow() throws IOException;
这个方法与select()的区别在于,是非阻塞的,即当前操作即使没有通道准备好也是立即返回,只是返回的是0。
响应
一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selector的selectorKeys()方法,获取就绪通道对应的“选择键”的集合,然后遍历集合,获取对应的通道,因为这个通道是ServerSocketChannel,它可以创建一个socket连接对应的SocketChannel,用于响应数据。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//获取已经就绪的通道
int select = selector.select();
//如果已经没有已经就绪的通道,每隔1s查询一次
if (select == 0) {
TimeUnit.SECONDS.sleep(1);
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
//如果某个key对应的通道接收就绪
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverChannel.accept();
registerSocketChannel(selector, socketChannel, SelectionKey.OP_READ);
//向客户端发送数据
replyClient(socketChannel);
}
//如果某个key对应的通道读就绪
if (selectionKey.isReadable()) {
readDataFromSocket(selectionKey);
}
//移除对应的SelectionKey
keyIterator.remove();
}
注意,遍历的时候每次都要进行remove()操作。为什么呢?因为要防止重复处理。如果不移除,已经处理过的 SelectionKey 仍然还在selector.selectedKeys()返回的 Set 中,对于上面的例子,对于某个isAcceptable的通道,serverChannel.accept()方法第一次调用是能得到SocketChannel对象,但第二次调用就可能会得到一个null,因为第二次的时候可能并没有监听到有socket连接进来。综上,不主动移除处理过的 SelectionKey 往往会导致一些意想不到的结果。
另外还需弄明白,这里的 remove SelectionKey 并不是把注册在 Selector 上的通道移除了,注册关系并没有发生改变,如果后面这个通道上有事件就绪,仍然可以通过selector.selectedKeys()获取这个通道和 Selector 绑定对应的 SelectionKey。