Categories
程式開發

你一定看得懂的Netty客戶端啟動源碼分析!


前言

前面已經講解了NIO和Netty服務端啟動,這一講是Client的啟動過程。

源碼系列的文章依舊還是遵循大白話+畫圖的風格來講解,本文Netty源碼及以後的文章版本都基於:4.1.22.Final

本篇是以NettyClient啟動為切入點,帶大家一步步進入Netty源碼的世界。

Client啟動流程揭秘

1、探秘的入口:netty-client demo

這裡用netty-exmaple中的EchoClient來作為例子:

public final class EchoClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});

ChannelFuture f = b.connect(HOST, PORT).sync();

f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}

代碼沒有什麼獨特的地方,我們上一篇文章時也梳理過Netty網絡編程的一些套路,這裡就不再贅述了。

(忘記的小朋友可以查看Netty系列文章中查找~)

上面的客戶端代碼雖然簡單, 但是卻展示了Netty 客戶端初始化時所需的所有內容:

EventLoopGroup:Netty服務端或者客戶端,都必須指定EventLoopGroup,客戶端指定的是NioEventLoopGroupBootstrap: Netty客戶端啟動類,負責客戶端的啟動和初始化過程channel()類型:指定Channel的類型,因為這裡是客戶端,所以使用的是NioSocketChannel,服務端會使用NioServerSocketChannelHandler:設置數據的處理器bootstrap.connect(): 客戶端連接netty服務的方法

2、NioEventLoopGroup 流程解析

我們先從NioEventLoopGroup開始,一行行代碼解析,先看看其類結構:

你一定看得懂的Netty客戶端啟動源碼分析! 1

上面是大致的類結構,而 EventLoop 又繼承自EventLoopGroup,所以類的大致結構我們可想而知。 這裡一些核心邏輯會在MultithreadEventExecutorGroup中,包含EventLoopGroup的創建和初始化操作等。

接著從NioEventLoopGroup構造方法開始看起,一步步往下跟(代碼都只展示重點的部分,省去很多暫時不需要關心的代碼,以下代碼都遵循這個原則):

EventLoopGroup group = new NioEventLoopGroup();

public NioEventLoopGroup() {
this(0);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

這里通過調用this()和super()方法一路往下傳遞,期間會構造一些默認屬性,一直傳遞到MultithreadEventExecutorGroup類中,接著往西看。

2.1,MultithreadEventExecutorGroup

上面構造函數有一個重要的參數傳遞:DEFAULT_EVENT_LOOP_THREADS,這個值默認是CPU核數* 2。

為什麼要傳遞這個參數呢? 我們之前說過EventLoopGroup可以理解成一個線程池,MultithreadEventExecutorGroup有一個線程數組EventExecutor[] children屬性,而傳遞過來的DEFAULT_EVENT_LOOP_THREADS就是數組的長度。

先看下MultithreadEventExecutorGroup中的構造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); } // ... 省略 }

這段代碼執行邏輯可以理解為:

通過ThreadPerTaskExecutor構造一個Executor執行器,後面會細說,裡麵包含了線程執行的execute()方法接著創建一個EventExecutor數組對象,大小為傳遞進來的threads數量,這個所謂的EventExecutor可以理解為我們的EventLoop,在這個demo中就是NioEventLoop對象最後調用newChild 方法逐個初始化EventLoopGroup中的EventLoop對象

上面只是大概說了下MultithreadEventExecutorGroup中的構造方法做的事情,後面還會一個個詳細展開,先不用著急,我們先有個整體的認知就好。

再回到MultithreadEventExecutorGroup中的構造方法入參中,有個EventExecutorChooserFactory對象,這裡面是有個很亮眼的細節設計,通過它我們來洞悉Netty的良苦用心。

2.1、亮點設計:DefaultEventExecutorChooserFactory

你一定看得懂的Netty客戶端啟動源碼分析! 2

