多图详解 Netty

什么是 Netty

简单来说Netty就是JBOSS开源的一个基于NIO的网络编程框架。它可以帮助我们快速开发高性能高可靠性的网络 IO 程序。

Netty在 Java 语言中使用非常广泛,涉及到网络通信的基本上都使用Netty,很少会直接去使用原生的NIO组件或者是其他框架。并且像Dubbo、RocketMQ、Zookeeper、ElasticSearch 这些知名的中间件所使用的网络通讯框架都是基于Netty去实现的。

Netty是在原生NIO的基础上发展起来的框架,其中的许多理念都非常像,所以学习Netty前需要了解一下原生NIO编程。

原生 NIO 编程

在了解原生NIO编程之前需要了解一个基础概念Socket

Socket

Netty是基于TCP协议的,我们知道TCP协议三个重要的特点分别是面向连接、可靠的和字节流。要达成这三点建立连接时需要客户端与服务端达成三个信息的共享,分别是:

  • Socket:包含五个信息:连接使用的协议、本地主机 IP 地址和端口号、远程主机的 IP 地址和端口号
  • 序列号:解决乱序问题
  • 容器大小:用来做流量控制

Socket就是两台主机之间的逻辑连接的端点,TCP所说的面向连接,指的就是面向客户端和服务端两个 Socket之间的连接。

这里要注意的是,服务端会涉及到两种socket,一种叫做监听socket,一种叫做已完成连接socket。当监听Socket发现连接成功了之后会返回一个已完成连接socket文件描述符,用于后续传输数据。

原生 NIO 组件

Netty底层其实用了很多 Java 原生的NIO的组件,Netty自定义的组件中有些理念也来自于原生的NIO组件。因此学习Netty之前需要了解一下原生的NIO组件的一些知识。

这里主要讲三个非常重要的组件:Channel (通道)、Buffer (缓冲区)、Selector (选择器)。

下图展示了这三个组件在NIO模型中发挥的作用:

Buffer (缓冲区)

Buffer本质上就是一块可以读写数据的内存块,我们在使用的时候可以把它理解成一个数组。

下图是Buffer各个类的继承关系:

这里着重讲一下ByteBufferByteBuffer在原生NIO编程时使用频率是最高的。下面主要讲一下它的使用。

注意ByteBuffer初始化时其实是 创建并返回了一个它的子类 HeapByteBuffer 对象,我们操作的也是它的子类。

首先是初始化,初始化主要通过两种方式:

  • **allocate(int capacity)**:创建 byte 类型的指定长度的缓冲区;
  • wrap(byte[] array):创建 byte 类型的有内容的缓冲区。

在学习数据操作之前,有几个ByteBuffer非常重要的参数和方法需要了解一下:

  • position:当前读取或写入的起始坐标;
  • limite:最多可以操作到哪个索引;
  • capacity:缓冲区的总长度;
  • remaining():这个方法返回的是 limit - position 的计算值,代表还有多少空间可以操作。

数据操作主要是两个方法:

  • put():插入字节,它是一个重载方法,可以传入不同形式的字节;
  • get():读取字节,不传参获取 position 位置的字节并让 position + 1,也可以通过参数读取指定位置的字节。

下图是添加字节时各属性值的变化:

ByteBuffer虽然即支持读也支持写,但同一时间只能是其中一种模式,模式切换需要调用相应的方法。

下图是调用flip()方法将写模式切换为读时各属性的变化:

下图调用clear()方法将读切换为写时各属性的变化:

Channel (通道)

通常来说NIO所有的操作都是由通道开始的,它跟我们平常使用的流(InputStreamOutputStream)有点类似。但也有些区别:

  • 通道可以读也可以写,流是单向的,所以需要输入流输出流;
  • 通道可以异步读写
  • 通道总是基于缓冲区来读写(将数据从通道读取到buffer或者将数据以buffer的形式写入到通道)

下图是Channel的继承关系:

常用的Channel主要有四种:

  • FileChannel:用于文件数据的读写;
  • DatagramChannel:用于UDP数据的读写;
  • ServerSocketChannel 和 SocketChannel:用于TCP数据的读写,前者代表服务端的通道,后者代表客户端。

