你好,我是yes。
今天我们来盘一盘 Netty 中的 Pipeline 实现(没看过我 Netty 前面几篇的,建议先看看,文末有合集链接)。
再学一手有点骚的掩码操作,深入学习下 Netty 是如何处理各种事件的,看看事件是如何在 ChannelPipeline 中传播处理的。
话不多说,发车!
我们都知道,在 Netty 中业务处理逻辑都被拆分成一个个 ChannelHandler 实现,我们选装的 ChannelHandler 们又组成了一条链,叫 ChannelPipeline。
实际上 ChannelPipeline 就是一条双向链表,它串联起了所有 ChannelHandler ,根据处理逻辑,使用者可以编排 ChannelHandler 在 ChannelPipeline 中的顺序,灵活变化处理步骤。
示例代码:
ChannelPipeline p = ch.pipeline();


// 事件先经过 InHandler1 处理,再传播至 InHandler2 处理
p.addLast(
new
 InHandler1());

p.addLast(
new
 InHandler2());

---------

// 事件先经过 InHandler2 处理,再传播至 InHandler1 处理
p.addLast(
new
 InHandler2());

p.addLast(
new
 InHandler1());


也可以灵活地增加(删减)逻辑,比如需求变更了,需要多处理一步,那就新建个 InHandler3 加入,无需修改之前的代码逻辑,符合开闭原则。
p.addLast(
new
 InHandler1());

p.addLast(
new
 InHandler2());

p.addLast(
new
 InHandler3());

这种实现有个名称,叫责任链模式
当然,我们前面文章提到过,pipeline 实际串联的是 ChannelHandlerContext 实例,当一个 ChannelHandler 被添加到 pipeline 的时候,就会被封装成 ChannelHandlerContext 对象。
之所以这样做是把一些通用重复的逻辑提取到 ChannelHandlerContext 中,避免在每个 ChannelHandler 实现一样的逻辑。
所以 pipeline 大概长得如图所示:
然后 pipeline 中的 ChannelHandler 可以分为两大类,分别是 ChannelInboundHandler 和 ChannelOutboundHandler
ChannelInboundHandler,用来处理入站的事件,何为入站?接收别人的消息就是入站,比如服务端接收客户端的请求,这就是入站。
ChannelOutboundHandler,用来处理出站的事件,何为出站?发送消息给别人就是出站,比如服务端返回响应给客户端,这就是出站。
A、B两个Netty服务远程调用的数据处理流向,如下图所示:
还有两个隐藏的 handler 没有提到,那就是 ChannelPipeline 里 head 和 tail 两个节点,可以看到构造函数里面默认创建了这两个节点。
要注意一下,这里的 head 节点。
可以看到, head 节点实现了 ChannelOutboundHandler 和 ChannelInboundHandler 这两个接口,并且内置了一个  unsafe 对象,说明它可以进行底层的 I/O 操作。
然后再来看一下 tail 节点。
可以看到,它与 head 节点不同,只实现了 ChannelInboundHandler 接口,且没有内置 unsafe 对象,从这里也可以看出它的作用没 head 大。
然后我们调用 pipeline.addLast 添加的 handler 都是添加在 head 和 tail 中间。
关于事件在 pipeline 上的传播顺序,我借用 ChannelPipeline 接口上的示例来解释下。
假设现在配置了以下几个 handler
  p.addLast(
"1"
new
 InboundHandlerA());

  p.addLast(
"2"
new
 InboundHandlerB());

  p.addLast(
"3"
new
 OutboundHandlerA());

  p.addLast(
"4"
new
 OutboundHandlerB());

  p.addLast(
"5"
new
 InboundOutboundHandlerX());

此时的 pipeline 长这样(为了简化,没画 ChannelHandlerContext ),上面第五个 handler 即实现了入站接口,也实现了出站接口。
  • 当有入站事件产生,事件会随着 head 往 tail 传递,不过由于是入站事件,所以在 pipeline 遍历的时候会越过 outboundHandler,因此事件会经过 head->1->2->5->tail
  • 当有出站事件产生,事件会随着 tail 往 head 传递,不过由于是出站事件,所以在 pipeline 遍历的时候会越过 inboundHandler,因此事件会经过 tail->5->4->3->head。
看到这可能有人奇怪,tail 不是 inboundHandler 吗?为什么会调用到?emmm..因为 pipeline 的写操作调用的都是 tail 的方法。
可以认为 tail 为出站事件的始发站!不过要注意上面所述的传播顺序是基于调用 pipeline 上的方法进行传播的。
比如当事件经过 2 的时候,在 2 内部调用了 ctx.pipeline().write或者ctx.channel().write,这样写事件才会从 tail 开始流向 head。
如果 2 内部直接调用的是 ctx.write,这样就不会从 tail 开始,而是直接从当前的 handler 开始流向 head。

那具体 pipeline 是如何实现事件在 handler 上传播,且过滤不对的 handler 类型呢?