EventExecutorChooserFactory這個類的作用是用來選擇EventLoop執行器的,我們知道EventLoopGroup是一個包含了CPU * 2個數量的EventLoop數組對象,那每次選擇EventLoop來執行任務是選擇數組中的哪一個呢?

我們看一下這個類的具體實現,紅框中都是需要重點查看的地方:

你一定看得懂的Netty客戶端啟動源碼分析! 3

DefaultEventExecutorChooserFactory是一個選擇器工廠類,調用裡面的next()方法達到一個輪詢選擇的目的。

數組的長度是length,執行第n次,取數組中的哪個元素就是對length取餘

你一定看得懂的Netty客戶端啟動源碼分析! 4

繼續回到代碼的實現,這裡的優化就是在於先通過isPowerOfTwo()方法判斷數組的長度是否為2的n次冪,判斷的方式很巧妙,使用val & -val == val,這裡我不做過多的解釋,網上還有很多判斷2的n次冪的優秀解法,我就不班門弄斧了。 (可參考:https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/”)

當然我認為這裡還有更容易理解的一個算法:x & (x - 1) == 0 大家可以看下面的圖就懂了,這裡就不延展了:

你一定看得懂的Netty客戶端啟動源碼分析! 5

BUT!!! 這里為什麼要去煞費苦心的判斷數組的長度是2的n次冪?

不知道小伙伴們是否還記得大明湖畔的HashMap? 一般我們要求HashMap數組的長度需要是2的n次冪,因為在key值尋找數組位置的方法:(n - 1) & hash n是數組長度,這裡如果數組長度是2的n次冪就可以通過位運算來提升性能,當length為2的n次冪時下面公式是等價的:

n&(長度-1)n%長度

還記得上面說過,數組的長度默認都是CPU * 2,而一般服務器CPU核心數都是2、4、8、16等等,所以這一個小優化就很實用了,再仔細想想,原來數組長度的初始化也是很講究的。

這裡位運算的好處就是效率遠遠高於與運算,Netty針對於這個小細節都做了優化,真是太棒了。

2.3、線程執行器:ThreadPerTaskExecutor

接著看下ThreadPerTaskExecutor線程執行器,每次執行任務都會通過它來創建一個線程實體。

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

傳遞進來的threadFactory為DefaultThreadFactory,這裡面會構造NioEventLoop線程命名規則為nioEventLoop-1-xxx,我們就不細看這個了。 當線程執行的時候會調用execute()方法,這裡會創建一個FastThreadLocalThread線程,具體看代碼:

public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}

protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}

這里通過newThread()來創建一個線程,然後初始化線程對像數據,最終會調用到Thread.init()中。

2.4、EventLoop初始化

接著繼續看MultithreadEventExecutorGroup構造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); // .... 省略部分代码 } }

上面代碼的最後一部分是 newChild 方法, 這個是一個抽象方法, 它的任務是實例化 EventLoop 對象. 我們跟踪一下它的代碼, 可以發現, 這個方法在 NioEventLoopGroup 類中實現了, 其內容很簡單:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

其實就是實例化一個 NioEventLoop 對象, 然後返回。 NioEventLoop構造函數中會保存provider和事件輪詢器selector,在其父類中還會創建一個MpscQueue隊列,然後保存線程執行器executor。

再回過頭來想一想,MultithreadEventExecutorGroup 內部維護了一個 EventExecutor[] children數組, Netty 的 EventLoopGroup 的實現機制其實就建立在 MultithreadEventExecutorGroup 之上。

每當 Netty 需要一個 EventLoop 時, 會調用 next() 方法從EventLoopGroup數組中獲取一個可用的 EventLoop對象。 其中next方法的實現是通過NioEventLoopGroup.next()來完成的,就是用的上面有過講解的通過輪詢算法來計算得出的。

最後總結一下整個 EventLoopGroup 的初始化過程:

你一定看得懂的Netty客戶端啟動源碼分析! 6