使用ServerSocketChannelSocketChannel进行NIO编程与直接使用ServerSocketSocket类似,这里就不赘述了。

Selector (选择器)

Selector是多路复用器的一种,虽然它的性能不是最好的,但它几乎在所有平台上都支持,具有良好的跨平台性。

Selector是实现一个线程处理多个客户端请求的核心组件,Channel注册到Selector上之后,如果有就绪事件产生,Selector就会去获取事件然后针对事件进行相应的处理。

Selector常用方法如下:

  • open() :静态方法,获取一个选择器对象;
  • select():调用后阻塞线程,阻塞期间会监控所有注册的通道,当有就绪事件需要操作时,会将 SelectionKey 放入集合并返回事件数量;
  • select(1000):只阻塞 1000 毫秒,阻塞期间与上面的方法相同;
  • selectedKeys():返回集合中保存的全部 SelectionKey 。

这些方法多次提到了SelectionKey,那么SelectionKey是什么呢?

SelectionKey就是用来描述各种就绪事件的类,通过它能获取到当前的就绪事件类型。

SelectionKey通过 4 个常量来定义 4 种不同的就绪事件:

  • OP_READ:值为 1 << 0,读就绪事件,表示通道中有可读数据,可以执行读操作;
  • OP_WRITE:值为 1 << 2,写就绪事件,表示可以向通道写数据了;
  • OP_CONNECT:值为 1 << 3,连接就绪事件,代表客户端与服务器连接已经建立成功了;
  • OP_ACCEPT: = 1 << 4,接收连接就绪事件,表示服务器监听到了客户端连接。

SelectionKey通过以下 4 个静态方法判断当前是否是对应的就绪事件:

  • isReadable():是否是读就绪事件;
  • isWritable():是否是写就绪事件;
  • isConnectable():是否是连接就绪事件;
  • isAcceptable():是否是接收连接就绪事件。

原生 NIO 组件编程示例

下面是使用SelectorChannelByteBuffer进行NIO编程的示例。

服务器端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.zephyr.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
* 服务端-选择器
*/
public class NIOSelectorServer {
public static void main(String[] args) throws IOException {
//打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//通道默认是阻塞的,需要设置为非阻塞
serverSocketChannel.configureBlocking(false);
//创建选择器
`Selector`selector = Selector.open();
//将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功...");
while (true) {
//检查选择器是否有事件
int select = selector.select(2000);
if (select == 0) {
continue;
}
//获取事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
//判断事件是否是客户端连接事件 SelectionKey.isAcceptable()
SelectionKey key = iterator.next();
//得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端已连接......" + socketChannel);
//必须设置通道为非阻塞, 因为selector需要轮询监听每个通道的事件
socketChannel.configureBlocking(false);
//并指定监听事件为OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
}
//判断是否是客户端读就绪事件SelectionKey.isReadable()
if (key.isReadable()) {
//得到客户端通道,读取数据到缓冲区
SocketChannel socketChannel = (SocketChannel) key.channel();
`ByteBuffer`byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("客户端消息:" +
new String(byteBuffer.array(), 0, read,
StandardCharsets.UTF_8));
//给客户端回写数据
socketChannel.write(ByteBuffer.wrap("yo yo yo, hi man".getBytes(StandardCharsets.UTF_8)));
socketChannel.close();
}
}
//从集合中删除对应的事件, 因为防止二次处理.
iterator.remove();
}
}
}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.zephyr.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
* 客户端
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
//打开通道
SocketChannel socketChannel = SocketChannel.open();
//设置连接IP和端口号
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//写出数据
socketChannel.write(ByteBuffer.wrap("What's up.".getBytes(StandardCharsets.UTF_8)));
//读取服务器写回的数据
`ByteBuffer`readBuffer = ByteBuffer.allocate(1024);
int read=socketChannel.read(readBuffer);
System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read, StandardCharsets.UTF_8));
//释放资源
socketChannel.close(); }
}

为什么需要 Netty

