Categories
程式開發

NIO看破也说破(四)—— Java的NIO


Java的NIO有selector,系统内核也提供了多种非阻塞IO模型,Java社区也出现了像netty这种优秀的 NIO 框架。Java的NIO 与内核的阻塞模型到底什么关系,为什么Java有NIO的API还出现了netty这种框架,网上说的 reactor 到底是什么?本文通过分析代码,带你一步步搞清楚Java的NIO和系统函数之间的关系,以及Java NIO 是如何一步步衍生出来netty框架。

NIO概念

前几节我们提到了 Nonblocking IO 的概念,在Java中有Java NIO 的系列包,网上的大多数资料把Java的NIO等同于 Nonblocking IO ,这是错误的。Java 中的 NIO 指的是从1.4版本后,提供的一套可以替代标准的Java IO 的 new API。有三部分组成:

Buffer 缓冲区Channel 通道Selector 选择器

API的具体使用不在本文赘述。

代码模板

NIO大致分为这几步骤:

获取channel设置非阻塞创建多路复用器selectorchannel和selector做关联根据selector返回的channel状态处理逻辑

// 开启一个channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(PORT));
// 打开一个多路复用器
Selector selector = Selector.open();
// 绑定多路复用器和channel
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 获取到达的事件
while (selector.select() > 0) {
Set keys = selector.keys();
Iterator iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
// 处理逻辑
}
if (selectionKey.isReadable()) {
// 处理逻辑
}
}
}

单线程示例

参考代码模板,我们用 NIO 实现一个Echo Server。server代码如下:

public static void main(String[] args) throws IOException {
Selector selector = initServer();
while (selector.select() > 0) {
Set set = selector.selectedKeys();
Iterator iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
try {
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.register(selector, SelectionKey.OP_READ, buffer);
} else if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
StringBuilder sb = new StringBuilder();
int read = 0;
while ((read = channel.read(buffer)) > 0) {
buffer.flip();
sb.append(Charset.forName("UTF-8").
newDecoder().decode(buffer));
buffer.clear();
}
System.out.printf("收到 %s 发来的:%sn",
channel.getRemoteAddress(), sb);
buffer.clear();
// 模拟server端处理耗时
Thread.sleep((int) (Math.random() * 1000));
buffer.put(("收到,你发来的是:" + sb + "rn").getBytes("utf-8"));
buffer.flip();
channel.write(buffer);
System.out.printf("回复 %s 内容是: %sn",
channel.getRemoteAddress(), sb);
channel.register(selector, SelectionKey.OP_READ, buffer.clear());
}
} catch (IOException | InterruptedException e) {
selectionKey.cancel();
selectionKey.channel().close();
System.err.println(e.getMessage());
}
iterator.remove();
}
}
}

写一个client ,模拟 50 个线程同时请求 server 端,在 readHandler 中模拟了随机sleep。client代码:

