Netty学习指北08
在上一篇中围绕ChannelHandler
进行了简要概括.
而这时需要思考一个问题,ChannelHandler
到底是由什么控制,
多个ChannelHandler
的执行顺序又是怎样的呢?
ChannelPipeline接口
每创建一个Channel
都将会被分配一个新的ChannelPipeline
.这项关联是永久性的;
Channel
既不能附加到另一个ChannelPipeline
.也不能分离其当前的.在Netty组件的生命周期中,
这是一个固定的操做,不需要开发人员干预.
I/O Request
via {@link Channel} or
{@link ChannelHandlerContext}
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
上面是ChannelPipeline
源码的注解中的一段注释,形象的描述了ChannelPipeline
的职能,与功能范围.
根据事件的起源,事件将会被ChannelInboundHandler
或者ChannelOutboundHandler
处理.
随后通过调用ChannelHandlerContext
实现,它将被转发给同一超类型的下一个ChannelHandler
.
修改ChannelPipeline
通过调用ChannelPipeline
上的相关方法,ChannelHandler
,可以添加,删除或者替换为其他的ChannelHandler
,
从而实时的修改ChannelPipeline
的布局.(它也可以将它自己从ChannelPipeline
上移除)
这是ChannelPipeline
最重要的功能之一.
ChannelHandler的执行和阻塞.
通常ChannelPipeline
中的每个ChannelHandler
都是通过它的EventLoop
(I/O线程)
来处理传递给它的事件的.所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O处理产生负面的影响.
但有时可能需要与那些使用阻塞API的遗留代码进行交互.对于这种情况,
ChannelPipeline
有一些可以可以接受一个EventExecutorGroup
的add()方法.
如果一个事件被传递给一个自定义的EventExecutorGroup
,它将被包含在这个EventExecutorGroup
中的某个EventExecutor
所处理,
从而被从该Channel
本身的EventLoop
中移除.对于这种用例,Netty提供了一个叫DefaultEventExecutorGroup
的实现.
ChannelHandler的执行顺序
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
上面程序ChannelHandler
的执行顺序是怎样的呢?
可以看出1,2是入站事件,3,4是出站事件,5既是入站事件又是出站事件.
所以可以得出执行顺序是1->2->5->5->4->3
(没错,5会执行两次,入站时一次,出站时一次)
ChannelPipeline源码分析
由于ChannelPipeline
是一个接口,我们找到它的默认实现DefaultChannelPipeline
.
public class DefaultChannelPipeline implements ChannelPipeline {
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
// 头节点名称
private static final String HEAD_NAME = generateName0(HeadContext.class);
// 尾节点名称
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
// 链表的头节点
final AbstractChannelHandlerContext head;
// 链表的尾节点
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
由成员变量即可看出,其内部维护了一个双向链表.
直接看添加ChannelHandler
到链表尾部的函数(省略外层逻辑)
可以看出双向链表的尾节点并不是我们创建的节点,是Netty提供的.(头节点同理)
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
可以看出节点类型为AbstractChannelHandlerContext
,向内跟进
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// 下一个节点
volatile AbstractChannelHandlerContext next;
// 上一个节点
volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int ADD_COMPLETE = 2;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int REMOVE_COMPLETE = 3;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int INIT = 0;
// 是否是入站节点
private final boolean inbound;
// 是否是出站节点
private final boolean outbound;
// 该节点相关的ChannelPipeline
private final DefaultChannelPipeline pipeline;
// 节点名称
private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;
private volatile int handlerState = INIT;
ChannelHandler
的执行顺序(入站和出站事件)就是由一条双向链表维护的.
总结:
ChannelPipeline
保存了与Channel
相关联的ChannelHandler
.ChannelHandler
的执行顺序,入站时正序,出站时倒序.ChannelPipeline
是一个双线链表,头尾节点是Netty提供的.ChannelPipeline
可以根据需要,通过添加或者删除ChannelHandler
来动态地修改执行逻辑.ChannelPipeline
有着丰富的API用于被调用,以相应出入站事件.
Reference
转载
本文遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。