上面讲了原生NIO相关的知识,那么问题就来了,既然原生就有完备的NIO编程的各个组件,为什么还需要Netty呢。

主要原因还是因为原生NIO存在一些弊端:

  • NIO 的类库和 API 繁杂:开发者需要熟练掌握SelectorServerSocketChannelSocketChannelByteBuffer等原生组件;
  • 有一定的门槛:必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序;
  • 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等;
  • JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致Selector空轮询,最终导致 CPU 100%。

Netty这个框架就很好地解决了这些问题,前三个比较好理解,简单讲一下第 4 个问题是怎么被解决的。

第 4 个问题讲到了Selector空轮询的 Bug,那么,什么是空轮询呢?

空轮询是指本来Selector调用select()方法如果没有就绪事件在设置的时间到之前是阻塞的,但由于 Linux 底层实现有问题,导致在没有就绪事件时也有概率直接返回,而select()方法一般都是放在while (true)循环里的,这时就会开始不断地空轮询,直到 CPU 使用率飙到 100% 。

Netty解决这个问题主要分别两步:

  • 检测空轮询:判断阻塞时间小于 timeoutMillis (初始化的超时参数),且select()执行次数大于阈值;
  • 重建Selector:新创建一个Selector并把旧Selectorchannel注册到这个Selector上,然后关闭这个Selector

Netty 线程模型

接着我们学习一下Netty的线程模型,了解了Netty的线程模型之后我们对Netty的整体架构也就有了一个大致的了解。

由于Netty的线程模型是基于Reactor模型改进而来的,因此先讲讲Reactor模型,有助于我们对Netty线程模型的理解 。

Reactor 模型

Reactor模型是指当服务器接收到多个请求时,服务器程序会把它们分派到不同的方法或线程去处理。Reactor模式也被称作Dispatcher模式。它的核心是多路复用器,多路复用器收到事件后会进行分发,这点是网络服务器高并发的关键。

Reactor模型分为三种:单Reactor单线程/进程、单Reactor多线程/进程和多Reactor多线程/进程。

这里使用的是进程还是线程在Reactor模型中差别不大,下面主要以线程为主来讲解。

这三种模型按顺序来看理解起来复杂度不断提升,也会更接近Netty的线程模型,下面来分别看看这三种模型。

单 Reactor 单线程

这个最好理解,只有一个线程,只是会把建立连接和处理请求这两种任务分发给不同的类去处理,如下图所示:

整个流程简单来讲就是Reactor通过Selector监听事件,收到事件使用dispatch对事件进行分发,如果是连接事件就由 Acceptor 进行处理,处理完成会创建一个Handler对后续业务进行处理。后面的数据请求都会由Handler进行处理。

优点:

  • 模型简单,不会有多线程的那些问题

缺点:

  • 性能问题:单线程无法发挥多核 CPU 的性能
  • 可靠性问题:处理业务时往往容易出问题,当Handler出问题了,由于只有一个线程,整个节点也挂了

Reactor多线程

这个线程模型针对前面的问题作出了一定的优化,多出了处理业务的线程池,如下图所示:

前面的流程与单Reactor单线程是一致的,到Handler这一步就不一样了。这个模型Handler只负责读取数据和发送数据部分,业务处理交给了 Worker 线程,而 Worker 线程是由 Worker 线程池统一管理的。

优点:

  • 可以充分利用多核 CPU 的处理能力

缺点:

  • 多线程资源共享和访问处理会比较复杂,在主线程处理所有的连接、监听和响应也会出现性能瓶颈

主从 Reactor 多线程

主从Reactor多线程模型又在前面的模型基础上做了进一步优化,增加了子Reactor,如下图所示:

整个流程大概可以分为以下几步:

  • 主线程的 Main Reactor负责监听连接请求,收到连接请求会由Acceptor进行处理,成功建立连接之后Main Reactor会把连接分派给Sub Reactor,由Sub Reactor监听和处理数据请求;
  • Sub Reactor监听到数据请求,会派发给Handler处理,Handler只会处理读取数据和发送数据部分,中间业务处理部分也是放在线程池中完成。