public static void main(String[] args) throws IOException {
for (int i = 0; i < 50; i++) { new Thread(new Runnable() { @Override public void run() { try { clientHandler(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } return; } private static void clientHandler() throws IOException { long start = System.currentTimeMillis(); Socket socket = new Socket(); socket.setSoTimeout(10000); socket.connect(new InetSocketAddress(9999)); OutputStream outputStream = socket.getOutputStream(); BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(outputStream)); bw.write("你好,我是client " + socket.getLocalSocketAddress() + "rn"); bw.flush(); InputStream inputStream = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream)); System.out.printf("接到服务端响应:%s,处理了%drn", br.readLine(), (System.currentTimeMillis() - start)); br.close(); inputStream.close(); bw.close(); outputStream.close(); }

NIO看破也说破(四)—— Java的NIO 1

Selector 实现原理

用strace启动,[[email protected] tmp]# strace -ff -o out /usr/lib/jvm/java-1.8.0/bin/java NIOServerSingle 分析执行的过程,在日志中可以看到如下片段:

20083 socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4
20084 setsockopt(4, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
20085 clock_gettime(CLOCK_MONOTONIC, {tv_sec=242065, tv_nsec=887240727}) = 0
20086 fcntl(4, F_GETFL) = 0x2 (flags O_RDWR)
20087 fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
20088 bind(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
20089 listen(4, 50) = 0
20090 getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
20091 getsockname(4, {sa_family=AF_INET, sin_port=htons(9999), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0
20092 epoll_create(256) = 7
21100 epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0
21101 epoll_wait(7, [{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1

可以看出,在Java的 NIO 中(java1.8)底层是调用的系统 epoll ,关于 epoll 请出门右转" 这里不再啰嗦。

从源码中也可以看出:

public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

openSelector是抽象方法具体实现类,在Linux上代码如下:

public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}

跟踪代码可以看到最后调用 native 方法,说明:Java NIO 是利用系统内核提供的能力。

多线程处理

我们把单线程示例中,readHandler 随机 sleep,稍稍做些修改。模拟 server 端执行某一次请求时,处理过慢,如图示:

第十五个请求过来时,随机sleep:

// 模拟server端处理耗时
if (t.get() == 15) {
Thread.sleep((int) (Math.random() * 10000));
}

结果第十五个线程之后,所有 client 的执行都有一个短暂的等待

NIO看破也说破(四)—— Java的NIO 2

很容易解释,因为在单线程处理中,channel创建、IO读写均为一个 Thread ,面对50个 client,IO时间需要排队处理。因此我们Redis系列中也提到了在Redis中,尽量避免某一个key的操作会很耗时的情况。可以参考 出门右转"

我们对代码做一些改造,client 端代码不动,server 端代码稍作调整。增加一个线程来处理读写时间,代码片段如下:

if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.register(selector, SelectionKey.OP_READ, buffer);
} else if (selectionKey.isReadable()) {
service.execute(new Runnable() {
@Override
public void run() {
try {
// 处理逻辑……………………
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

这样相当于server端有两个线程,一个是主线程启动的 selector 来监听 channel 的 OP_ACCEPT 状态,另一个线程是处理 channel 的读写。程序也可以继续执行,稍稍快了一些。

NIO看破也说破(四)—— Java的NIO 3

reactor模式

接触 NIO 就一定听过reactor 这个名词,reactor 经常被混入 NIO 中,让很多人混淆概念。Reactor 到底是什么,维基百科"的解释:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

核心点四个:

reactor design pattern (reactor是一种设计模式,不是专属于某个语言或框架的)event handling pattern (事件处理模式)delivered concurrently to a service handler by one or more inputs(一次处理一个或多个输入)demultiplexes the incoming requests and dispatches them(多路分解,分发)

我们对单线程示例做些修改:

public static void main(String[] args) throws IOException {
Selector selector = initServer();
while (selector.select() > 0) {
Set set = selector.selectedKeys();
Iterator iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispatcher(selectionKey);
iterator.remove();
}
}
}

initServer的实现:

private static Selector initServer() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(9999));

Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server 启动");
return selector;
}

dispatcher 的实现:

private static void dispatcher(SelectionKey selectionKey) {
try {
if (selectionKey.isAcceptable()) {
// 我只负责处理链接
acceptHandler(selector, selectionKey);
} else if (selectionKey.isReadable()) {
// 我只处理读写数据
readHandler(selector, selectionKey);
}
} catch (IOException | InterruptedException e) {
selectionKey.cancel();
selectionKey.channel().close();
System.err.println(e.getMessage());
}
}

acceptHandler的实现:

ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.register(selector, SelectionKey.OP_READ, buffer);

readHandler的实现:

SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
buffer.clear();
StringBuilder sb = new StringBuilder();
int read = 0;
while ((read = channel.read(buffer)) > 0) {
buffer.flip();
sb.append(Charset.forName("UTF-8").newDecoder().decode(buffer));
buffer.clear();
}
System.out.printf("收到 %s 发来的:%sn", channel.getRemoteAddress(), sb);
buffer.clear();
// 模拟server端处理耗时
Thread.sleep((int) (Math.random() * 1000));
buffer.put(("收到,你发来的是:" + sb + "rn").getBytes("utf-8"));
buffer.flip();
channel.write(buffer);
System.out.printf("回复 %s 内容是: %sn", channel.getRemoteAddress(), sb);
channel.register(selector, SelectionKey.OP_READ, buffer.clear());

单线程Reactor

改造后的代码,具备了以下特点:

基于事件驱动( NIO 的 selector,底层对事件驱动的epoll实现 jdk1.8)统一分派中心(dispatcher方法)不同的事件处理(accept 和 read write 拆分)

已经基本上实现了 Reactor 的单线程模式,我们把示例代码再做一些改造:

public class ReactorDemo {
private Selector selector;

public ReactorDemo() throws IOException {
initServer();
}
private void initServer() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(9999));

selector = Selector.open();
SelectionKey selectionKey = serverChannel.
register(selector, SelectionKey.OP_ACCEPT);
selectionKey.attach(new Acceptor());
System.out.println("server 启动");
}

public void start() throws IOException {
while (selector.select() > 0) {
Set set = selector.selectedKeys();
Iterator iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
dispater(selectionKey);
iterator.remove();
}

}
}

public void dispater(SelectionKey selectionKey) {
Hander hander = (Hander) selectionKey.attachment();
if (hander != null) {
hander.process(selectionKey);
}
}

private interface Hander {
void process(SelectionKey selectionKey);
}

private class Acceptor implements Hander {

@Override
public void process(SelectionKey selectionKey) {
try {
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) selectionKey.channel();
SocketChannel channel = serverSocketChannel.accept();
System.out.println("建立链接:" + channel.getRemoteAddress());
channel.configureBlocking(false);
selectionKey.attach(new ProcessHander());
channel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
}

private class ProcessHandler implements Hander {
@Override
public void process(SelectionKey selectionKey) {

}
}

public static void main(String[] args) throws IOException {
new ReactorDemo().start();
}

}

我们实现了最基本的单Reactor的单线程模型,程序启动后 selector 负责获取、分离可用的 socket 交给dispatcher处理,dispatcher 交给不同的 handler 处理。其中 Acceptor 只负责 socket 链接,IO 的处理交给 ProcessHandler。

我把网上流传的Reactor的图,按自己的理解重新画了一份

NIO看破也说破(四)—— Java的NIO 4

多线程Reactor

上面的示例代码中,从 socket 建立到 IO 完成,只有一个线程在处理。NIO 单线程示例中我们尝试加入线程池来加速 IO 任务的处理,reactor 模式中该如何实现呢?

简单理解,参考 NIO 多线程加入线程池处理所有的processHandler ,可以利用 CPU 多核心加快业务处理,代码不再赘述。

NIO看破也说破(四)—— Java的NIO 5

多Reactor模式

参考ReactorDemo ,我们的 acceptor 处理 socket 链接时和 handler 处理 IO 都是用的同一个 selector 。如果我们在多线程基础上有两个 selector ,一个只负责处理 socket 链接一个处理网路 IO 各司其职将会更高大的提升系统吞吐量,该怎么实现呢?

public class ReactorDemo {
private Selector selector;
private Selector ioSelector;

public ReactorDemo() throws IOException {
initServer();
}

private void initServer() throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(9999));

selector = Selector.open();
ioSelector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server 启动");
}

public void startServer() {
Executors.newFixedThreadPool(1).execute(new Runnable() {
@Override
public void run() {
try {
majorListen();
} catch (IOException e) {
e.printStackTrace();
}
}
});
Executors.newFixedThreadPool(1).execute(new Runnable() {
@Override
public void run() {
try {
subListen();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}

public void majorListen() throws IOException {
System.out.println("主selector启动");
while (selector.select() > 0) {
System.out.println("主selector有事件");
Set set = selector.selectedKeys();
Iterator iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
new Acceptor().process(selectionKey);
}
iterator.remove();
}

}
}

public void subListen() throws IOException {
System.out.println("子selector启动");
while (true) {
if (ioSelector.select(100) <= 0) { continue; } System.out.println("子selector有事件"); Set set = ioSelector.selectedKeys(); Iterator iterator = set.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); selectionKey.attach(new ProcessHander()); dispater(selectionKey, true); iterator.remove(); } } } public void dispater(SelectionKey selectionKey, boolean isSub) { Hander hander = (Hander) selectionKey.attachment(); if (hander != null) { hander.process(selectionKey); } } private interface Hander { void process(SelectionKey selectionKey); } private class Acceptor implements Hander { @Override public void process(SelectionKey selectionKey) { try { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); SocketChannel channel = serverSocketChannel.accept(); System.out.println("获取一个链接:" + channel.getRemoteAddress()); channel.configureBlocking(false); channel.register(ioSelector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); ioSelector.wakeup(); } catch (IOException e) { e.printStackTrace(); } } } private class ProcessHander implements Hander { @Override public void process(SelectionKey selectionKey) { try { SocketChannel channel = (SocketChannel) selectionKey.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.clear(); StringBuilder sb = new StringBuilder(); int read = 0; if ((read = channel.read(buffer)) > 0) {
buffer.flip();
sb.append(Charset.forName("UTF-8").newDecoder().decode(buffer));
buffer.clear();
} else if (read == 0) {
return;
} else if (read == -1) {
if (selectionKey.isValid()) {
selectionKey.cancel();
channel.close();
}
}
System.out.printf("收到 %s 发来的:%sn",
channel.getRemoteAddress(), sb);
buffer.clear();
buffer.put(("收到,你发来的是:" + sb + "rn").getBytes("utf-8"));
buffer.flip();
channel.write(buffer);
System.out.printf("回复 %s 内容是: %sn",
channel.getRemoteAddress(), sb);
channel.register(ioSelector, SelectionKey.OP_READ, buffer.clear());
} catch (IOException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws IOException {
new ReactorDemo().startServer();
}

}

示例中创建了 selector 和 ioSelector ,其中 selector 只处理 socket 的建立,在 Acceptor.process 方法中把 socket 注册给 ioSelector。在 ProcessHander.process 方法中 ioSelector 只负责处理 IO 事件。这样,我们把 selector 进行了拆分。参考多线程实现,同理我们可以创建 N 个线程,处理 ioSelector 对应的 IO 事件。

NIO看破也说破(四)—— Java的NIO 5

总结

至此,我们了解了 Reactor 的三种模型结果,分别是单 Reactor 单线程、单 Reactor 多线程、多 Reactor 多线程。所有代码不够严谨,只为了表示可以使用多个线程或者多个 selector 之间的关系。总结重点:

reactor 是一种设计模式基于事件驱动来处理利用多路分发的策略,让不同业务处理各司其职有单线程,单Reactor 多线程 和 多 Reactor 多线程,三种实现方式

参考:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf"

系列

NIO 看破也说破(一)—— Linux/IO 基础"

NIO 看破也说破(二)—— Java 中的两种 BIO"

NIO 看破也说破(三)—— 不同的 IO 模型"

NIO 看破也说破(四)—— Java 的 NIO"

关注我

如果您在微信阅读,请您点击链接 关注我" ,如果您在 PC 上阅读请扫码关注我,欢迎与我交流随时指出错误

NIO看破也说破(四)—— Java的NIO 7