EventLoopGroup(其實是MultithreadEventExecutorGroup) 內部維護一個類型為EventExecutor children 數組,數組長度是nThreads如果我們在實例化NioEventLoopGroup 時, 如果指定線程池大小, 則nThreads 就是指定的值, 反之是處理器核心數* 2MultithreadEventExecutorGroup 中會調用newChild 抽象方法來初始化children 數組抽象方法newChild 是在NioEventLoopGroup 中實現的, 它返回一個NioEventLoop 實例.NioEventLoop 屬性:SelectorProvider provider 屬性: NioEventLoopGroup 構造器中通過SelectorProvider.provider() 獲取一個SelectorProviderSelector selector 屬性: NioEventLoop 構造器中通過調用通過selector = provider.openSelector() 獲取一個selector 對象.

2.5,NioSocketChannel

在Netty中,Channel是對Socket的抽象,每當Netty建立一個連接後,都會有一個與其對應的Channel實例。

我們在開頭的Demo中,設置了channel(NioSocketChannel.class),NioSocketChannel的類結構如下:

你一定看得懂的Netty客戶端啟動源碼分析! 7

接著分析代碼,當我們調用b.channel()時實際上會進入AbstractBootstrap.channel()邏輯,接著看AbstractBootstrap中代碼:

public B channel(Class channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory(channelClass));
}

public ReflectiveChannelFactory(Class clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}

public B channelFactory(ChannelFactory channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}

this.channelFactory = channelFactory;
return self();
}

可以看到,這裡ReflectiveChannelFactory其實就是返回我們指定的channelClass:NioSocketChannel, 然後指定AbstractBootstrap中的channelFactory = new ReflectiveChannelFactory()。

2.6、Channel初始化流程

到了這一步,我們已經知道NioEventLoopGroup和channel()的流程,接著來看看Channel的初始化流程,這也是Netty客戶端啟動的的核心流程之一:

ChannelFuture f = b.connect(HOST, PORT).sync();

接著就開始從b.connect()為入口一步步往後跟,先看下NioSocketChannel構造的整體流程:

你一定看得懂的Netty客戶端啟動源碼分析! 8

從connet往後梳理下整體流程:

Bootstrap.connect-> Bootstrap.doResolveAndConnect-> AbstractBootstrap.initAndRegister

final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
init(channel);

ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}

為了更易讀,這裡代碼都做了簡化,只保留了一些重要的代碼。

緊接著我們看看channelFactory.newChannel()做了什麼,這裡channelFactory是ReflectiveChannelFactory,我們在上面的章節分析過:

@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

這裡的clazz是NioSocketChannel,同樣是在上面章節講到過,這裡是調用NioSocketChannel的構造函數然後初始化一個Channel實例。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
}

這裡其實也很簡單,就是創建一個Java NIO SocketChannel而已,接著看看NioSocketChannel的父類還做了哪些事情,這裡梳理下類的關係:

NioSocketChannel->擴展AbstractNioByteChannel->擴展AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
ch.configureBlocking(false);
}
}

這裡會調用父類的構造參數,並且傳遞readInterestOp = SelectionKey.OP_READ:,這裡還有一個很重要的點,配置Java NIO SocketChannel 為非阻塞的,我們之前在NIO章節的時候講解過,這裡也不再贅述。

接著繼續看AbstractChannel的構造函數:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}

這裡創建一個ChannelId,創建一個Unsafe對象,這裡的Unsafe並不是Java中的Unsafe,後面也會講到。 然後創建一個ChannelPipeline,後面也會講到,到了這裡,一個完整的NioSocketChannel 就初始化完成了,我們再來總結一下:

Netty 的 SocketChannel 會與 Java 原生的 SocketChannel 綁定在一起;會註冊 Read 事件;會為每一個 Channel 分配一個 channelId;會為每一個 Channel 創建一個Unsafe對象;會為每一個 Channel 分配一個 ChannelPipeline;

2.7、Channel 註冊流程