优点:

  • Main ReactorSub Reactor职责分明,一个处理连接事件,一个处理数据请求;
  • Main ReactorSub Reactor交互逻辑比较简单,Main Reactor单向地将建立好的连接传递出去;
  • Reactor设计能在高并发场景拥有更好的性能。

缺点:

  • 编程复杂度较高

主从Reactor多线程模式是业界非常成熟的服务器程序设计模式,在很多中间件中都使用到了这种模式,像 NginxMemcachedNetty等。这种模式也被称为 1 + M + N 模式,分别代指相对少的连接线程(不一定为 1 ),多个 I/O 线程和多个业务处理线程。

Netty 线程模型

Netty线程模型是基于主从Reactor多线程模型优化而来的,整体架构如下图所示:

Netty的线程模型主要分为两部分,分别是BossGroupWorkerGroup,它们都分别管理一个或多个NioEventLoop。每个NioEventLoop对应着一个线程,一个Selector,一个Executor和一个TaskQueue

NioEventLoop可以理解成一个事件循环,当程序启动后每个NioEventLoop都会通过Executor启动一个线程,开始执行事件循环,在循环中Selector会通过select()方法阻塞并监听就绪事件,当有事件到来时通过processSeelectedKeys方法处理Selector事件,之后再通过runAllTasks方法处理其他的任务。

与前面介绍的主从Reactor多线程模型类似,BossGoup负责连接事件,当建立连接之后会生成一个NioSocketChannel并注册到WorkGroup其中一个NioEventLoopSelector上。WokerGroup中的 NioEventLoop负责处理数据请求,当请求到来时会调用processSelectedKeys方法,其中的业务处理会依次经过Pipeline中的多个Handler

Netty 编程

学习完Netty线程模型,我们来看一下使用Netty写出来的程序大概是什么样的。

服务端代码

Netty服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyServer {

public static void main(String[] args) throws InterruptedException {
// 创建 BossGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建 WorkerGroup
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建服务器启动类
ServerBootstrap bootstrap = new ServerBootstrap();
// 添加配置
bootstrap.group(bossGroup, workerGroup) // 设置 BossGroup 和 ChildGroup
.channel(NioServerSocketChannel.class) // 设置`channel`具体类
.option(ChannelOption.SO_BACKLOG, 128) // 设置连接队列
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 设置开启保活机制
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 把自定义`Handler`添加到 pipeline
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
// 绑定端口号
`ChannelFuture`channelFuture = bootstrap.bind(new InetSocketAddress(9999)).sync();
System.out.println("服务器启动成功!");
// 阻塞直到通道关闭
channelFuture.channel().closeFuture().sync();
// 优雅地关闭 BossGroup
bossGroup.shutdownGracefully();
// 优雅地关闭 WorkerGroup
workerGroup.shutdownGracefully();
}

}

自定义服务器端ChannelHandler代码,只列出了主要几个方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NettyServerHandler implements ChannelInboundHandler {

@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
`ByteBuf`byteBuf = (ByteBuf) o;
System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
`channel`channel = channelHandlerContext.pipeline().channel();
System.out.println(channel);
}

@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("这是服务器的响应信息...".getBytes(CharsetUtil.UTF_8)));
}

@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
System.out.println("通道注册");
}

...

}

客户端代码

Netty客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class NettyClient {

public static void main(String[] args) throws InterruptedException {
// 创建 EventLoopGroup
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
// 创建启动类
Bootstrap bootstrap = new Bootstrap();
// 设置参数
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) // 设置`channel`的类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加自定义 Handler
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
// 连接服务器
`ChannelFuture`channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9999)).sync();
System.out.println("客户端启动成功!");
// 阻塞直到通道判断
channelFuture.channel().closeFuture().sync();
// 优雅地关闭 EventLoopGroup
eventLoopGroup.shutdownGracefully();
}

}

自定义客户端ChannelHandler代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class NettyClientHandler implements ChannelInboundHandler {

@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("这是客户端发来的消息", CharsetUtil.UTF_8));
}

@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
`ByteBuf`byteBuf = (ByteBuf) o;
System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
}

@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
System.out.println("通道注册");
}

