Netty
现在的互联网环境下,分布式系统大行其道,而分布式系统的根基在于网络编程,而 Netty 恰恰是 Java 领域网络
编程的王者。如果要致力于开发高性能的服务器程序、高性能的客户端程序,必须掌握 Netty。
网络编程回顾
在 Java 基础中,我们讲解了网络编程,使用 Socket 来进行简单通信,我们简要回顾一下代码。
服务端:
@Slf4j
public class SocketServer {
public static void main(String[] args) {
try {
ServerSocket server = new ServerSocket(10086);
log.info("服务启动...等待客户端连接");
Socket socket = server.accept();
String ip = socket.getInetAddress().getHostAddress();
int port = socket.getPort();
log.info("有客户端连接: {}, 端口: {}", ip, port);
// 读客户端输入进来的数据
InputStream inputStream = socket.getInputStream();
InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String readData = bufferedReader.readLine();
log.info("收到客户端消息: {}", readData);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
客户端:
@Slf4j
public class SocketClient {
public static void main(String[] args) {
try (Socket socket = new Socket("localhost", 10086)) {
// 往服务端发送消息
OutputStream outputStream = socket.getOutputStream();
PrintWriter printWriter = new PrintWriter(outputStream);
System.out.println("请输入内容: ");
Scanner scanner = new Scanner(System.in);
String input = scanner.nextLine();
printWriter.println(input);
printWriter.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
这里的服务端和客户端都是什么意思呢?我们不妨来设想一个场景:A 要发邮件给 B,但是事实上,A 没办法直接发邮件给 B,而是需要一个中转——邮局。A 把邮件投递到邮局,邮局再把邮件发送给 B。那么,这里的服务端就起到类似于邮局的作用,客户端就是 A(上述代码没有写服务端到另一个客户端的发送)。
NIO 基础
NIO(Non-Blocking IO,非阻塞 IO)。NIO 有三大组件:Channel、Buffer、Selector。我们接下来先看一下这三大组件。
Channel & Buffer
Channel 具有通道的意思,在这里指的是数据的传输通道,是一个双向通道。而 Buffer 是缓存区,如果要往应用程序通道中写入、读出数据时,数据应优先存入 Buffer 中。
Channel 有一点类似于 stream,它就是读写数据的双向通道,可以从 Channel 将数据读入 Buffer,也可以将 Buffer的数据写入 Channel,而之前的 stream 要么是输入,要么是输出,Channel 比 Stream 更为底层。
flowchart LR Channel --> Buffer Buffer --> Channel
常见的 Channel 有:
- FileChannel。
- DatagramChannel。
- SocketChannel。
- ServerSocketChannel。
常见的 Buffer 有:
- ByteBuffer:
- MappedByteBuffer。
- DirectByteBuffer。
- HeapByteBuffer。
- ShortBuffer。
- IntBuffer。
- LongBuffer。
- FloatBuffer。
- DoubleBuffer。
- CharBuffer。
Selector
Selector 但从字面意思不好理解,需要结合服务器的设计演化来理解它的用途:
多线程版的服务器设计
在一开始,我们最原始的服务端开发是多线程版的思想。服务器需要处理请求,展示出来的就是需要用套接字来进行数据的通信,多个请求就需要多个套接字,自然需要多个线程来维护:
flowchart TD Thread1 --> Socket1 Thread2 --> Socket2 Thread3 --> Socket3
但是这样一来,会有很大的缺陷:
- 内存占用高。(多个线程占用大量内存)
- 线程上下文切换成本高。
- 只适合连接数少的场景。
线程池版的服务器设计
再后来,我们推出了线程池版的服务器设计:
flowchart TD Thread1 --> Socket1 Thread1 --> Socket2 Thread2 --> Socket3 Thread2 --> Socket4
不过,线程池版的服务器设计也有缺陷:
- 阻塞模式(线程会一直等待这个 Socket 的服务请求,如果 Socket 没服务需要,则一致等待)下,线程仅能处理一个 Socket 连接。
- 仅适合短连接的场景。
Selector 版的服务器设计
Selector 的作用就是配合一个线程来管理多个 Channel,获取这些 Channel 上发生的事件,这些 Channel 工作在
非阻塞模式下,不会让线程吊死在一个 Channel 上。适合连接数特别多,但流量低的场景(lowtraffic,指 Socket 一次请求中交互的数据较少)。
flowchart TD Thread --> Selector Selector --> Socket1 Selector --> Socket2 Selector --> Socket3
调用 Selector 的 select()
方法会阻塞知道 Channel 发生了读写就绪事件,这些事件发生后,select()
方法就会返回这些事件交给 Thread 来处理。
ByteBuffer
基础使用
开始正式介绍 ByteBuffer 前,我们使用一个示例程序来演示 Channel 和 Buffer 的用法。首先,需要引入相关依赖:
<dependencies>
<!-- Netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.101.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
</dependency>
<!-- json转换 -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<!-- google工具类合集 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.3.1-jre</version>
</dependency>
<!-- 日志打印 -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.6</version>
</dependency>
</dependencies>
接下来就可以使用 Channel 和 Buffer 来读取数据字节:
@Slf4j
public class TestByteBuffer {
public static void main(String[] args) {
// 获得FileChannel,这是一个文件数据读取通道
// 通过输入输出流可以间接获得Channel
try (FileChannel channel = new FileInputStream("data.txt").getChannel()) {
// 准备缓冲区暂存数据,缓冲区不能一次性开太大,不然很占用内存,我们使用多次读取的方式来解决这个问题
ByteBuffer buffer = ByteBuffer.allocate(10);
while (true) {
// 从channel读取数据,把数据写入到buffer中
int len = channel.read(buffer);
log.debug("读取到的字节数: {}", len);
if (len == -1) {
break;
}
// 从buffer中读取内容
buffer.flip(); // 切换到buffer的读模式
while (buffer.hasRemaining()) { // 检查是否有剩余数据
byte b = buffer.get();
log.debug("读取到的字节:{}", (char) b);
}
// 读完后,buffer切换为写模式
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
内部结构
从上述的 “基础使用” 中,我们了解到了 ByteBuffer 的正确使用方法:
- 向 buffer 中写入数据,例如调用
channel.read(buffer)
。 - 调用
flip()
切换至读模式。 - 从 buffer 读取数据,例如调用
buffer.get()
。 - 调用
clear()
或compact()
切换至写模式。 - 重复 1 ~ 4 步骤。
ByteBuffer 有以下重要属性:
- capacity:表示了 buffer 的容量。
- position:读写位置。
- limit:读写限制。
Buffer 可以看作是一个维护有可以移动的指针的数组。
Buffer 一开始为写模式,position 在最左侧,limit 在最右侧。当我们写入时,position 往右移动,当我们写入完毕后,此时调用 filp 方法切换为读模式时,limit 移动到 position 位置,position 移动到最左侧,限制了读模式下的最大读入位置。当读取完毕后,position 再次回到最左侧,limit 再次回到最右侧……如此反复。
上述是调用 flip 方法的效果,而当我们调用 compact 时,会把未读完的部分向前压缩,然后切换至写模式。
常用方法
Buffer 的两种分配方式区别:
public class TestByteBufferAllocate {
public static void main(String[] args) {
// HeapByteBuffer,使用Java堆内存,读写效率较低,并且会受到垃圾回收的影响
System.out.println(ByteBuffer.allocate(16).getClass());
// DirectByteBuffer,使用的是直接内存(系统内存)
// 读写效率较高,不会收到gc影响,但是分配的速度较慢(需要调用系统函数),并且可能造成内存泄漏
System.out.println(ByteBuffer.allocateDirect(16).getClass());
}
}
写入数据:
- 调用 Channel 的 read 方法:
channel.read(buf)
。 - 调用 Buffer 自己的 put 方法:
buf.put((byte)172)
。
读取数据:
- 调用 Channel 的 write 方法:
channel.write(buf)
。 - 调用 Buffer 自己的 get 方法:
buf.get()
。(get 方法会让 position 读指针向后走,但是如果我们要重复读取数据的话,可以调用rewind
方法将 position 重置为 0;或者调用get(int i)
方法获取索引i
的内容,但是不移动读指针)
标记位置:
- 如果需要记录某个重要位置,可以调用
mark()
方法。 - 调用了 mark 后,可以调用
reset()
方法,把读指针移动会刚刚做了标记的位置。
字符串与 ByteBuffer 互转:
public class TestByteBufferString {
public static void main(String[] args) {
// 1.字符串直接转为ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.put("hello".getBytes());
// 2.借助Charset指定字符集,此时会自动切换到读模式
ByteBuffer helloBuffer = StandardCharsets.UTF_8.encode("hello");
// 3.使用工具类wrap方法进行转换
ByteBuffer wrapBuffer = ByteBuffer.wrap("hello".getBytes());
// 4.把Buffer转换成字符串,注意需要在读模式下才能进行转换
String hello = StandardCharsets.UTF_8.decode(helloBuffer).toString();
System.out.println(hello);
}
}
分散读集中写
如果要读入的文本长度已知,可以分散地把这段文本读入到多个 Buffer 当中,可以避免数据的重新分割和复制:
public class TestScatteringReads {
public static void main(String[] args) {
try (FileChannel channel = new RandomAccessFile("data.txt", "r").getChannel()) {
// 准备三个ByteBuffer
ByteBuffer b1 = ByteBuffer.allocate(3);
ByteBuffer b2 = ByteBuffer.allocate(3);
ByteBuffer b3 = ByteBuffer.allocate(5);
// 直接读入
channel.read(new ByteBuffer[]{b1, b2, b3});
b1.flip();
b2.flip();
b3.flip();
} catch (IOException e) {
}
}
}
反之,如果有很多段 ByteBuffer,想要把数据拼接后写入到文件中,可以直接把多个 ByteBuffer 组合到一起写入,避免多次拷贝:
public class TestGatheringWrites {
public static void main(String[] args) {
// 准备三个ByteBuffer
ByteBuffer b1 = StandardCharsets.UTF_8.encode("hello");
ByteBuffer b2 = StandardCharsets.UTF_8.encode("world");
ByteBuffer b3 = StandardCharsets.UTF_8.encode("你好");
try (FileChannel channel = new RandomAccessFile("data2.txt", "rw").getChannel()) {
// 集中写入
channel.write(new ByteBuffer[]{b1, b2, b3});
} catch (IOException e) {
}
}
}
黏包半包处理
我们来看一个场景:网络上有多条数据发送给服务器,数据之间使用 \n
进行分隔。但由于某种原因这些数据在接收时,被进行了重新组合。例如原始数据有三条为:
Hello,world\n
I'm zhangsan\n
How are you?\n
这些数据在传输过程中变成了如下形式:
Hello,world\nI'm zhangsan\nHo
w are you?\n
这实际上是很常见的网络黏包半包现象,“黏包” 指的就是数据被黏合到了一起(网络考虑效率问题,把数据合起来再发出去,减少发送次数),“半包” 指的就是数据被切割开了(服务器缓冲区相对较小,无法一次性读入,只能分多次读,导致数据被切割)。
现在要求我们编写程序,来对数据进行复原。我们根据上述题意,便可以写出如下程序:
public class TestByteBufferExam {
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}
// 拆分数据
private static void split(ByteBuffer source) {
// 切换成读模式
source.flip();
for (int i = 0; i < source.limit(); ++i) {
if (source.get(i) == '\n') {
int len = i + 1 - source.position();
// 存入到新的ByteBuffer
ByteBuffer target = ByteBuffer.allocate(len);
// 往target中写
for (int j = 0; j < len; ++j) {
target.put(source.get());
}
}
}
// 切换成写模式,并保留第一次未读的数据,方便可以和下一个数据包拼接到一起
source.compact();
}
}
文件编程
FileChannel
FileChannel 只能工作在阻塞模式下,所以没办法配合 Selector 一起使用。一般我们不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel
方法:
- 通过 FileInputStream 获取的 Channel 只能读。
- 通过 FileOutputStream 获取的 Channel 只能写。
- 通过 RandomAccessFile 获取的 Channel 是否能读写根据构造 RandomAccessFile 的读写模式决定。
Channel 的写能力有上限,所以在写入的时候需要分多次写入:
ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip(); // 切换为读模式
while(buffer.hasRemaining()) {
channel.write(buffer);
}
同时,Channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接关闭 Channel。并且,操作系统出于性能的考虑,会将数据缓存,不是立即写入磁盘。可以调用 force(true)
方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘。
两个 Channel 传输数据
调用 transferTo
可以让数据在两个 Channel 中间传输:
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("data-output.txt").getChannel();
) {
// 底层利用操作系统的零拷贝进行优化,但是最多只能传2G的数据
from.transferTo(0, from.size(), to);
} catch (IOException e) {
e.printStackTrace();
}
}
}
我们优化上述代码,使得其可以传输大数据:
public class TestFileChannelTransferTo {
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("data-output.txt").getChannel();
) {
long size = from.size();
for(long left = size; left > 0;) {
// 返回实际传输的字节数
long transfer = from.transferTo(size - left, left, to);
left -= transfer;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Path 和 Files
jdk7 引入了 Path 和 Paths 类:
- Path 用来表示文件路径。
- Paths 是工具类,用来获取 Path 实例。
Path source = Paths.get("1.txt"); // 相对路径,使用user.dir来定位
Path path = Paths.get("d:\\data\\projects\\a\\..\\b");
System.out.println(path.normalize()); // 正常化路径
同样,jdk7 也引入了 Files:
// 检查文件是否存在
Path path = Paths.get("helloworld/data.txt");
System.out.println(Files.exists(path));
其他 API 见官方文档,这里不再赘述。我们接下俩利用几段代码来展示其功能。
遍历目录(使用的是访问者模式):
public class TestFiles { public static void main(String[] args) throws IOException { AtomicInteger dirCount = new AtomicInteger(); AtomicInteger fileCount = new AtomicInteger(); // 使用walkFileTree遍历目录 Files.walkFileTree(Paths.get("D:\\Java"), new SimpleFileVisitor<>() { // 在进入文件夹前执行的操作 @Override public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { System.out.println("===> " + dir); // 匿名内部类无法引用外部局部变量(此时相当于变量被final修饰),所以我们使用累加器代替 dirCount.incrementAndGet(); return super.preVisitDirectory(dir, attrs); } // 访问文件时执行的操作 @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { System.out.println(file); fileCount.incrementAndGet(); return super.visitFile(file, attrs); } }); System.out.println(dirCount + " " + fileCount); } }
删除多级目录:
public class TestFiles { public static void main(String[] args) throws IOException { Files.walkFileTree(Paths.get("D:\\tmp"), new SimpleFileVisitor<>() { // 访问文件时执行的操作 @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); return super.visitFile(file, attrs); } // 退出目录执行的操作 @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { Files.delete(dir); return super.postVisitDirectory(dir, exc); } }); } }
拷贝多级目录:
public class TestFiles { public static void main(String[] args) throws IOException { String source = "D:\\source"; String target = "D:\\target"; Files.walk(Paths.get(source)).forEach(path -> { try { // 替换文件名 String targetName = path.toString().replace(source, target); // 是目录 if (Files.isDirectory(path)) { Files.createDirectories(Paths.get(targetName)); } // 是普通文件,执行拷贝 else if (Files.isRegularFile(path)) { Files.copy(path, Paths.get(targetName)); } } catch (IOException e) { throw new RuntimeException(e); } }); } }
网络编程
非阻塞 VS 阻塞
我们来看一下阻塞和非阻塞的区别:
阻塞:
- 在没有数据可读时,包括数据复制过程中,线程必须阻塞等待,不会占用 cpu,但线程相当于闲置。
- 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,为了减少线程数,需要采用线程池技术.
- 即便用了线程池,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程。
简单来说,阻塞就是:没有连接时,线程停止运行。
非阻塞:
- 某个 Channel 没有可读事件时,线程不必阻塞,它可以去处理其它有可读事件的 Channel。
- 数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)。
- 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去。
相对于阻塞,线程非阻塞的状态下,即使没有连接,它也会一直轮询,一直在工作,消耗 CPU。
多路复用
上述对于阻塞和非阻塞我们已经有了一个简单了解,我们之前说的 Selector 就是来解决非阻塞下线程一直轮询的缺陷。单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这就叫做多路复用。
对于 Channel,我们需要将其和 Selector 进行绑定,以便 Selector 对 Channel 进行处理。当 Channel 绑定到 Selector 后,会生成一个 key,我们使用这个 key 来处理相对应的事件。其中事件又分以下几种:
- accept:会在有连接请求时触发。
- connect:客户端发起连接请求时,客户端与服务器建立连接后触发。
- read:当数据可读时触发。
- write:当数据可写时触发。
以下是用 Selector 优化后的服务端代码:
@Slf4j
public class TestServer {
public static void main(String[] args) throws IOException {
// 准备Selector
Selector selector = Selector.open(); // 可以管理多个Channel
ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建服务器
ssc.configureBlocking(false); // 开启非阻塞状态
// 建立Selector和Channel之间的联系
// SelectionKey是事件发生后通过它来知晓事件和哪个Channel发生的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT); // sscKey只关注accept事件,即是否有请求到达
ssc.bind(new InetSocketAddress(8080)); // 绑定监听端口
// 建立与客户端的连接
while (true) {
// 调用Selector查询是否发生指定的事件
// 没有事件发生,就会让线程阻塞,如果有事件发生,则会让线程非阻塞
selector.select();
// 处理事件,selector在事件发生后,只会加入key,但是不会删除,所以需要自己删除,不然事件处理后key没有删除,会产生空指针
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 手动删除key
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept(); // 建立连接
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
}
else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);
// 如果客户端正常断开,会产生read事件,但是此时read方法返回-1
if (read == -1) {
key.cancel(); // 取消注册
} else {
buffer.flip();
System.out.println(StandardCharsets.UTF_8.decode(buffer));
}
} catch (IOException e) {
// 客户端强制断开连接时也会产生一个read事件,但这个时候会导致上述代码出现异常
// 我们抓住异常后需要把这个key取消注册,从selectedKey集合中删除
e.printStackTrace();
key.cancel();
}
}
}
}
}
}
客户端代码如下:
public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
SocketAddress address = socketChannel.getLocalAddress();
socketChannel.write(Charset.defaultCharset().encode("0123456789abcdefghijkl\n"));
System.out.println("waiting...");
}
}
消息边界问题
在网络编程中,对于一个 Buffer,可能出现 Buffer 的长度小于消息的长度,这就会导致半包。如果消息的长度小于 Buffer 的长度但是消息的个数过多,就会导致黏包。
一种解决方法就是,对于一次数据的传输,先传递一个数,表示接下来的数据长度,然后再发送实际的数据长度。这样服务端就可以根据这个数据的长度来开辟对应大小的 Buffer 用来接收数据。
这种方法的应用很广泛,例如 HTTP 协议使用到就是 TLV 格式(Type、Length、Value)。需要知晓文件类型(Content-Type)、长度(Content-Length),最后再去读取实际的报文体。
对于这种方法具体细节我们放到后面的 Netty 来讲,我们先用前面小节中使用的处理半包黏包的方法来解决消息边界问题:
@Slf4j
public class TestServer {
// 拆分数据
private static void split(ByteBuffer source) {
// 切换成读模式
source.flip();
for (int i = 0; i < source.limit(); ++i) {
if (source.get(i) == '\n') {
int len = i + 1 - source.position();
// 存入到新的ByteBuffer
ByteBuffer target = ByteBuffer.allocate(len);
// 往target中写
for (int j = 0; j < len; ++j) {
target.put(source.get());
}
target.flip();
System.out.println(StandardCharsets.UTF_8.decode(target));
}
}
// 切换成写模式,并保留第一次未读的数据,方便可以和下一个数据包拼接到一起
source.compact();
}
public static void main(String[] args) throws IOException {
// 准备Selector
Selector selector = Selector.open(); // 可以管理多个Channel
ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建服务器
ssc.configureBlocking(false); // 开启非阻塞状态
// 建立Selector和Channel之间的联系
// SelectionKey是事件发生后通过它来知晓事件和哪个Channel发生的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT); // sscKey只关注accept事件,即是否有请求到达
log.debug("sscKey: {}", sscKey);
ssc.bind(new InetSocketAddress(8080)); // 绑定监听端口
// 建立与客户端的连接
while (true) {
// 调用Selector查询是否发生指定的事件
// 没有事件发生,就会让线程阻塞,如果有事件发生,则会让线程非阻塞
selector.select();
// 处理事件,selector在事件发生后,只会加入key,但是不会删除,所以需要自己删除,不然事件处理后key没有删除,会产生空指针
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 手动删除key
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept(); // 建立连接
sc.configureBlocking(false);
// Buffer作为附件关联到SelectionKey上
ByteBuffer buffer = ByteBuffer.allocate(16);
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("scKey: {}", scKey);
}
else if (key.isReadable()) {
try {
SocketChannel channel = (SocketChannel) key.channel();
// 拿取附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer);
// 如果客户端正常断开,会产生read事件,但是此时read方法返回-1
if (read == -1) {
key.cancel(); // 取消注册
} else {
split(buffer);
// 第一次没读完,扩容附件
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
key.attach(newBuffer);
}
}
} catch (IOException e) {
// 客户端强制断开连接时也会产生一个read事件,但这个时候会导致上述代码出现异常
// 我们抓住异常后需要把这个key取消注册,从selectedKey集合中删除
e.printStackTrace();
key.cancel();
}
}
}
}
}
}
ByteBuffer 大小分配
每个 Channel 都需要记录可能被拆分的消息,因为 ByteBuffer 不能被多个 Channel 共同使用。所以需要以附件的形式把 ByteBuffer 挂载到 Channel 上。
ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就需要 1Tb 的内存,所以需要设计大小可变的 ByteBuffer:
- 一种思路就是准备一个较小的 Buffer,然后根据数据量来扩容。优点是消息连续容易处理,缺点是扩容时需要数据拷贝,消耗性能。
- 另一种思路就是用多个数组组成 Buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能消耗。
写入内容过多的问题
现在有一个客户端负责接收服务端的数据:
public class TestWriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1", 8080));
// 接收数据
int cnt = 0;
while(true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
cnt += sc.read(buffer);
System.out.println(cnt);
buffer.clear();
}
}
}
服务端的发送数据代码如下:
public class TestWriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; ++i) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 返回值代表实际写入的字节数
while (buffer.hasRemaining()) {
int write = sc.write(buffer);
System.out.println(write);
}
}
}
}
}
}
看似很完美,但是实际上,接收端可能没办法及时接收到服务端发送的数据。这就会导致服务端有时候可能会无法 write,此时服务端负责发送数据的 SocketChannel 就会轮询发送,消耗资源。
所以我们就需要正确处理 write 事件,当发送缓冲区满了的时候,让 Channel 处理别的操作。当写缓冲区空了的时候,触发 write 事件,再次让 Channel 来发送数据,这样才能最大化利用资源。
优化后代码如下:
public class TestWriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
// 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; ++i) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 先尝试写满一次
int write = sc.write(buffer);
System.out.println(write);
// 判断是否有剩余内容
if (buffer.hasRemaining()) {
// 关注可写事件,在原来关注的基础上加上新事件,防止事件覆盖
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// 把未写完的数据挂载到scKey上
scKey.attach(buffer);
}
}
// 当缓冲区再次可写时触发
else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
// 再写一次,就算写不完,我们已经关注可写事件了,所以当可写发生时会再次走这里的代码
int write = channel.write(buffer);
System.out.println(write);
if (!buffer.hasRemaining()) {
// buffer读完了,把附件buffer释放掉,并且释放可写事件的绑定
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}
}
多线程优化
前面的代码只有一个 Seletor,没有充分利用多核 CPU,我们现在需要来用多线程优化我们之前的代码。
我们提供一个场景进行多线程演示:Boss 在服务端中有自己的多线程,每个线程有自己的 Selector,但是 Boss 只负责与客户端建立连接;Worker 在服务端种也有自己的多线程,每个线程也有自己的 Selector,但是 Worker 只负责数据的读写处理:
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 创建固定数量的worker
Worker worker = new Worker("worker-0");
worker.register();
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept(); // 建立新的SocketChannel
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
// 使用Worker来管理SocketChannel的数据处理
log.debug("before register...{}", sc.getRemoteAddress());
sc.register(worker.getSelector(), SelectionKey.OP_READ, null);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
}
@Data
@Slf4j
public class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 线程还未初始化
public Worker(String name) {
this.name = name;
}
// 初始化线程和selector
public void register() throws IOException {
if (!start) { // 保证本段代码只执行一遍
// Worker实际上就是任务对象,实现了线程任务接口
thread = new Thread(this, name);
selector = Selector.open();
thread.start();
start = true;
}
}
@Override
public void run() {
// 检测读写事件
while (true) {
try {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read...{}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
阻塞问题
现在有一个问题,就是服务端实际上没办法接收并读取客户端发送过来的数据,原因在于 register 方法中,会启动 Worker 的线程去执行 run 方法,run 方法中 selector.select()
在没有事件时会阻塞,导致 sc.register(worker.getSelector(), SelectionKey.OP_READ, null);
这行代码也被阻塞。一种简单的解决方法是把 worker.register()
放在 sc.register(...)
前:
worker.register(); // 启用worker-0线程,run方法在worker线程中执行
sc.register(worker.getSelector(), SelectionKey.OP_READ, null); // boss线程执行
此时操作系统在针对两个线程的启动时,顺序是随机的,大概率会先让 boss 线程负责的 sc.register(...)
先执行,此时不会导致阻塞。
但是,当有多个客户端的时候,当系统处理完第一个客户端请求时,worker 线程的 selector 会因为没有事件发生而再次阻塞住,此时如果又有客户端请求时便会再一次被阻塞住。
一种解决思路就是让这两段代码运行在同一个线程中,这样就容易控制其执行顺序了。服务器代码更改:
// ...
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
log.debug("before register...{}", sc.getRemoteAddress());
// 1.调用worker执行初始化
worker.register(sc);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
Worker 代码更改:
@Data
@Slf4j
public class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;
// 使用线程安全的队列来存储执行任务,进行解耦
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
public void register(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this, name);
selector = Selector.open();
thread.start();
start = true;
}
// 3.boss线程向队列添加任务,但任务并没有立刻执行
queue.add(() -> {
try {
// 6.当前sc关注read事件
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
selector.wakeup(); // 4.唤醒selector
}
@Override
public void run() {
// 检测读写事件
while (true) {
try {
// 2.worker-0在调用register后会启动,没事件发生,在这里阻塞
selector.select();
// 5.上述代码被唤醒,不阻塞,把绑定代码放在worker线程里面执行
Runnable task = queue.poll();
if (task != null) {
task.run();
}
// ...
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
多 Worker
之前的代码只有一个 Worker,我们为了充分利用 CPU,接下来将其优化为多个 Worker:
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 创建多个worker,根据CPU核心数来创建
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; ++i) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger();
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept(); // 建立新的SocketChannel
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
// 使用Worker来管理SocketChannel的数据处理
log.debug("before register...{}", sc.getRemoteAddress());
// 轮询worker
workers[index.getAndIncrement() % workers.length].register(sc);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
}
概念剖析
Stream VS Channel
- Stream 不会自动缓冲数据,Channel 会利用系统提供的发送缓冲区、接收缓冲区。(Channel 更为底层)
- Stream 仅支持阻塞 API,Channel 同时支持阻塞、非阻塞 API,网络 Channel 可配合 Selector 实现多路复用。
- 二者均为全双工,即读写可以同时进行。
IO 模型
- 同步:线程自己去获取结果(一个线程)。
- 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)。
其他知识点详情见 《Redis》一章。
零拷贝
我们先来讲一个传统 IO 的问题,传统 IO 下,我们将一个文件通过 socket 写出,有如下代码:
// 文件读入
File f = new File("helloworld/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
// 文件写出
Socket socket = ...;
socket.getOutputStream().write(buf);
有于 Java 是没有读写能力的,Java 是通过调用操作系统的方法实现读写功能。其内部的工作流程是这样的:数据先从磁盘读入到内核缓冲区,再读入到用户缓冲区(上述 byte 数组);接着数据从用户缓冲区写入到 socket 缓冲区,最后写入网卡。
可以看到,最终用户态到内核态的切换发生了三次,数据一共拷贝了四次,效率低下。
NIO 下,可以通过 DirectByteBuffer 来优化:ByteBuffer.allocateDirect(10)
。通过直接在操作系统的内存空间中创建 Buffer,直接把内核缓冲区和用户缓冲区合并共用了。减少了一次数据拷贝,但是用户态和内核态的切换次数并没有减少。
进一步优化,我们调用 Channel 的 transferTo/transferFrom
来进行数据拷贝时,Linux 底层调用的函数把数据直接从内核缓冲区复制到 socket 缓冲区。此时只发生一次用户态和内核态的切换,数据拷贝了 3 次。
最后,在 Linux 2.4 中,数据是直接从内核缓冲区复制到网卡中,实现了数据零拷贝(指不会拷贝重复数据到 jvm 内存中,实际上数据的拷贝会发生两次,一次是从磁盘到内核缓冲区,另一次是内核缓冲区拷贝到网卡)。
零拷贝的优点有:
- 更少的用户态和内核态的切换。
- 不利用 CPU 计算(直接利用 EMA 硬件拷贝),减少 CPU 缓存伪共享。
- 零拷贝适合小文件的频繁传输。
Netty 入门
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。Netty 在 Java 网络应用框架中的地位就好比 Spring 框架在 JavaEE 开发中的地位。只要是涉及网络通信需求的项目,基本上都需要使用 Netty。
Hello World
我们先尝试使用 Netty 开发简单的服务器端和客户端:
- 客户端向服务器端发送
hello, world
。 - 服务器端仅接收,不返回。
需要依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.111.Final</version>
</dependency>
服务器代码:
public class HelloServer {
public static void main(String[] args) {
// 1.创建服务器端启动器,负责把下方这些Netty的组件组合到一起,提供服务
new ServerBootstrap()
// 2.添加EventLoop,由thread和selector组合而成
.group(new NioEventLoopGroup())
// 3.选择ServerChannel的实现,负责处理客户端的连接
.channel(NioServerSocketChannel.class)
// 4.boss负责处理连接,child负责处理读写,下述代码告诉child到时候数据到了需要做哪些事
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 5.ChannelInitializer负责添加handler
@Override // 在连接建立后才会调用这个init方法
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 6.添加具体handler
nioSocketChannel.pipeline().addLast(new StringDecoder()); // 将ByteBuf转换为字符串
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 打印接收到的数据
System.out.println(msg);
}
});
}
})
// 7.指定绑定监听端口
.bind(8080);
}
}
客户端代码:
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 1.创建启动器类
new Bootstrap()
// 2.添加EventLoop,可以负责服务端到客户端的数据接收
.group(new NioEventLoopGroup())
// 3.选择Channel实现
.channel(NioSocketChannel.class)
// 4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后才触发init方法
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder()); // 把字符串转换为ByteBuf
}
})
// 5.连接到服务器
.connect(new InetSocketAddress("127.0.0.1", 8080))
.sync() // 阻塞方法,直到连接建立后才调用,拿到channel对象发送数据
.channel()
// 6.向服务器发送数据
.writeAndFlush("hello world");
}
}
概念理解
针对于上述代码,我们可以做如下比喻,以方便我们更好地理解 Netty 的工作流程:
- channel 可以理解为数据的传输通道。
- msg 可以理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其他类型对象,最后输出又变成 ByteBuf。
- handler 可以理解为数据的处理工序:
- 工序由多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成……)传播给每个 handler,handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler 分 Inbound 和 Outbound 两类。
- eventLoop 可以理解为处理数据的工人:
- 工人可以管理多个 Channel 的 IO 操作(Selector 的多路复用),并且一旦工人负责了某个 channel,就要负责到底(绑定)。
- 工人既可以执行 IO 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆方多个 Channel 的代处理任务,任务分为普通任务、定时任务。
- 工人按照 pipline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人。
EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的
IO 事件。
它的继承关系比较复杂:
- 一条线是继承自
j.u.c.ScheduledExecutorService
因此包含了线程池中所有的方法。 - 另一条线是继承自 netty 自己的 OrderedEventExecutor:
- 提供了
boolean inEventLoop(Thread thread)
方法判断一个线程是否属于此 EventLoop。 - 提供了 parent 方法来看看自己属于哪个 EventLoopGroup。
- 提供了
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个
EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)。
也就是说,可以把 EventLoopGroup 看作是 Netty 版的线程池,EventLoopGroup 中维护多个 EventLoop,其中每一个 EventLoop 中只维护了一个线程。
普通-定时任务
使用 EventLoopGroup 执行普通、定时任务代码如下:
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
// 1.创建事件循环组
EventLoopGroup group = new NioEventLoopGroup(2); // 可以处理IO事件,普通任务,定时任务
// DefaultEventLoop group = new DefaultEventLoop(); 可以处理普通任务,定时任务
// 2.获取下一个事件循环对象,轮询获取
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 3.执行普通任务,因为其继承了线程池,所以有线程池的一系列方法
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("ok1");
});
// 4.执行定时任务,从第0秒开始,每隔1秒执行一次
group.next().scheduleAtFixedRate(() -> {
log.debug("ok2");
}, 0, 1, TimeUnit.SECONDS);
log.debug("main");
}
}
分工细化
上述代码是 EventLoop 的简单使用,而它的核心使用——处理 IO 事件,在之前的 hello world 示例中已经看到过了。
之前在 “NIO 基础” 一节中,我们提到过 Boss 和 Worker 进程。实际上,EventLoop 的分工细化也可以划分为 Boss 和 Worker,有负责专门处理连接的,有负责专门处理数据的:
@Slf4j
public class TestEventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
// 第一个参数:Boss,负责处理ServerSocketChannel上的accept事件
// 第二个参数:Worker,负责处理SocketChannel上的读写事件
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
// ...省略冗余代码
.bind(8080);
}
}
上述的代码只是对连接和处理数据进行了划分。我们知道,对于一个 Worker,可能需要管理多个 Channel,如果有些 Channel 需要执行的操作总时间花费过长,会导致其他的 Channel 阻塞,无法执行。所以,我们可以专门划分出一些 EventLoop 来执行耗时较长的操作:
@Slf4j
public class TestEventLoopServer {
public static void main(String[] args) {
// 专门处理长耗时任务(无需IO)
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 不指定EventLoopGroup,默认为上述NioEventLoopGroup(2)
nioSocketChannel.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("执行耗时较短的操作");
ctx.fireChannelRead(msg); // 把msg传递给下一个handler
}
})
// 在添加Handler时指定EventLoopGroup来处理操作
.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("执行耗时较长的操作");
}
});
}
})
.bind(8080);
}
}
切换线程
我们之前说过,EventLoop 是和一个 Channel 绑定的,像上述那种需要利用不同 EventLoop 执行 Handler 方法的,必定会涉及到线程的切换,关键源码如下:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个Handler的事件循环是否与当前事件循环是同一个线程
EventExecutor executor = next.executor(); // 返回下一个handler的eventLoop
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环
else {
// executor是下一个handler线程,往下一个线程里面添加执行任务
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
})
}
}
Channel
Channel 的主要作用:
close()
:可以用来关闭 Channel。closeFuture()
:用来处理 Channel 的关闭:- sync 方法作用是同步等待 Channel 关闭。
- addListener 方法是异步等待 Channel 关闭。
pipline()
:添加处理器。write()
:将数据写入 Channel 缓冲区,但是不会立刻写。writeAndFlush()
:将数据写入 Channel 并立刻刷出。
接下来,我们来介绍一个和 Channel 密切相关的类:ChannelFuture。
连接问题
我们把之前客户端代码分成多段,而不是链式调用:
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 异步方法
.connect(new InetSocketAddress("127.0.0.1", 8080));
channelFuture.sync();
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello");
}
}
我们如果把 channelFuture.sync()
去掉,会发现客户端的数据并没有顺利传到服务端。上述代码还有很多值得细讲的地方:
- 首先,connect 方法实际上是一个异步的方法,即调用 connect 线程不关注结果,结果是由另一个线程返回的。上述代码中,是 main 主线程调用 connect 方法,而真正执行连接的是 NioEventLoopGroup 中的线程。
- 如果不执行
sync()
方法,那么 main 主线程会非阻塞地往下执行去创建一个 Channel,但是这个 Channel 并没有完全建立好连接,此时尝试发送数据便会发送失败。
ChannelFuture 实际上就是为了与上述这种异步方法使用的,用于正确处理结果:
使用
sync()
方法同步处理结果:使用sync()
后,main 线程会在这里阻塞,等到结果返回时才会继续往下执行。使用
addListener()
方法异步处理结果:mian 线程把等待结果的任务直接交给别的线程做,并且事先给这个线程一个回调函数,告诉这个线程当结果接收到后应该做什么事情:public class EventLoopClient { public static void main(String[] args) throws InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("127.0.0.1", 8080)); // 异步处理结果 channelFuture.addListener(new ChannelFutureListener() { @Override // Nio线程建立完连接后执行的操作 public void operationComplete(ChannelFuture channelFuture) throws Exception { Channel channel = channelFuture.channel(); channel.writeAndFlush("hello"); } }); } }
关闭问题
有如下场景,现在我们需要编写代码使得 Channel 的关闭变得更加优雅:
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
// 从控制台输入数据,把数据传递给服务端,直到输入Q
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
// close的善后代码不能写在这里,因为close是一个异步方法,如果其他线程在1s后才关闭这个channel,这个时候马上执行到这里,也是不对的
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// close的善后代码不能写在这里,因为Tread.start是非阻塞的,调用完start后会马上执行到这个位置
}
}
可以使用 CloseFuture,同步或者异步地处理善后操作:
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// LoggingHandler可以打印更多日志
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
// 从控制台输入数据,把数据传递给服务端,直到输入Q
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取CloseFuture对象,可以同步处理关闭,也可以异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("waiting close...");
closeFuture.sync(); // 同步等待
System.out.println("善后操作...");
}
}
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// ...省略冗余代码
// 获取CloseFuture对象,可以同步处理关闭,也可以异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
// 异步关闭
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("善后操作...");
}
});
}
}
虽然上述代码正确地处理了 Channel 关闭后的善后操作,但是我们会发现实际上当 Channel 关闭后,客户端的线程并没有直接终止。这是因为 NioEventLoopGroup 中还有线程没有终止,我们需要在接下来的代码中处理这个问题:
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 把NioEventLoopGroup提取出来
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// LoggingHandler可以打印更多日志
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
// 异步关闭
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("关闭后的善后操作...");
// 先拒绝接收任务,然后等到现在的任务执行后再把EventLoop线程关闭
group.shutdownGracefully();
}
});
}
}
Future & Promise
为什么需要异步
考虑如下一个场景:一个医院有 4 个医生,每个医生处理一个病人需要四个步骤(挂号、看病、缴费、取药),如果此时采用最直接的方法,便是一个医生负责病人看病所需要的四个步骤,并且四个医生都同时给病人看病。
我们可以看到,上述场景下,实际上并没有用到异步(一个医生负责一个病人的所有看病流程),只是用到了多线程。然而,假设我们考虑下述场景:第一个医生只负责给病人挂号,挂完号后把病人交给第二个医生;第二个医生只负责给看病,看完病后把病人交给第三个医生……。在这种情况下,才是异步的思想。
但实际上,假设一个医生一天工作 8 小时,病人看病的每一个步骤都需要 5 分钟。按照同步和异步的场景下,计算下来,会发现异步场景下一天能看的病人总数反而要比同步场景下的少。
要点:
- 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势。
- 异步并没有缩短响应时间,反而有所增加。但是提高了单位时间内看病人的个数(即提高了吞吐量)。
- 合理进行任务拆分,也是利用异步的关键。
Future 和 Promise 概述
Netty 在异步处理时,经常用到 Future 和 Promise 这两个接口。
首先要说明的是 Netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,Netty 的 Future 继承自 jdk 的
Future,而 Promise 又对 NettyFuture 进行了扩展:
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果。
- NettyFuture 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束(比较被动)。
- NettyPromise 仅有 NettyFuture 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel |
取消任务 | - | - |
isCanceled |
任务是否取消 | - | - |
isDone |
任务是否完成,不能区分成功失败 | - | - |
get |
获取任务结果,阻塞等待 | - | - |
getNow |
- | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await |
- | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 |
- |
sync |
- | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess |
- | 判断任务是否成功 | - |
cause |
- | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener |
- | 添加回调,异步接收结果 | - |
setSuccess |
- | - | 设置成功结果 |
setFailure |
- | - | 设置失败结果 |
JdkFuture
JdkFuture 中是由执行任务的线程把结果填入,作为主线程的我们是没办法主动地去填入这个结果的,所以 JdkFuture 是被动填入:
@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建线程池,主线程异步调用线程池中的线程处理结果并接收返回值
ExecutorService service = Executors.newFixedThreadPool(2);
// 提交任务,需要接收结果
Future<Integer> future = service.submit(new Callable<Integer>() {
// 执行任务是线程池中的线程
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000); // 这段是线程池中线程处理的,主线程不会走
return 50;
}
});
// 主线程通过Future接收返回值
log.debug("等待结果");
Integer result = future.get();// get是同步等待,主线程直接走到这里阻塞住,等待结果
System.out.println(result);
}
}
NettyFuture
相对于 JdkFutrue 只能同步等待结果,NettyFuture 不仅可以同步等待,还可以异步等待结果。
同步接收结果:
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Netty中的线程池
NioEventLoopGroup group = new NioEventLoopGroup();
// 拿到一个线程
EventLoop eventLoop = group.next();
// 这里返回的是NettyFuture
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
}
});
log.debug("等待结果");
log.debug("结果是: {}", future.get());
group.shutdownGracefully();
}
}
异步接收结果:
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建Netty中的线程池
NioEventLoopGroup group = new NioEventLoopGroup();
// 拿到一个线程
EventLoop eventLoop = group.next();
// 这里返回的是NettyFuture
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
}
});
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收结果: {}", future.getNow());
}
});
group.shutdownGracefully();
}
}
NettyPromise
在 JdkFuture 和 NettyFuture 中,对于 Future 的创建和使用权都不是我们主线程可以随便控制的。而对于 NettyPromise,其把主动权交给了主线程:
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建EventLoop
EventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 可以主动创建Promise对象,Promise是一个结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
log.debug("开始计算...");
try {
Thread.sleep(1000);
promise.setSuccess(50); // 调用promise方法填装结果
} catch (InterruptedException e) {
e.printStackTrace();
promise.setFailure(e); // 告知主线程程序异常
}
}).start();
// 主线程拿取结果
log.debug("等待结果...");
log.debug("结果是: {}", promise.get());
group.shutdownGracefully();
}
}
Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline:
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果。
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工。
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。
@Slf4j
public class TestPipelineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 通过channel拿到pipeline
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 添加处理器,Netty默认会给我们两个处理器 head -> tail
// 所谓的addLast,实际上是把处理器加在tail前 head -> h1 -> h2 -> tail
// 实际上Netty中handler是一个双向链表
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override // 入站处理器,从Channel读数据才会触发
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
}).addLast("h2", new ChannelOutboundHandlerAdapter() {
@Override // 出站处理器,往Channel写数据才会触发
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("2");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
Pipeline 中,入站处理器只有当我们从 Channel 中读数据才会生效,此时 Handler 链路是从 Head 往后走;出站处理器只有当我们往 Channel 中写数据才会生效,此时 Handler 链路是从 Tail 往前走。
InboundHandler
入站处理器的基本使用如下:
@Slf4j
public class TestPipelineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 入站处理器,从Channel读数据才会触发
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 将ByteBuf转成字符串
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(StandardCharsets.UTF_8);
// 把字符串交给下一个入站处理器
super.channelRead(ctx, name);
}
}).addLast("h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 字符串封装为学生对象
Student student = new Student((String) msg);
super.channelRead(ctx, student);
}
}).addLast("h3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("最终结果: {},class: {}", msg, msg.getClass());
}
});
}
})
.bind(8080);
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Student {
private String name;
}
}
OutboundHandler
出站处理器的基本使用如下:
@Slf4j
public class TestPipelineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 出站处理器,写数据时才会触发
// head -> h0 -> h1 -> h2 -> h3 -> tail
pipeline.addLast("h0", new ChannelInboundHandlerAdapter() {
@Override // 由入站处理器往客户端写数据
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h0");
// ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
// ctx写出时是从当前处理器往前找,如果用ctx写,在本代码中是没办法执行下方的出站处理器
// channel的writeAndFlush方法是从tail往前找出站处理器
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
super.channelRead(ctx, msg);
}
}).addLast("h1", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h1");
super.write(ctx, msg, promise);
}
}).addLast("h2", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h2");
super.write(ctx, msg, promise);
}
}).addLast("h3", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h3");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
EmbeddedChannel
EmbeddedChannel 是为了方便我们进行调试而产生的:
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h2 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("2");
super.write(ctx, msg, promise);
}
};
// 传递入站出站处理器
EmbeddedChannel embeddedChannel = new EmbeddedChannel(h1, h2);
// 模拟输入数据和写出数据
embeddedChannel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
embeddedChannel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
}
}
ByteBuf
创建
ByteBuf 是对 NIO 中的 ByteBuffer 的增强。我们先来看其是怎么创建的:
public class TestByteBuf {
public static void main(String[] args) {
// ByteBuf的容量是可以动态扩容的
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; ++i) {
sb.append("a");
}
// 往buf里面写入内容
buf.writeBytes(sb.toString().getBytes());
System.out.println(buf); // 可以看到这里buf的容量扩容了
}
}
直接内存 VS 堆内存
可以使用下面的代码来创建池化基于堆的 ByteBuf:
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf:
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放。
池化 VS 非池化
池化的最大意义在于可以重用 ByteBuf,优点有:
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力。
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率。
- 高并发时,池化功能更节约内存,减少内存溢出的可能。
池化功能是否开启,可以通过下面的系统环境变量来设置:
-Dio.netty.allocator.type={unpooled|pooled}
- 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现。
- 4.1 之前,池化功能还不成熟,默认是非池化实现。
ByteBuf 组成
ByteBuf 由四部分组成:容量、最大容量、读指针、写指针。最开始读写指针都在 0 位置。
0 read write
+---------+---------+----------+-----------+
| 废弃字节 | 可读字节 | 可写字节 | 可扩容字节 |
+---------+---------+----------+-----------+
|<----------capacity---------->|
|<---------------max capacity------------->|
相对于 ByteBuffer,ByteBuf 把读写指针分离开来,读取和写入不再需要转换读写模式。并且,ByteBuf 还可以动态扩容。
写入
方法列表,省略一些不重要的方法(这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用):
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) |
写入 boolean 值 | 用一字节 `01 |
writeByte(int value) |
写入 byte 值 | |
writeShort(int value) |
写入 short 值 | |
writeInt(int value) |
写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50,网络编程主要用大端写入 |
writeIntLE(int value) |
写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) |
写入 long 值 | |
writeChar(int value) |
写入 char 值 | |
writeFloat(float value) |
写入 float 值 | |
writeDouble(double value) |
写入 double 值 | |
writeBytes(ByteBuf src) |
写入 netty 的 ByteBuf | |
writeBytes(byte[] src) |
写入 byte[] |
|
writeBytes(ByteBuffer src) |
写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) |
写入字符串 |
示例如下:
先写入 4 个字节:
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);
结果是:
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
再写入一个 int 整数,也是 4 个字节:
buffer.writeInt(5);
log(buffer);
结果是:
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置。
扩容
再写入一个 int 整数时,容量不够了(初始容量是 10),这时会引发扩容:
buffer.writeInt(6);
log(buffer);
扩容规则是:
- 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16。
- 如果写入后数据大小超过 512,则选择下一个 $2^n$,例如写入后大小为 513,则扩容后 capacity 是 $2^10=1024$($2^9=512$ 已经不够了)。
- 扩容不能超过 max capacity,不然会报错。
结果是
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06 |............ |
+--------+-------------------------------------------------+----------------+
读取
例如读了 4 次,每次一个字节:
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分:
1
2
3
4
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark:
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
结果:
5
read index:8 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06 |.... |
+--------+-------------------------------------------------+----------------+
这时要重复读取的话,重置到标记位置 reset:
buffer.resetReaderIndex();
log(buffer);
这时:
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+
还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index。
内存释放
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可。
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存。
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存。
Netty 采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口:
- 每个 ByteBuf 对象的初始计数为 1。
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收。
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收。
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用。
ByteBuf 的使用是需要注意进行内存释放的,一般来说,谁是最后使用者,谁负责 release:大部分情况下,Netty 会使用引用计数的机制来管理 ByteBuf 的内存,如果引用计数变为 0,则会自动释放内存。如果主动调用 retain()
来增加引用计数,或者在处理完数据后不再传递给下一个 ChannelHandler,则需要在合适的时机调用 release()
手动释放。
具体分析如下:
- 起点,对于 NIO 实现来讲,在
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe
read 方法中首次创建 ByteBuf 放入 pipeline。 - 入站 ByteBuf 处理原则:
- 如果 ByteBuf 是由
channelRead
方法传递的消息对象 (msg),Netty 会自动管理它的生命周期,这时无须 release。 ctx.writeAndFlush()
会将response
交给 Netty 的处理链,由 Netty 自动管理其内存释放,这时也无需 release。- 对原始 ByteBuf 不做处理,调用
ctx.fireChannelRead(msg)
向后传递,这时无须 release。 - 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release。
- 如果不调用
ctx.fireChannelRead(msg)
向后传递,那么也必须 release。 - 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release。
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)。
- 如果 ByteBuf 是由
- 出站 ByteBuf 处理原则:
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release。
- 异常处理原则:
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true。
零拷贝
slice
:调用该方法后可以复制 ByteBuf 的内容。对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针。因为拷贝的时候并没有开辟新的内存空间,所以进行切片后也不允许对 ByteBuf 插入元素了。duplicate
:与slice
一样,没有新开辟内存空间。对比来看:slice
是零拷贝原来 ByteBuf 中的一小段;而duplicate
是零拷贝原来 ByteBuf 的所有内容。compositeBuffer
:合并多个 ByteBuf,不进行真正的数据拷贝而对多段 ByteBuf 进行逻辑合并。CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。copy
:该方法会对底层内存数据进行深拷贝,因此无论读写,与原始 ByteBuf 无关。- Unpooled:工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作。
优势总结
ByteBuf 的优势如下:
- 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能。
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式。
- 可以自动扩容。
- 支持链式调用,使用更流畅。
- 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf。
双向通信
客户端向服务端发送数据,服务端接收后发回给客户端。
服务端代码:
public class HelloServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// nioSocketChannel.pipeline().addLast(new StringDecoder());
// 读取到数据后写出
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf message = (ByteBuf) msg;
System.out.println(message.toString(Charset.defaultCharset()));
ByteBuf buffer = ctx.alloc().buffer().writeBytes(message);
ctx.writeAndFlush(buffer);
}
});
}
})
.bind(8080);
}
}
客户端代码:
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后触发
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder()); // 把字符串转换为ByteBuf
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg);
}
});
}
})
.connect(new InetSocketAddress("127.0.0.1", 8080))
.sync()
.channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}).start();
// 优雅关闭
channel.closeFuture().sync();
group.shutdownGracefully();
}
}
Netty 进阶
黏包与半包
现象演示
黏包现象
服务端代码:
public class HelloWorldServer {
static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
客户端代码希望发送 10 个消息,每个消息是 16 字节:
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
Random r = new Random();
char c = 'a';
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
服务器端的某次输出,可以看到一次就接收了 160 个字节,而非分 10 次接收:
08:24:46 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x81e0fda5] binding...
08:24:46 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x81e0fda5, L:/0:0:0:0:0:0:0:0:8080] bound...
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] REGISTERED
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] ACTIVE
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177]
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ: 160B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
08:24:55 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x94132411, L:/127.0.0.1:8080 - R:/127.0.0.1:58177] READ COMPLETE
半包现象
客户端代码希望发送 1 个消息,这个消息是 160 字节,代码改为:
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);
为现象明显,服务端修改一下接收缓冲区,其它代码不变:
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
服务器端的某次输出,可以看到接收的消息被分为两节,第一次 20 字节,第二次 140 字节:
08:43:49 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x4d6c6a84] binding...
08:43:49 [DEBUG] [main] c.i.n.HelloWorldServer - [id: 0x4d6c6a84, L:/0:0:0:0:0:0:0:0:8080] bound...
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] REGISTERED
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] ACTIVE
08:44:23 [DEBUG] [nioEventLoopGroup-3-1] c.i.n.HelloWorldServer - connected [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221]
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 20B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 |.... |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ: 140B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000010| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000020| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000030| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000040| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000050| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000060| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000070| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f 00 01 02 03 |................|
|00000080| 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |............ |
+--------+-------------------------------------------------+----------------+
08:44:24 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0x1719abf7, L:/127.0.0.1:8080 - R:/127.0.0.1:59221] READ COMPLETE
实际上,不仅仅是 Netty,主要是用 TCP / IP 编程,都会有黏包半包问题(UDP 没有该问题)。
现象分析
粘包
- 现象:发送
abc def
,接收abcdef
。 - 原因:
- 应用层:接收方 ByteBuf 设置太大(Netty 默认 1024)。
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包。
- Nagle 算法:会造成粘包。
半包
- 现象,发送
abcdef
,接收abc def
。 - 原因:
- 应用层:接收方 ByteBuf 小于实际发送数据量。
- 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时放不下了,只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包。
- MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包。
本质是因为 TCP 是流式协议,消息无边界。
现象解决
对于黏包和半包,有多种解决方案,我们先来看第一种——短连接。
短连接
我们之前说过,服务端如果在客户端断开连接后读取消息,那么会读取到 -1。如果我们发一个包建立一次连接,发完再断开连接,这样保证服务端在每一次都可以收到完整信息:
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
// 发10次短连接
for (int i = 0; i < 10; i++) {
sendMessage();
}
System.out.println("finish");
}
private static void sendMessage() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 连接建立好后会触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接建立后只发一次
ByteBuf buf = ctx.alloc().buffer(16);
buf.writeBytes(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buf);
// 发完立刻断开连接
ctx.channel().close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
这种方案可以解决黏包问题,但是无法解决半包问题,并且还很消耗资源,需要很多次断开和建立连接。我们接下看第二种解决方案——定长解码器。
定长解码器
服务端和客户端约定好消息发送的长度,服务端按照这个固定的长度来接收消息和解析消息。
让所有数据包长度固定(假设长度为 8 字节),服务器端加入:
// 放在LoggingHandler之前
ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
客户端测试代码,注意, 采用这种方法后,客户端什么时候 flush 都可以
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connetted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
// 发送内容随机的数据包
Random r = new Random();
char c = 'a';
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
byte[] bytes = new byte[8];
for (int j = 0; j < r.nextInt(8); j++) {
bytes[j] = (byte) c;
}
c++;
buffer.writeBytes(bytes);
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 9090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
对于本方法,缺点也很明显,那就是我们很难预估数据的长度是多少。预估的数据太长了,消耗资源;太短了,数据包有时无法发送完整。
接下来我们来看第三种解决方案——固定分隔符。
固定分隔符
服务端加入,默认以 \n
或 \r\n
作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常:
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
客户端在每条消息之后,加入 \n
分隔符即可。
这种方法也能正确解析数据,但是缺点就是如果发送的数据包内容中也包含 \n
,就会被识别为分隔符,导致数据出错。
我们来看最后一种解决方案——LTC 解码器。
LTC 解码器
主要原理就是发送消息的时候,在消息的头部加上消息的长度,读取的时候先读取头部,就能知道消息一共有多长了。
在发送消息前,先约定用定长字节表示接下来数据的长度:
// 参数解析如下
// 1.最大长度
// 2.长度偏移(长度占用字节中可能有一部分要装载header,长度偏移用来跳过长度占用字节中的header,读取实际的长度字节)
// 3.长度占用字节
// 4.长度调整(内容里面有些时候也要装载header,长度调整表示以长度字段为基准,还有几个字节是内容,用于跳过内容中的header)
// 5.剥离字节数(解析的时候抛弃多少个字节,一般抛弃前面的长度占用字节)
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));
示例代码,使用 EmbeddedChannel 进行测试:
public class TestLengthFieldDecoder {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
new LoggingHandler(LogLevel.DEBUG)
);
// 4个字节的内容长度+实际内容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello, world");
send(buffer, "Hi!");
// 写入
channel.writeInbound(buffer);
}
private static void send(ByteBuf buffer, String str) {
byte[] bytes = str.getBytes(); // 实际内容
int length = bytes.length; // 实际内容长度,int类型4个字节
// 先写入长度,再写入内容
buffer.writeInt(length);
buffer.writeBytes(bytes);
}
}
协议设计与解析
协议能够规定发送方和接收方发送和接收的数据格式,保证数据正确传输。接下来我们以几个示例来讲解在网络通信中的协议使用。
协议示例
Redis 协议示例
在 Redis 中,我们经常会发送如下指令:set key value
。
Redis 的协议是这么处理的:
- Redis 将整个命令看成一个数组,先要发送
*数组元素个数
,在这里我们的指令是*3
。 - 接下来发送每一个命令以及每个键值的长度,例如:先发送
$3
,再发送set
。
示例代码如下:
NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10}; // 换行
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接 channel 建立成功后,会触发 active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) {
set(ctx);
get(ctx);
}
private void get(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*2".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("get".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aaa".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
private void set(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aaa".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("bbb".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
// 连接本机Redis,往Redis的网络服务中发送该数据
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
Http 协议
Netty 当中已经帮我们编写好对 Http 协议的编解码器了,我们只需要调取并使用即可:
public class HelloWorldServer {
static final Logger log = LoggerFactory.getLogger(HelloWorldServer.class);
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 配置Http协议编解码器
ch.pipeline().addLast(new HttpServerCodec());
// 配置自定义handler对解码后的数据进行处理
/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}", msg.getClass());
if (msg instanceof HttpRequest) { // 请求头
}
else if (msg instanceof HttpContent) { // 请求体
}
}
});*/
// 也可以直接通过指定感兴趣的消息类型来进行消息区分
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext,
HttpRequest httpRequest) throws Exception {
// 获取请求信息
log.debug(httpRequest.uri());
// 向浏览器返回响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
httpRequest.protocolVersion(), HttpResponseStatus.OK
);
// 响应内容
byte[] bytes = "<h1>Hello World!</h1>".getBytes();
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, bytes.length);
response.content().writeBytes(bytes);
// 写回响应
channelHandlerContext.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
自定义协议
自定义协议需要有:
- 魔数,用来在第一时间判定是否是无效数据包。
- 版本号,可以支持协议的升级。
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk。
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关。
- 请求序号,为了双工通信,提供异步能力。
- 正文长度。
- 消息正文。
对于通信时的消息,我们需要一个父类来进行封装:
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
// 请求序号
private int sequenceId;
private int messageType;
// 抽象方法,子类实现,用于获取消息类型
public abstract int getMessageType();
// 消息指令类型
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
// 指令类型封装
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
}
}
子类来实现父类的 Message:
// 登录请求消息封装
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
private String nickname;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password, String nickname) {
this.username = username;
this.password = password;
this.nickname = nickname;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
编解码器
通过继承 ByteToMessageCodec 并提供消息泛型(上述的 Message 父类)来编写我们自己的协议编解码器:
/**
* 编解码器,ByteToMessageCodec,泛型指的是我们自定义的消息类型
*/
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
// 出站前进行编码
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
Message message, ByteBuf byteBuf) throws Exception {
// 魔数,四个字节的魔数
byteBuf.writeBytes(new byte[] {0, 4, 2, 1});
// 协议版本
byteBuf.writeByte(1);
// 序列化算法 0-jdk序列化,1-json序列化
byteBuf.writeByte(0);
// 指令类型
byteBuf.writeByte(message.getMessageType());
// 请求序号,四个字节
byteBuf.writeInt(message.getSequenceId());
byteBuf.writeByte(0xff); // 无意义,为了对齐,使得正文之外的字节数为2的整数倍
// 将正文序列化
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(message);
byte[] bytes = bos.toByteArray();
// 正文长度
byteBuf.writeInt(bytes.length);
// 写入内容
byteBuf.writeBytes(bytes);
}
// 入站时用来解码
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf, List<Object> list) throws Exception {
// 读四个字节的魔数
int magicNumber = byteBuf.readInt();
// 读一个字节的版本
byte version = byteBuf.readByte();
// 读一个字节的序列化方式
byte serializerType = byteBuf.readByte();
// 读一个字节的指令类型
byte messageType = byteBuf.readByte();
// 读四个字节的序列号
int sequenceId = byteBuf.readInt();
// 跳过无意义的对齐字符
byteBuf.readByte();
// 读取四个字节的内容长度
int length = byteBuf.readInt();
// 读正文
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes, 0, length);
// 序列化正文,jdk序列化
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis);
Message message = (Message) ois.readObject();
// 加入消息列表中,为了给下一个Handler使用
list.add(message);
log.debug("{}, {}, {}, {}, {}, {}", magicNumber, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
}
}
最后,使用 EmbeddedChannel 来进行测试:
public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
// 避免出现黏包半包
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new LoggingHandler(),
new MessageCodec()
);
// 出站编码
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
channel.writeOutbound(message);
// 入站解码
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);
channel.writeInbound(buf);
}
}
@Sharable
在我们编码过程中,有时一个 Handler 需要被多个 Channel 使用,但是如果我们只是单单是把 Handler 提取出来后添加到多个 Channel 中,这实际上是线程不安全的。比如 LengthFieldBasedFrameDecoder,这类 Handler 会记录读入数据的状态(记录上一次没有处理完的数据),导致多个 EventLoop 在使用 Handler 的时候可能造成一个数据多个线程处理,线程不安全。
当然,并不是每一个 Handler 都是线程不安全的,例如 LoggingHandler,它只是用来记录日志的,而不是对数据状态进行记录,自然可以被多个线程使用。
在 Netty 中,如果被 @Sharable 注解标记的 Handler,意味着可以被多个线程使用,是线程安全的。
上述的编解码器继承自 ByteToMessageCodec,其设计认为子类可能是线程不安全的,所以不允许我们自己加 @Sharable,如果我们自己的编解码器已经明确不会造成线程不安全,那么可以继承 MessageToMessageCodec,从而可以添加 @Sharable:
@ChannelHandler.Sharable
@Slf4j
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
Message message, List<Object> list) throws Exception {
// 手动创建ByteBuf
ByteBuf byteBuf = channelHandlerContext.alloc().buffer();
// ...代码与之前一致
// 写出byteBuf,传递给下一个Handler
list.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf, List<Object> list) throws Exception {
// ...代码内容与之前完全一致
}
}
聊天业务
登录
在客户端启动后,马上发送一段登录消息给服务端,表示客户端要尝试登录,服务端校验客户端的输入之后,向客户端返回是否登录成功的提示。
客户端 Handler:
@Slf4j
public class ClientLoginHandler extends ChannelInboundHandlerAdapter {
// 触发连接事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 创建一个独立的线程独立接收用户输入,防止占用NioEventLoop的资源
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名: ");
String username = scanner.nextLine();
System.out.println("请输入密码: ");
String password = scanner.nextLine();
// 构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送消息
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}, "system in").start();
}
// 接收客户端消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg: {}", msg);
super.channelRead(ctx, msg);
}
}
服务端 Handler:
@ChannelHandler.Sharable
public class ServerLoginHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
UserService userService = UserServiceFactory.getUserService();
boolean login = userService.login(username, password);
// 根据登录是否成功,向客户端返回消息
LoginResponseMessage responseMessage = new LoginResponseMessage(login, login ? "登录成功" : "用户名或密码错误");
ctx.writeAndFlush(responseMessage);
// 绑定会话和Channel
Session session = SessionFactory.getSession();
session.bind(ctx.channel(), username);
}
}
线程通信
当我们完成登录后,应该让用户可以接下去继续输入,我们利用 CountDownLatch 倒计时锁来进行两个线程之间的通信,判断用户是否登录成功。客户端 Handler 更改:
@Slf4j
public class ClientLoginHandler extends ChannelInboundHandlerAdapter {
// 倒计时锁,不为0时等待
private CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 表示初始暂未登录
private AtomicBoolean LOGIN = new AtomicBoolean(false);
// 触发连接事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 创建一个线程独立接收用户输入
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名: ");
String username = scanner.nextLine();
System.out.println("请输入密码: ");
String password = scanner.nextLine();
// 构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送消息
ctx.writeAndFlush(message);
try {
WAIT_FOR_LOGIN.await(); // 等待计数减为0
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 如果登录失败,关闭Channel
if (!LOGIN.get()) {
ctx.channel().close();
// 注意这里要加return!!!不然只关闭Channel,本线程并没有关闭
return;
}
while (true) { // 打印功能菜单
// ...
}
}, "system in").start();
}
// 接收客户端消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg: {}", msg);
if (msg instanceof LoginResponseMessage response) {
if (response.isSuccess()) {
LOGIN.set(true);
}
WAIT_FOR_LOGIN.countDown(); // 计数减1,唤醒用户输入线程
}
}
}
业务消息发送
当用户登录完成后,根据功能菜单的显示来发送不同的命令需求:
while (true) {
// 打印功能菜单
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
// 读取用户输入命令,解析命令
String command = scanner.nextLine();
if (command.equals("quit")) {
ctx.channel().close(); // 关闭连接
return; // 退出当前线程
}
handleCommand(command, username, ctx);
}
private void handleCommand(String command, String username, ChannelHandlerContext ctx) {
String[] split = command.split(" ");
switch (split[0]) {
case "send": // 发送聊天消息 from to content
ctx.writeAndFlush(new ChatRequestMessage(username, split[1], split[2]));
break;
case "gsend": // 往聊天组发送消息
ctx.writeAndFlush(new GroupChatRequestMessage(username, split[1], split[2]));
break;
case "gcreate": // 创建聊天组
Set<String> set = new HashSet<>(Arrays.asList(split[2].split(",")));
set.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(split[1], set));
break;
case "gmembers": // 获取聊天组成员
ctx.writeAndFlush(new GroupMembersRequestMessage(split[1]));
break;
case "gjoin": // 加入聊天组
ctx.writeAndFlush(new GroupJoinRequestMessage(username, split[1]));
break;
case "gquit": // 退出聊天组
ctx.writeAndFlush(new GroupQuitRequestMessage(username, split[1]));
break;
}
}
单聊消息处理
服务端新建一个处理器处理单聊消息:
@ChannelHandler.Sharable
public class ServerChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Session session = SessionFactory.getSession();
// 根据用户名获取对方的Channel
Channel channel = session.getChannel(to);
// 对方在线
if (channel != null) {
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
}
// 对方不在线
else {
ctx.writeAndFlush(new ChatResponseMessage(false, "对方不存在或不在线"));
}
}
}
群聊建群处理
服务端处理群聊创建:
@ChannelHandler.Sharable
public class ServerChatCreateRequestHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
// 发送成功/失败消息
ctx.writeAndFlush(new GroupCreateResponseMessage(group == null, groupName + (group == null ? "创建成功" : "创建失败")));
if (group == null) {
// 通知所有人
List<Channel> channels = groupSession.getMembersChannel(groupName);
for (Channel channel : channels) {
channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
}
}
}
}
群聊消息处理
服务端处理群聊消息的接收:
@ChannelHandler.Sharable
public class ServerGroupChatRequestHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupChatRequestMessage msg) throws Exception {
// 找到所有成员的Channel
List<Channel> channels = GroupSessionFactory.getGroupSession()
.getMembersChannel(msg.getGroupName());
for (Channel channel : channels) {
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
}
}
退出
服务端需要处理客户端的正常和异常退出:
@Slf4j
@ChannelHandler.Sharable
public class ServerQuitHandler extends ChannelInboundHandlerAdapter {
// 连接正常断开时会触发
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 在会话管理器中移除会话
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{}已经断开", ctx.channel());
}
// 连接异常断开时会触发
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 已经断开,异常 {}", ctx.channel(), cause.getMessage());
}
}
空闲检测
在网络应用中,有时候会出现连接假死的情况:
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着。
- 应用程序线程阻塞,无法进行数据读写。
产生的问题有:
- 假死的连接占用的资源不能自动释放。
- 向假死的连接发送数据,得到的反馈是发送超时。
Netty 提供了空闲状态检测器用来判断是否空闲时间过长:
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
// 参数解析
// 1.读空闲时间,表示客户端有多长时间没有发数据给服务端
// 2.写空闲时间,表示服务端有多长时间没有向客户端发送时间
// 3.读写空闲时间,上述二者结合
// 读超时触发READER_IDLE事件,写超时触发WRITER_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
ch.pipeline().addLast(SERVER_IDLE_HANDLER);
}
});
// 处理读写空闲事件,是双向Handler,继承ChannelDuplexHandler
@Slf4j
@ChannelHandler.Sharable
public class ServerIdleHandler extends ChannelDuplexHandler {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 拿到事件消息类型
IdleStateEvent event = (IdleStateEvent) evt;
// 触发读空闲事件
if (event.state() == IdleState.READER_IDLE) {
log.debug("已经5秒没有读到数据了");
// 释放服务器资源
ctx.channel().close();
}
}
}
当然,为了避免误杀,客户端需要进行心跳检测:
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(CLIENT_IDLE_HANDLER);
ch.pipeline().addLast("client handler", new ClientLoginHandler());
}
});
@Slf4j
@ChannelHandler.Sharable
public class ClientIdleHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 3秒服务器没有返回数据时,要向服务器证明连接正常,避免断开连接
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// 发送心跳消息
ctx.writeAndFlush(new PingMessage());
}
}
}
Netty 优化
扩展序列化算法
序列化,反序列化主要用在消息正文的转换上:
- 序列化时,需要将 Java 对象变为要传输的数据(可以是
byte[]
,或json
等,最终都需要变成byte[]
)。 - 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理。
为了支持更多序列化算法,抽象一个 Serializer 接口:
public interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);
// 序列化方法
<T> byte[] serialize(T object);
}
补充 jdk 序列化方法:
public interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);
// 序列化方法
<T> byte[] serialize(T object);
// 枚举
enum Algorithm implements Serializer {
// Java 序列化
Java {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
}
}
}
序列化和反序列化代码改进:
// 将正文序列化
byte[] bytes = Serializer.Algorithm.Java.serialize(message);
// 反序列化正文,jdk序列化
Message message = Serializer.Algorithm.Java.deserialize(Message.class, bytes);
当然也可以使用 gson 进行 json 序列化:
enum Algorithm implements Serializer {
Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
String json = new String(bytes, StandardCharsets.UTF_8);
return new Gson().fromJson(json, clazz);
}
@Override
public <T> byte[] serialize(T object) {
String json = new Gson().toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}
参数调优
对于客户端而言,使用的是 Bootstrap 类,直接调用 option
方法配置参数即可。
而对于服务端,使用的是 ServerBootstrap 类,其中有两个可以配置参数的方法,调用 option
方法是给 ServerSocketChannel 配置参数,调用 childOption
方法是给 SocketChannel 配置参数。
CONNECT_TIMEOUT_MILLIS
属于 SocketChannal 参数。
用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常。
有一个比较类似的参数——SO_TIMEOUT,它主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间(这个参数在 NIO 和 Netty 中没什么用)。
客户端代码如下:
@Slf4j
public class TestConnectionTimeout {
public static void main(String[] args) {
// 客户端通过.option方法配置参数
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300) // 300ms超时
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}
}
}
CONNECT_TIMEOUT_MILLIS 的实现实际上是一个定时任务,根据配置的时间,如果超时了,就抛出异常断开连接。
SO_BACKLOG
属于 ServerSocketChannel 参数,是在服务器端调用 option
方法来配置的。
先来回顾一下 TCP 三次握手的流程(sync queue 为半连接队列,accept queue 为全连接队列):
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列。
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server。
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue。
sequenceDiagram participant c as client participant s as server participant sq as syns queue participant aq as accept queue s ->> s : bind() s ->> s : listen() c ->> c : connect() c ->> s : 1. SYN Note left of c : SYN_SEND s ->> sq : put Note right of s : SYN_RCVD s ->> c : 2. SYN + ACK Note left of c : ESTABLISHED c ->> s : 3. ACK sq ->> aq : put Note right of s : ESTABLISHED aq -->> s : s ->> s : accept()
其中
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制:
sync queue - 半连接队列:
- 大小通过
/proc/sys/net/ipv4/tcp_max_syn_backlog
指定,在syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略。
- 大小通过
accept queue - 全连接队列:
- 其大小通过
/proc/sys/net/core/somaxconn
指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值。 - 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client。
- 其大小通过
Netty 中对 backlog 的设置如下:
public class TestBacklogServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.option(ChannelOption.SO_BACKLOG, 2) // 全连接队列配置
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new LoggingHandler());
}
})
.bind(8080);
}
}
其他参数
ulimit-n
:属于操作系统参数,用于限制一个进程能够打开的最大文件描述符的数量。(临时参数,建议将其的配置放在脚本里)TCP_NODELAY
:属于 SocketChannal 参数,默认为 false,表示开启 nagle 算法。这个算法可能会导致消息延迟发送(为了一次性发送较大的数据包,减少发送次数),可以设置为 true,关闭 nagle 算法。SO_SNDBUF & SO_RCVBUF
:用于调整发送和接收缓冲区的大小。SO_SNDBUF 属于 SocketChannal 参数;SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)。早期需要调整,但对于现在的操作系统而言,可以不用调整。ALLOCATOR
:属于 SocketChannal 参数,用来分配 ByteBuf,ctx.alloc()
。用来控制分配的 ByteBuf 是否池化以及是否分配到直接内存中。RCVBUF_ALLOCATOR
:属于 SocketChannal 参数,用于控制 netty 接收缓冲区大小,负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定。
RPC 框架
RPC(Remote Procedure Call,远程过程调用)框架 是一种用于实现分布式系统中不同服务之间通信的工具(例如微服务当中的 OpenFeign)。它允许开发者像调用本地方法一样,调用远程服务上的方法,而无需关心底层的网络通信细节。RPC 框架屏蔽了序列化、反序列化、网络传输、协议解析等复杂性,为开发者提供了简单、透明的接口。
准备工作
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息:
@Data
public abstract class Message implements Serializable {
// 省略旧的代码
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
static {
// ...
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
请求消息:
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}
响应消息:
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}
远程调用需要调用的接口:
public interface HelloService {
String sayHello(String name);
}
Handler 编写
请求 Handler 编写:
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
RpcResponseMessage response = new RpcResponseMessage();
response.setSequenceId(msg.getSequenceId());
try {
// 获取接口的实现类
HelloService service = (HelloService) ServicesFactory
.getService(Class.forName(msg.getInterfaceName()));
// 反射调用对应方法
Method method = service.getClass().getMethod(msg.getMethodName(), msg.getParameterTypes());
Object invoke = method.invoke(service, msg.getParameterValue());
// 构造响应消息返回
response.setReturnValue(invoke);
} catch (Exception e) {
e.printStackTrace();
String message = e.getCause().getMessage();
// 返回精简的异常信息,避免异常信息太多占用资源
response.setExceptionValue(new Exception("远程调用出错: " + message));
}
// 返回响应消息
ctx.writeAndFlush(response);
}
}
响应 Handler 编写(暂时):
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
}
}
客户端直接发送封装好的请求消息:
@Slf4j
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
// 发送消息
channel.writeAndFlush(new RpcRequestMessage(
1,
"cn.itcast.server.service.HelloService",
"sayHello",
String.class,
new Class[] {String.class},
new Object[] {"张三"}
));
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
服务端接收消息并处理:
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// rpc 请求消息处理器,待实现
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
代码改进
对于客户端,我们通过上述的代码可以看出,其最重要的实际上就是获取一个 Channel,好让客户端可以向服务端发送消息,我们根据客户端的代码来封装一个 Manager 类,用于对外暴露 Channel:
public class RpcClientManager {
private static Channel channel = null;
private static final Object LOCK = new Object();
// 获取唯一的Channel对象
public static Channel getChannel() {
// 双重检查锁,防止线程不安全
if (channel != null) { // 所有线程都会访问,直接进入下方加锁的判断逻辑
return channel;
}
synchronized (LOCK) { // 互斥锁,第一个线程来了的时候初始化Channel,后续线程就可以直接获取Channel
if (channel != null) {
return channel;
}
initChannel();
return channel;
}
}
// 初始化Channel
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// rpc 响应消息处理器,待实现
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
try {
// 创建出来一个Channel对象
channel = bootstrap.connect("localhost", 8080).sync().channel();
// 使用异步的方式,防止阻塞而导致Channel无法使用
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
}
对于用户而言,在客户端,我们希望简单调用接口的方法,就可以实现远程调用的效果,所以,我们封装一个动态代理类的获取方法,用于帮用户创建 Request 消息并发出。
服务端接收到 Request 消息并调用对应的方法后获取结果,封装成 Response 对象返回给客户端。此时,对于客户端来讲,发起请求等待结果的是主线程,而获取结果的是 NIO 线程,我们还需要进行线程之间的结果传递,这一步可以使用 Netty 为我们提供的 Promise 对象来做:
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
// key-消息的id,value-Promise对象,用来接收结果
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>(); // 可能多个线程访问,保证线程安全,让Handler可以被Sharable修饰
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
// 拿到空的Promise,调用remove方法,拿取value的时候顺便去除
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
if (promise != null) {
Object returnValue = msg.getReturnValue();
Exception exceptionValue = msg.getExceptionValue();
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}
log.debug("{}", msg);
}
}
public class RpcClientManager {
public static void main(String[] args) {
// 用户只需要直接调用方法即可
HelloService service = getProxyService(HelloService.class);
service.sayHello("张三");
}
// 创建代理类
public static <T> T getProxyService(Class<T> serviceClass) {
ClassLoader loader = serviceClass.getClassLoader();
Class<?>[] interfaces = new Class[] { serviceClass };
// 使用JDK动态代理
Object object = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
// 1.将方法的调用转换为消息对象
int sequenceId = SequenceIdGenerator.nextId();
RpcRequestMessage msg = new RpcRequestMessage(
sequenceId,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2.发送消息对象
getChannel().writeAndFlush(msg);
// 3.准备Promise对象接收结果,并指定Promise对象异步接收结果的线程(eventLoop)
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
// 4.将Promise存入Handler中
RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
// 5.同步等待结果出现
promise.await();
if (promise.isSuccess()) {
// 最终要返回的结果,从getNow中获取
return promise.getNow();
} else {
// 抛出异常
throw new RuntimeException(promise.cause());
}
});
return (T) object;
}
private static Channel channel = null;
private static final Object LOCK = new Object();
// 获取唯一的Channel对象
public static Channel getChannel() {
// ...
}
// 初始化Channel
private static void initChannel() {
// ...
}
}