我们从 ServerChannel accept 连接,生成子 channel 后,调用的 pipeline.fireChannelRead 说起,这也是每个 channel 接收客户端消息都需要经历的调用。
pipeline 会调用一个静态方法,往里面传入 head 节点,实际就是调用 head 节点实现的 channelRead 方法
而 head#channelRead 的实现异常简单,用一句话概括就是找到它的下一个 inboudHandler 节点,然后调用该节点的 channelRead 方法。
关键我已经标出来了,就是 findContextInbound(MASK_CHANNEL_READ),这里就是涉及到掩码操作,其实就是位操作,这种操作在开源软件上很常见,毕竟位操作效率高,且十分简洁,我们来细看下具体是如何实现的。
首先感兴趣的是MASK_CHANNEL_READ事件,Netty 将所有事件都用数字表示,这个数字拿二进制来看待比较容易理解。
比如MASK_CHANNEL_INACTIVE 是 ..00010000。
MASK_CHANNEL_READ 是 ..00100000
通过二进制固定位上是否为 1 来判断事件的类型,这个应该很好理解。
所以查找 handler 的逻辑就是遍历下一个节点,查看节点的掩码,是否匹配传入的事件类型,即通过 & 运算,只要结果等于 0 说明不匹配,然后继续遍历下一个节点,只要找到符合条件的节点就返回调用。
那 handler 上的掩码是如何来的呢?
前面我们提到 handler 会被  ChannelHandlerContext 封装,在其构造函数内就会调用 mask 计算出掩码。
具体 mask 方法会调用到 ChannelHandlerMask#mask0 方法,这个方法就是实现的逻辑所在。
道理很简单,判断 handler 的类型,看其是属于 inboudhandler 还是 outboudhandler,然后将 mask 设为能处理对应 handler 类型的所有事件的值。
然后再遍历 handler 是否实现对应事件的处理方法,如没有,则将 mask 对应的位,置为 0。
这样最终只有实现了相应事件处理方法的位数才为 1,其余位置都为 0,这样通过与运算就能判断 handler 是否能够处理当前事件。
这里还需要牵扯到一个注解@Skip,我们来看下isSkippable的处理逻辑,其实很简单,通过事件处理的方法名和参数,反射得到方法,然后看其上是否有标注@Skip注解,如果标注了说明没有实现该方法,没标注说明实现了该方法。
看到这可能又会有同学有疑问,他会说,我看那些 channelHandler 从来没管过什么@Skip注解,你说的没错。
因为一般的 channelHandler 都会继承自 ChannelInboundHandlerAdapter 或者 ChannelOutboundHandlerAdapter ,这都是 Netty 帮我们封装好的类,也推荐我们继承它来扩展我们自己的 handler。
而这两个类会把所有的方法都标注上 @Skip 注解。
但如果你继承了这两个类,覆盖了相应事件的处理方法,你肯定不会往方法上标注 @Skip 注解,方法上没@Skip 注解就表明了你实现了这个方法。
这样就能正确地生成 handler 的掩码。
所以这招学会了没?以后业务上碰到需要过滤、归类或者判断的一大坨类,都可以通过掩码来实现,然后学着这样用注解+反射的方式,非常优雅~

总结一下

  1. ChannelPipeline 就是一条双向链表,它串联起了所有 ChannelHandler
  2. ChannelHandler 分为两种类型,一种是 ChannelInboundHandler,另一种是 ChannelOutboundHandler,也可以两者兼之,Netty 现成提供了一个类,叫 ChannelDuplexHandler
  3. ChannelInboundHandler,用来处理入站的事件,何为入站?接收别人的消息就是入站,比如服务端接收客户端的请求,这就是入站。
  4. ChannelOutboundHandler,用来处理出站的事件,何为出站?发送消息给别人就是出站,比如服务端返回响应给客户端,这就是出站。
  5. ChannelPipeline 的传播顺序根据事件的类型不同而不同,入站事件是从 head 往 tail 方向传递,出站事件是从 tail 往 head 方向传递。所以要注意 handler 在 pipeline 中的位置,防止处理顺序出错。
  6. 主要是通过掩码来判断 ChannelHandler 是否实现对应事件的处理方法,如果未实现则会跳过当前handler。
  7. ChannelHandler 在被封装成 ChannelHandlerContext 的时候,会根据方法上是否标注 @skip 注解来判断当前 handler 是否支持处理事件,根据对应的方法实现,组成 handler 的掩码。

最后

好了,这篇的重点其实就是需要理解 ChannelPipeline 内部不同 handler 类型的传播方向,不理解这个可能添加的 handler 逻辑不会被执行,然后一脸懵逼。
还有,通过掩码来判断 handler 是否实现某类事件的处理逻辑,可以供我们学习,之后遇到这样的场景,我们可以通过位运算+注解来实现这个功能。
好了,今天这篇就到这,下篇我们来盘 Netty 的粘包和拆包相关的内容~
我是 yes,从一点点到亿点点,我们下篇见~
继续阅读
阅读原文