...

}

如果对原生NIO编程比较熟悉理解上面的代码应该比较容易,同时也能看出使用Netty框架编程的难度是远远小于原生NIO的。

下面我们就详细了解一下上面代码涉及的这些Netty组件。

Netty 的核心组件

ChannelHandler

ChannelHandler是一个接口,继承于它的两个接口ChannelInboundHandlerChannelOutboundHandler定义了很多事件处理方法,我们可以通过实现这些方法或者重写子类的方法的来实现相应的业务逻辑。

ChannelHandler的继承关系如图所示:

如果通过实现上述接口来开发,需要实现的方法中常用的有以下几个:

  • public void channelActive(ChannelHandlerContext ctx) 通道就绪事件;
  • public void channelRead(ChannelHandlerContext ctx, Object msg) 通道读取数据事件;
  • public void channelReadComplete(ChannelHandlerContext ctx) 数据读取完毕事件;
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 通道发生异常事件。

但一般开发中自定义Handler会直接继承 SimpleChannelInboundHandler ,我们自己必须要实现的就只有

protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) 这个方法,这种开发方式在继承的时候传入泛型指定出入站消息类型,配合编解码器使用会非常的方便。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel active");
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
}

ChannelHandlerContext

ChannelHandlerContextChannelHandler的上下文,它的核心就是ChannelHandler,它同时也保存了 ChannelPipelineExecutor (NioEventLoop) 等信息。

它的继承关系如下图所示:

Netty中的Context分为三种:HeadContextTailContextDefaultChannelHandlerContext

HeadContextTailContext比较特殊,它既是ChannelHandlerContext也是ChannelHandler(实现了Handler的接口)。

我们通过ChannelPipelineaddLast()方法添加的Handler都会封装成DefaultChannelHandlerContext

ChannelPipeline

ChannelPipeline是一个接口,我们平常编程用到的一般是它的实现类DefaultChannelPipeline

Pipeline 队列

DefaultChannelPipline其实就是一个管道,它维护了一个ChannelHandlerContext的双链表队列。

Pipeline初始化时会创建头节点和尾节点,它们的类型分别是HeadContextTailContext,所以整个链表至少有两个节点。

中间的节点类型都是DefaultChannelHandlerContext

链表如图所示:

ChannelHandler 的传递性

前面说过Handler 分为InboundHanderOutboundHandler,消息入站时只会访问InboundHander,消息出站时只会访问OutboundHander。如果既是InboundHandler又是OutboundHandler出站入站都会访问。

InboundHandlerOutboundHandler都具有传递性,不过传递方法有些区别:

  • InboundHander是向后传递,需要调用ChannelHandlerContextfireChannel...(),比如如果是传递ChannelRead()方法就要调用fireChannelRead(),那么下一个节点的ChannelRead()方法就会被调用;
  • OutboundHandler是向前传递,需要调用ChannelHanderContext的同名方法,比如如果是传递write()方法调用的也是write(),这里下一个节点的write()方法就会被调用。

正常我们在开发中对数据的读写使用一个节点就够了,不需要使用这种传递性,这种传递性一般用在编解码器上。

无论是我们写子类自定义的编解码器还是使用Netty提供的编解码器,它们内部都会自动调用这些传递方法,开发者对这些是无感知的。

我们了解这些传递性的最大意义在于确定在添加Handlerpipeline中时(Handle会被封装成DefaultChannelHandlerContext然后添加到队列中去)的顺序:

  • 先添加编解码器,并且解码器在前,编码器在后;
  • 先添加OutboundHandler,后添加InboundHandler

Pipeline 消息入站

消息入站首先是Selector监听到读就绪事件,接着判断就绪事件如果是读事件就调用通道的read()方法,通道会把消息读到ByteBuf里,然后把ByteBuf传递给Pipeline自已去处理。

Pipeline会直接把ByteBuf交给HeadContext去处理,而HeadContext没有具体的处理逻辑,会直接传递给下一个节点去处理。

下图就是Pipeline节点的处理顺序:

Pipeline 消息出站