還是回到最上面initAndRegister方法,我們上面都是在分析裡面newChannel的操作,這個方法是NioSocketChannel創建的一個流程,接著我們在繼續跟init()和register()的過程:

public abstract class AbstractBootstrap, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
}
}

init()就是將一些參數options和attrs設置到channel中,我們重點需要看的是register方法,其調用鍊為:

AbstractBootstrap.initAndRegister-> MultithreadEventLoopGroup.register-> SingleThreadEventLoop.register-> AbstractUnsafe.register

這裡最後到了unsafe的register()方法,最終調用到AbstractNioChannel.doRegister():

@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}

javaChannel()就是Java NIO中的SocketChannel,這裡是將SocketChannel註冊到與eventLoop相關聯的selector上。

你一定看得懂的Netty客戶端啟動源碼分析! 9

最後我們整理一下服務啟動的整體流程:

initAndRegister()初始化並註冊什麼呢?

channelFactory.newChannel()通過反射創建一個 NioSocketChannel將 Java 原生 Channel 綁定到 NettyChannel 中註冊 Read 事件為 Channel 分配 id為 Channel 創建 unsafe對象為 Channel 創建ChannelPipeline(默認是 headtail 的雙向鍊錶)

`init(頻道)``

把 Bootstrap 中的配置設置到 Channel 中

註冊(頻道)

把 Channel 綁定到一個 EventLoop 上把 Java 原生 Channel、Netty 的 Channel、Selector 綁定到 SelectionKey 中觸發 Register 相關的事件

2.8 unsafe初始化

上面有提到過在初始化Channel的過程中會創建一個Unsafe的對象,然後綁定到Channel上:

protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

newUnsafe直接調用到了NioSocketChannel中的方法:

@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}

NioSocketChannelUnsafe是NioSocketChannel中的一個內部類,然後向上還有幾個父類繼承,這裡主要是對應到相關Java底層的Socket操作。

2.9 pipeline初始化

我們還是回到pipeline初始化的過程,來看一下newChannelPipeline()的具體實現:

protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

我們調用 DefaultChannelPipeline 的構造器, 傳入了一個 channel, 而這個 channel 其實就是我們實例化的 NioSocketChannel。

DefaultChannelPipeline 會將這個NioSocketChannel 對象保存在channel 字段中. DefaultChannelPipeline 中, 還有兩個特殊的字段, 即head 和tail, 而這兩個字段是一個雙向鍊錶的頭和尾. 其實在DefaultChannelPipeline 中, 維護了一個以AbstractChannelHandlerContext 為節點的雙向鍊錶, 這個鍊錶是Netty 實現Pipeline 機制的關鍵.

關於 DefaultChannelPipeline 中的雙向鍊錶以及它所起的作用, 我們會在後續章節詳細講解。 這裡只是對pipeline做個初步的認識。

HeadContext 的繼承層次結構如下所示:

你一定看得懂的Netty客戶端啟動源碼分析! 10

TailContext 的繼承層次結構如下所示:

你一定看得懂的Netty客戶端啟動源碼分析! 11

我們可以看到, 鍊錶中 head 是一個 ChannelOutboundHandler, 而 tail 則是一個 ChannelInboundHandler.

3.0、客戶端connect過程

客戶端連接的入口方法還是在Bootstrap.connect()中,上面也分析過一部分內容,請求的具體流程是:

Bootstrap.connect()-> AbstractChannel.coonnect()-> NioSocketChannel.doConnect()

public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}

看到這裡,還是用Java NIO SocketChannel發送的connect請求進行客戶端連接請求。

總結

本篇文章以一個Netty Client demo為入口,然後解析了NioEventLoopGroup創建的流程、Channel的創建和註冊的流程,以及客戶端發起connect的具體流程,這裡對於很多細節並沒有很深的深入下去,這些會放到後續的源碼分析文章,敬請期待~

來自:https://www.cnblogs.com/wang-meng/p/13711756.html