Java NIO:Selector详解

Selector事件分发器(单线程选择就绪的事件)作为Java NIO的核心组件,这里详细了解内部实现

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
selector.select();
Iterator iter = this.selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = (SelectionKey)iter.next();
if (key.isAcceptable()){
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
// 监听客户端socket可读就绪事件
client.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()){
handleRead(key);
}
if (key.isWritable() && key.isValid()){
handleWrite(key);
}
iter.remove();
}
}
  1. 如果有客户端A连接服务,执行select方法时,可以通过serverSocketChannel获取客户端A的socketChannel,并在selector上注册socketChannelOP_READ事件。
  2. 如果客户端A发送数据,会触发OP_READ事件,这样下次轮询调用select方法时,就能通过socketChannel读取数据,同时在selector上注册该socketChannelOP_WRITE事件,实现服务器往客户端写数据。

Selector.open()实现原理

注意:以下源代码皆来源于openjdk8

  • Selector.open()可以得到一个Selector实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static Selector open() throws IOException {
// 首先找到provider,然后再打开Selector
return SelectorProvider.provider().openSelector();
}
// java.nio.channels.spi.SelectorProvider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 实际创建SelectorProvider的方法
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
  • sun.nio.ch.DefaultSelectorProvider

不同系统对应着不同的sun.nio.ch.DefaultSelectorProvider,以Linux为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
// 获取OS名称
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
// 根据名称来创建不同的Selctor
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}

如果系统名称是Linux的话,真正创建的是sun.nio.ch.EPollSelectorProvider

1
2
3
4
// EPollSelectorProvider.openSelector()
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

Linux最终的Selector实现:sun.nio.ch.EPollSelectorImpl

EPollSelectorImpl.select()实现原理

epoll系统调用主要分为3个函数: epoll_create, epoll_ctl, epoll_wait

epoll_create:创建一个epoll fd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// class EPollSelectorImpl extends SelectorImpl
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// makePipe返回管道的2个文件描述符,编码在一个long类型的变量中
// 高32位代表读 低32位代表写
// 使用pipe为了实现Selector的wakeup逻辑
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 创建一个EPollArrayWrapper
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}

创建一个EPollArrayWrapper 初始化

1
2
3
4
5
6
EPollArrayWrapper() throws IOException {
// 创建epoll fd
epfd = epollCreate();
}
private native int epollCreate();

在初始化过程中调用了native epollCreate方法。

1
2
3
4
5
6
7
8
9
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
//从Linux2.6.8之后,改用了红黑树结构,指定了大小也没用
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}

epoll_create: 内核系统调用,创建一个epoll fd, 并且开辟epoll自己的内核高速cache区,建立红黑树分配初始size的内存对象,同时建立一个list链表,用于存储准备就绪的事件

Epoll wait:等待内核IO事件

调用Selector.select(),最后会委托给EPollSelectorImpldoSelect()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
// 等待事件到来,收集事件到来的fd并用来处理
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}

实际执行EPollArrayWrapper.poll(timeout);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int poll(long timeout) throws IOException {
// 看下文
updateRegistrations();
// 调用native方法,发起系统内核调用
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;

epollWait也是个native方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) {
// 发起epoll_wait系统调用等待内核事件
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else {
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}

epoll_wait: 内核系统调用, 等待内核返回IO事件

epoll_ctl: IO事件管理

注册到Selector上的IO事件是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept.

调用Selector.register()完成IO事件注册,实际执行EPollSelectorImpl.implRegister()方法

1
2
3
4
5
6
7
8
9
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}

调用Selector.register()时均会将事件存储到EpollArrayWrapper的成员变量eventsLoweventsHigh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// 超过数组长度的事件会缓存到这个map中,等待下次处理
private Map<Integer,Byte> eventsHigh;
// 添加文件描述符fd
void add(int fd) {
// force the initial update events to 0 as it may be KILLED by a
// previous registration.
synchronized (updateLock) {
assert !registered.get(fd);
setUpdateEvents(fd, (byte)0, true);
}
}
private void setUpdateEvents(int fd, byte events, boolean force) {
// 判断fd和数组长度
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}

执行EpollArrayWrapper.poll()的时候, 首先会调用updateRegistrations()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Returns the pending update events for the given file descriptor.
*/
private byte getUpdateEvents(int fd) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
}
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
// 从保存的eventsLow和eventsHigh里取出事件
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
// 判断操作类型以传给epoll_ctl
// 没有指定EPOLLET事件类型
if (isRegistered) {
// 如果已经注册过,不需要调用epollCtl去内核红黑树新增节点
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 熟悉的epoll_ctl
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
private native void epollCtl(int epfd, int opcode, int fd, int events);

在获取到事件之后将操作委托给了epollCtl,这又是个native方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// 发起epoll_ctl调用来进行IO事件的管理
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}

注意:jdk nio没有指定ET(边缘触发)还是LT(水平触发), 所以默认会用LT, 而Netty epoll transport使用ET触发

通过channel就能不断的获取客户端socket数据,实现后端业务处理

参考

Java NIO分析(8): 高并发核心Selector详解

java NIO 运行原理介绍

Introduction to the Java NIO Selector

深入浅出NIO之Selector实现原理

谈一谈 Java IO 模型

热评文章