消息出站与入站最大的不同是发起方。入站的消息是通过Selector监听到的。而出站是程序主动发起的。

对外写消息有三种方式:

  • 调用channelwriteAndFlush(),它内部会直接调用pipeline.writeAndFlush(msg),最终会从队列尾部开始调用;
  • 调用pipelinewriteAndFlush(),它内部会直接调用tail.writeAndFlush(msg),最终也是从队列尾部开始调用;
  • 调用channelHandlerContextwriteAndFlush(),它内部会以当前节点为起点找到下一个OutboundHandler让它去处理,最终就是从这个节点的下一个OutboundHander开始处理。

下图展示了各个节点处理顺序:

NioEventLoop

NioEventLoop就是一个事件循环类,几乎所有事件处理都会经过这个类,它的继承关系如下:

NioEventLoopGroup

NioEventLoopGroup就是NioEventLoop组,负责管理NioEventLoop,当有channel需要注册的时候,NioEventLoopGroup会轮询找到下一个NioEventLoop注册上去。在NioEventLoopGroup上作出的配置最终都会作用到NioEventLoop上。

ChannelOption

在程序初始化的时候我们可以通过ChannelOptionchannel设置一些参数,常用的参数有两个:SO_BACKLOGSO_KEEPALIVE

下面分别讲讲这两个参数 :

SO_BACKLOG

这个参数主要是用来控制 Accept 队列的大小的 (早期的 Linux 内核是控制的 SYN 队列的大小)。

这里展开说一下这两个队列,它们都是由 Linux 内核维护的。一个是保存第一次握手的 SYN 的队列,系统会依次从这个队列取出 SYN 并进行响应,一个是保存三次握手完成后的 Accept 队列,调用accept()方法就能拿到已完成连接的socket,反应在Netty里面就是返回一个新的Channel

SO_KEEPALIVE

这个参数对应的是连接的保活机制 ,如果不设置这个参数,请求完成连接就会被关闭。设置了这个参数之后,连接关闭的条件变成了如果客户端与服务器 2 个小时没有数据交互,那么客户端就会开始发探活数据报文,如果多次发送都没有响应,就断开连接。

ServerBootstrap 和 Bootstrap

服务端和客户端的启动类,负责对Netty的各个组件进行配置。

服务器端配置代码如下:

1
2
3
4
5
6
7
8
9
10
11
bootstrap.group(bossGroup, workerGroup) // 设置 BossGroup 和 ChildGroup
.channel(NioServerSocketChannel.class) // 设置`channel`具体类
.option(ChannelOption.SO_BACKLOG, 128) // 设置连接队列
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 设置开启保活机制
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 把自定义`Handler`添加到 pipeline
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});

ChannelFuture

下图是ChannelFuture的继承关系

从图中可以看出,它继承的Future接口是Netty自定义的接口,这个接口同时也继承自 Java 原生的Future接口。

Netty中最常用的是ChannelFuture的子类DefaultChannelPromise,而这个类大部分功能都是由 DefaultPromise 实现的。

DefaultPromise阻塞线程使用的是Objectwait()方法,而原生Future的子类 FutureTask 阻塞线程使用的是LockSupportpark()方法。

ChannelFuture支持添加ChannelFutureListener,监听各种事件。

Unpooled

这个类如果我们在使用Netty编程时不使用编解码器就会经常用到,它可以通过传入的字符串快速生成一个 ByteBuf(Netty独有的类,类似于原生的ByteBuffer,只是它在ByteBuffer的基础上做了封装) 对象。常用的方法如下:

1
public static`ByteBuf`copiedBuffer(CharSequence string, Charset charset)

StringDecoder 和 StringEncoder

这两个类分别是Netty提供的解码器和编码器,它们同时也是ChannelHandler的子类。有了这两个编解码器,就不再需要与ByteBuf打交道,代码写起来也更简洁方便。

StringDecoder

下图是解码器类StringDecoder的继承关系,注意它的父类是实现了ChannelInboundHandler接口的,作用在消息入站的时候:

如果有特殊需求需要自定义解码器也是可以的,只要实现MessageToMessageDecoder接口就可以了。

