Netty的心跳机制-IdleStateHandler

Netty学习指北06

Posted by MistRay on September 24, 2019

Netty学习指北06

心跳机制常见于长连接场景中.例如在TCP长连接中,客户端和服务端之间定期发送的一种特殊的数据包, 通知对方自己还在线,以确保TCP的有效性.

心跳机制的原理

网络是不可靠的,在TCP保持长连接的过程中,由于某些突发状况,例如网线被拔,突然停电,运营商瘫痪等, 会造成服务器和客户端连接中断,在这些突发状况下,如果客户端和服务端恰好没有交互,那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题就需要引入心跳机制.

心跳机制的工作原理 :
在服务器和客户端之间一定时间没有数据交互时,即处于idle状态,客户端或服务器会发送一个特殊的数据包给对方,当接收方收到这个数据报文后,也立刻发送一个特殊的数据报文,即一个PING-PONG的交互.当某一端收到心跳消息后,就知道了对方仍然在线,这就确保了TCP连接的有效性.

心跳机制的实现

我们可以通过两种方式实现心跳机制:

  • 使用 TCP 协议层面的 keepalive 机制.
  • 在应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  • 它不是 TCP 的标准协议, 并且是默认关闭的.
  • TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
  • TCP keepaliveTCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳.

Netty与心跳机制

Netty中预设了对于心跳机制的实现->IdleStateHandler.

// IdleStateHandler的构造方法
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

实例化一个 IdleStateHandler 需要提供三个参数:

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
(上面参数设置为0时表示禁用)

当触发了channelidle状态后,会执行我们实现的ChannelInboundHandler中重写的userEventTriggered方法. 可以在该方法中执行释放资源,关闭连接等操作.可以有效清理无效连接,维护服务内连接的有效性.

public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {

    private final static Logger LOGGER = LoggerFactory.getLogger(EchoClientHandle.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent){
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;

            if (idleStateEvent.state() == IdleState.WRITER_IDLE){
                LOGGER.info("已经 10 秒没有发送信息!");
                //向服务端发送消息
                CustomProtocol heartBeat = SpringBeanFactory.getBean("heartBeat", CustomProtocol.class);
                ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
            }
        }
        super.userEventTriggered(ctx, evt);
    }


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
        //从服务端收到消息时被调用
        LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
    }
}  

Reference