Netty学习指北08
在上一篇中围绕ChannelHandler进行了简要概括.
而这时需要思考一个问题,ChannelHandler到底是由什么控制,
多个ChannelHandler的执行顺序又是怎样的呢?
ChannelPipeline接口
每创建一个Channel都将会被分配一个新的ChannelPipeline.这项关联是永久性的;
Channel既不能附加到另一个ChannelPipeline.也不能分离其当前的.在Netty组件的生命周期中,
这是一个固定的操做,不需要开发人员干预.
I/O Request
via {@link Channel} or
{@link ChannelHandlerContext}
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
上面是ChannelPipeline源码的注解中的一段注释,形象的描述了ChannelPipeline的职能,与功能范围.
根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理.
随后通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler.
修改ChannelPipeline
通过调用ChannelPipeline上的相关方法,ChannelHandler,可以添加,删除或者替换为其他的ChannelHandler,
从而实时的修改ChannelPipeline的布局.(它也可以将它自己从ChannelPipeline上移除)
这是ChannelPipeline最重要的功能之一.
ChannelHandler的执行和阻塞.
通常ChannelPipeline中的每个ChannelHandler都是通过它的EventLoop(I/O线程)
来处理传递给它的事件的.所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O处理产生负面的影响.
但有时可能需要与那些使用阻塞API的遗留代码进行交互.对于这种情况,
ChannelPipeline有一些可以可以接受一个EventExecutorGroup的add()方法.
如果一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,
从而被从该Channel本身的EventLoop中移除.对于这种用例,Netty提供了一个叫DefaultEventExecutorGroup的实现.
ChannelHandler的执行顺序
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
上面程序ChannelHandler的执行顺序是怎样的呢?
可以看出1,2是入站事件,3,4是出站事件,5既是入站事件又是出站事件.
所以可以得出执行顺序是1->2->5->5->4->3
(没错,5会执行两次,入站时一次,出站时一次)
ChannelPipeline源码分析
由于ChannelPipeline是一个接口,我们找到它的默认实现DefaultChannelPipeline.
public class DefaultChannelPipeline implements ChannelPipeline {
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
// 头节点名称
private static final String HEAD_NAME = generateName0(HeadContext.class);
// 尾节点名称
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
// 链表的头节点
final AbstractChannelHandlerContext head;
// 链表的尾节点
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
由成员变量即可看出,其内部维护了一个双向链表.
直接看添加ChannelHandler到链表尾部的函数(省略外层逻辑)
可以看出双向链表的尾节点并不是我们创建的节点,是Netty提供的.(头节点同理)
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
可以看出节点类型为AbstractChannelHandlerContext,向内跟进
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// 下一个节点
volatile AbstractChannelHandlerContext next;
// 上一个节点
volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int ADD_COMPLETE = 2;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int REMOVE_COMPLETE = 3;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int INIT = 0;
// 是否是入站节点
private final boolean inbound;
// 是否是出站节点
private final boolean outbound;
// 该节点相关的ChannelPipeline
private final DefaultChannelPipeline pipeline;
// 节点名称
private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;
private volatile int handlerState = INIT;
ChannelHandler的执行顺序(入站和出站事件)就是由一条双向链表维护的.
总结:
ChannelPipeline保存了与Channel相关联的ChannelHandler.ChannelHandler的执行顺序,入站时正序,出站时倒序.ChannelPipeline是一个双线链表,头尾节点是Netty提供的.ChannelPipeline可以根据需要,通过添加或者删除ChannelHandler来动态地修改执行逻辑.ChannelPipeline有着丰富的API用于被调用,以相应出入站事件.
Reference
转载
本文遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。