写法如下:

1
2
3
4
5
6
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,`ByteBuf`byteBuf, List<Object> list) throws Exception {
list.add(byteBuf.toString(CharsetUtil.UTF_8));
}
}

StringEncoder

下图是编码器类StringEncoder的继承关系,注意它的父类是实现了ChannelOutboundHandler接口的,作用在消息出站:

如果要自定义编码器,实现MessageToMessageEncoder接口就行了。

写法如下:

1
2
3
4
5
6
public class MessageEncoder extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
list.add(Unpooled.copiedBuffer(s, CharsetUtil.UTF_8));
}
}

如果嫌为自定义编码器和自定义解码器分别创建一个类太麻烦,还可以直接继承MessageToMessageCodec接口。

这个接口继承关系如下,注意它的父类同时实现了ChannelInboundHandlerChannelOutboundHandler,作用在消息入站和出站:

写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MessageCodec extends MessageToMessageCodec<ByteBuf, String> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
list.add(Unpooled.copiedBuffer(s, CharsetUtil.UTF_8));
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext,`ByteBuf`byteBuf, List<Object> list) throws Exception {
list.add(byteBuf.toString(CharsetUtil.UTF_8));
}

}

LineBasedFrameDecoder 与 DelimiterBasedFrameDecoder

这两个类也都是解码器,但它们解决的问题与上面所讲的编解码器不同,这两个类主要是解决粘包拆包的问题。

那么问题来了,什么是粘包和拆包?为什么会出现粘包和拆包呢?

首先来说说什么是粘包和拆包:

在文章开始讲了TCP的三个重要的特点:面向连接、可靠的和字节流。而Netty底层是基于TCP的,它的客户端与服务端交互时发送的数据在传输层都是通过字节流传输的,字节流是没有界线的概念的,这时服务器在读取数据时就可能在一次读取中读取到到客户端分几次发的数据,这就叫粘包。如果客户端发送一次数据,服务器分几次才能完整读到,这就是拆包。

粘包拆包大致如下图所示:

粘包拆包大致有以下几个原因:

  • socket缓冲区与滑动窗口: 在发送数据的时,发送方必须要先确认接收方的窗口没有被填充满,如果没有填满,则可以发送
  • MSS/MTU限制
  • Nagle算法:Nagle算法是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。

除了上面几个原因以外,我们Netty程序中看到的粘包拆包现象很大程度上也是由于Netty读取数据的机制。

Netty为了每次轮询能够负载均衡,会限制读取数据前生成的ByteBufcapacity大小,也就是限制了每次读取数据量的大小。通常第一次读取这个初始大小都 2048,后续的限制有两种情况:

  • 连续的数据量都比较大:2048 -> 32768 -> 65536 -> 65536 …. (Netty限制的最大值就是 65536)
  • 连续数据量都比较小:2048 -> 1024 -> 512 -> 496 … -> 64 (Netty限制的最小值是 64)

注意:Netty能对每次读取数据进行限制而不怕数据丢失主要是因为NioServerSocketChannel底层是水平触发的,即使这次没读完,下次Selector也能自动获取到剩下数据的读就绪事件。

由于ByteBuf的大小是Netty设定的,即使TCP层传的没有问题,小的数据也会”粘”在一起,大的数据也会”拆”开,看起来出现了粘包拆包。

Netty中解决粘包拆包的方法:

  • FixedLengthFrameDecoder:固定长度拆包器,使用固定长度进行拆分;
  • LineBasedFrameDecoder:行拆包器,使用换行符进行拆分;
  • DelimiterBasedFrameDecoder:分隔符拆包器,使用自定义的分隔符进行拆分;
  • LengthFieldBasedFrameDecoder:基于数据包长度的拆包器,基于应用层协议中传过来的长度进行拆分。

最常用的就是中间两个LineBasedFrameDecoderDelimiterBasedFrameDecoder

总结

以上就是Netty编程相关的知识点。Netty的组件非常多,可以自定义的地方也非常多,如果对这些组件不熟悉会踩很多坑,但熟悉这些组件之后使用它们编程会非常方便快捷。