Netty的指挥中心-ChannelPipeline

Netty学习指北08

Posted by MistRay on December 10, 2019

Netty学习指北08

在上一篇中围绕ChannelHandler进行了简要概括. 而这时需要思考一个问题,ChannelHandler到底是由什么控制, 多个ChannelHandler的执行顺序又是怎样的呢?

ChannelPipeline接口

每创建一个Channel都将会被分配一个新的ChannelPipeline.这项关联是永久性的; Channel既不能附加到另一个ChannelPipeline.也不能分离其当前的.在Netty组件的生命周期中, 这是一个固定的操做,不需要开发人员干预.

                                                               I/O Request
                                                  via {@link Channel} or
                                              {@link ChannelHandlerContext}
                                                            |
        +---------------------------------------------------+---------------+
        |                           ChannelPipeline         |               |
        |                                                  \|/              |
        |    +---------------------+            +-----------+----------+    |
        |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  |               |
        |               |                                  \|/              |
        |    +----------+----------+            +-----------+----------+    |
        |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  .               |
        |               .                                   .               |
        | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
        |        [ method call]                       [method call]         |
        |               .                                   .               |
        |               .                                  \|/              |
        |    +----------+----------+            +-----------+----------+    |
        |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  |               |
        |               |                                  \|/              |
        |    +----------+----------+            +-----------+----------+    |
        |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
        |    +----------+----------+            +-----------+----------+    |
        |              /|\                                  |               |
        +---------------+-----------------------------------+---------------+
                        |                                  \|/
        +---------------+-----------------------------------+---------------+
        |               |                                   |               |
        |       [ Socket.read() ]                    [ Socket.write() ]     |
        |                                                                   |
        |  Netty Internal I/O Threads (Transport Implementation)            |
        +-------------------------------------------------------------------+

上面是ChannelPipeline源码的注解中的一段注释,形象的描述了ChannelPipeline的职能,与功能范围. 根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理. 随后通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler.

修改ChannelPipeline

通过调用ChannelPipeline上的相关方法,ChannelHandler,可以添加,删除或者替换为其他的ChannelHandler, 从而实时的修改ChannelPipeline的布局.(它也可以将它自己从ChannelPipeline上移除) 这是ChannelPipeline最重要的功能之一.

ChannelHandler的执行和阻塞.

通常ChannelPipeline中的每个ChannelHandler都是通过它的EventLoop(I/O线程) 来处理传递给它的事件的.所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O处理产生负面的影响. 但有时可能需要与那些使用阻塞API的遗留代码进行交互.对于这种情况, ChannelPipeline有一些可以可以接受一个EventExecutorGroup的add()方法. 如果一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理, 从而被从该Channel本身的EventLoop中移除.对于这种用例,Netty提供了一个叫DefaultEventExecutorGroup的实现.

ChannelHandler的执行顺序

   ChannelPipeline p = ...;
   p.addLast("1", new InboundHandlerA());
   p.addLast("2", new InboundHandlerB());
   p.addLast("3", new OutboundHandlerA());
   p.addLast("4", new OutboundHandlerB());
   p.addLast("5", new InboundOutboundHandlerX());

上面程序ChannelHandler的执行顺序是怎样的呢?
可以看出1,2是入站事件,3,4是出站事件,5既是入站事件又是出站事件.
所以可以得出执行顺序是1->2->5->5->4->3 (没错,5会执行两次,入站时一次,出站时一次)

ChannelPipeline源码分析

由于ChannelPipeline是一个接口,我们找到它的默认实现DefaultChannelPipeline.


public class DefaultChannelPipeline implements ChannelPipeline {

    static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
    // 头节点名称
    private static final String HEAD_NAME = generateName0(HeadContext.class);
    // 尾节点名称
    private static final String TAIL_NAME = generateName0(TailContext.class);

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() throws Exception {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
    // 链表的头节点
    final AbstractChannelHandlerContext head;
    // 链表的尾节点
    final AbstractChannelHandlerContext tail;
    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    private final boolean touch = ResourceLeakDetector.isEnabled();

    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private volatile MessageSizeEstimator.Handle estimatorHandle;
    private boolean firstRegistration = true;

由成员变量即可看出,其内部维护了一个双向链表.

直接看添加ChannelHandler到链表尾部的函数(省略外层逻辑)
可以看出双向链表的尾节点并不是我们创建的节点,是Netty提供的.(头节点同理)

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

可以看出节点类型为AbstractChannelHandlerContext,向内跟进

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    // 下一个节点
    volatile AbstractChannelHandlerContext next;
    // 上一个节点
    volatile AbstractChannelHandlerContext prev;

    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
     */
    private static final int ADD_PENDING = 1;
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
     */
    private static final int ADD_COMPLETE = 2;
    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int REMOVE_COMPLETE = 3;
    /**
     * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
     * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int INIT = 0;
    // 是否是入站节点
    private final boolean inbound;
    // 是否是出站节点
    private final boolean outbound;
    // 该节点相关的ChannelPipeline
    private final DefaultChannelPipeline pipeline;
    // 节点名称
    private final String name;
    private final boolean ordered;

    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor executor;
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // There is no need to make this volatile as at worse it will just create a few more instances then needed.
    private Runnable invokeChannelReadCompleteTask;
    private Runnable invokeReadTask;
    private Runnable invokeChannelWritableStateChangedTask;
    private Runnable invokeFlushTask;

    private volatile int handlerState = INIT;

ChannelHandler的执行顺序(入站和出站事件)就是由一条双向链表维护的.

总结:

  1. ChannelPipeline保存了与Channel相关联的ChannelHandler.
  2. ChannelHandler的执行顺序,入站时正序,出站时倒序.
  3. ChannelPipeline是一个双线链表,头尾节点是Netty提供的.
  4. ChannelPipeline可以根据需要,通过添加或者删除ChannelHandler来动态地修改执行逻辑.
  5. ChannelPipeline有着丰富的API用于被调用,以相应出入站事件.

Reference

转载

本文遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。