Netty series (2): Solution to Netty unpacking/dipping problem

crocodile 2022-11-24 20:58:14 阅读数:413

nettyseriessolutionnettyunpacking

上一篇说到Netty系列(一):Springboot整合Netty,Custom protocol implementation,This article talks about some unpacking/沾包问题.

拆包/沾包问题

TCP是面向字节流的协议,When several packets of data sent by the sender are received by the receiver,These packets may be glued into one packet,And look from the receive buffer,后一包数据的头紧接着前一包数据的尾,这就形成沾包问题.

But if the amount of data sent in one request is relatively large,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包问题,也就是将一个大的包拆分为多个小包进行发送,The receiving end must receive multiple packets to form a complete data.

为什么UDP没有粘包?

粘包/The unpacking problem is at the data link layer、网络层以及传输层都有可能发生.日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生粘包/拆包问题.

而TCP是面向字节流,没有边界,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,Optimize through this buffer,例如缓冲区为1024个字节大小,If the amount of data sent at one time is less than 1024,Multiple data will be combined and sent as one data packet;If the amount of data sent at one time is greater than 1024,This packet will be split into multiple packets for sending.The above two cases are also the problem of sticking and unpacking.

img
The four situations shown above include:

  1. 正常发送,两个包恰好满足TCP缓冲区的大小或达到TCP等待时长,分别发送两个包.
  2. 沾包:D1、D2are too small,Both were dipped.
  3. Unpack and dip the package:D2过大,进行了拆包处理,And part of it was removedD2_1又与D1进行粘包处理.
  4. 沾包拆包:D1过大,进行了拆包处理,And part of it was removedD1_2又与D2进行粘包处理.

解决方案

对于粘包和拆包问题,These four solutions are generally available:

  1. Send with a fixed data length,发送端将每个包都封装成固定的长度,比如100字节大小.如果不足100字节可通过补0Wait until it is filled to the specified length before sending.
  2. 发送端在每个包的末尾使用固定的分隔符,例如##@##.如果发生拆包需等待多个包发送过来之后再找到其中的##@##进行合并.If you send a dipped package, find it##@##进行拆分.
  3. 将消息分为头部和消息体,头部中保存整个消息的长度,In this case, the receiving end only reads a message of sufficient length,It is considered to have received a complete message.
  4. 通过自定义协议进行粘包和拆包的处理.

NettyUnpack and dip the package

Netty对解决粘包和拆包的方案做了抽象,提供了一些解码器(Decoder)来解决粘包和拆包的问题.如:

LineBasedFrameDecoder:以行为单位进行数据包的解码,使用换行符\n或者\r\n作为依据,遇到\n或者\r\n都认为是一条完整的消息.
DelimiterBasedFrameDecoder:以特殊的符号作为分隔来进行数据包的解码.
FixedLengthFrameDecoder:以固定长度进行数据包的解码.
LenghtFieldBasedFrameDecode:适用于消息头包含消息长度的协议(最常用).
基于Netty进行网络读写的程序,可以直接使用这些Decoder来完成数据包的解码.对于高并发、大流量的系统来说,每个数据包都不应该传输多余的数据(所以补齐的方式不可取),LenghtFieldBasedFrameDecode更适合这样的场景.

LineBasedFrameDecoder

使用LineBasedFrameDecoder解决粘包问题,其会根据"\n"或"\r\n"对二进制数据进行拆分,封装到不同的ByteBuf实例中

 /** * 服务启动器 * * @return */
@Bean
public ServerBootstrap serverBootstrap() {

ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的线程组
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 指定连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// Dip packages are handled by line breaks/拆包
.childHandler(new NettyServerLineBasedHandler());
return serverBootstrap;
}
public class NettyServerLineBasedHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();
// 使用LineBasedFrameDecoder解决粘包问题,其会根据"\n"或"\r\n"对二进制数据进行拆分,封装到不同的ByteBuf实例中,并且每次查找的最大长度为1024字节
pipeline.addLast(new LineBasedFrameDecoder(1024, true, true));
// 将上一步解码后的数据转码为Message实例
pipeline.addLast(new MessageDecodeHandler());
// 对发送客户端的数据进行编码
pipeline.addLast(new MessageEncodeHandler());
// 对数据进行最终处理
pipeline.addLast(new ServerListenerHandler());
}
}

DelimiterBasedFrameDecoder

以特殊的符号作为分隔来进行数据包的解码,The above is to##@##It is explained as a separator as an example.Paste the key code here again:
使用DelimiterBasedFrameDecoder处理拆包/沾包,并且每次查找的最大长度为1024字节.

 @Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

// 数据分割符
String delimiterStr = "##@##";
ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
ChannelPipeline pipeline = socketChannel.pipeline();
// Handle unpacking with custom delimiters/沾包,并且每次查找的最大长度为1024字节
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// 将上一步解码后的数据转码为Message实例
pipeline.addLast(new MessageDecodeHandler());
// 对发送客户端的数据进行编码,并添加数据分隔符
pipeline.addLast(new MessageEncodeHandler(delimiterStr));
// 对数据进行最终处理
pipeline.addLast(new ServerListenerHandler());
}

MessageEncodeHandlerAdd a separator and encode the sent data

public class MessageEncodeHandler extends MessageToByteEncoder<Message> {

// 数据分割符
String delimiter;
public MessageEncodeHandler(String delimiter) {

this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {

out.writeBytes((message.toJsonString() + delimiter).getBytes(CharsetUtil.UTF_8));
}
}

FixedLengthFrameDecoder

Server code settings,在NettyConfig配置中将workerprocessor insteadNettyServerFixedLengthHandler,使用固定100byte length processing message.

 /** * 服务启动器 * * @return */
@Bean
public ServerBootstrap serverBootstrap() {

ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的线程组
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 指定连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// Specifies the processor as fixed-length bytes
.childHandler(new NettyServerFixedLengthHandler());
return serverBootstrap;
}

NettyServerFixedLengthHandler类代码,使用FixedLengthFrameDecoderSet by pressing Fixed100Number of bytes to split receivedByteBuf.And customize a message encoder,Insufficient byte length100Byte message is complemented0操作.

public class NettyServerFixedLengthHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

// Fixed byte length
Integer length = 100;
ChannelPipeline pipeline = socketChannel.pipeline();
// 按固定100Number of bytes to split receivedByteBuf的解码器
pipeline.addLast(new FixedLengthFrameDecoder(length));
// 将上一步解码后的数据转码为Message实例
pipeline.addLast(new MessageDecodeHandler());
// Custom encoding of data sent to the client,And set the byte length to make up for insufficient0
pipeline.addLast(new MessageEncodeFixedLengthHandler(length));
// 对数据进行最终处理
pipeline.addLast(new ServerListenerHandler());
}
}

自定义MessageEncodeFixedLengthHandler编码类,Encode the message using a fixed byte length,Complement when the byte length is insufficient0.


public class MessageEncodeFixedLengthHandler extends MessageToByteEncoder<Message> {

private int length;
public MessageEncodeFixedLengthHandler(int length) {

this.length = length;
}
/** * Encode the message using a fixed byte length,Complement when the byte length is insufficient0 * * @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to * @param msg the message to encode * @param out the {@link ByteBuf} into which the encoded message will be written * @throws Exception */
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {

String jsonStr = msg.toJsonString();
// 如果长度不足,则进行补0
if (jsonStr.length() < length) {

jsonStr = addSpace(jsonStr);
}
// 使用Unpooled.wrappedBuffer实现零拷贝,将字符串转为ByteBuf
ctx.writeAndFlush(Unpooled.wrappedBuffer(jsonStr.getBytes()));
}
/** * If the specified length is not reached, make up0 * * @param msg * @return */
private String addSpace(String msg) {

StringBuilder builder = new StringBuilder(msg);
for (int i = 0; i < length - msg.length(); i++) {

builder.append(0);
}
return builder.toString();
}
}

LenghtFieldBasedFrameDecode

LenghtFieldBasedFrameDecode适用于消息头包含消息长度的协议,According to the length of the message, it is judged whether a data packet has been read.

 /** * 服务启动器 * * @return */
@Bean
public ServerBootstrap serverBootstrap() {

ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的线程组
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 指定连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// The request header contains the data length
.childHandler(new NettyServerLenghtFieldBasedHandler());
return serverBootstrap;
}
public class NettyServerLenghtFieldBasedHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

ChannelPipeline pipeline = socketChannel.pipeline();
// The request header contains the data length,Dip and unpack according to the length
/** * maxFrameLength:指定了每个包所能传递的最大数据包大小; * lengthFieldOffset:指定了长度字段在字节码中的偏移量; * lengthFieldLength:指定了长度字段所占用的字节长度; * lengthAdjustment:对一些不仅包含有消息头和消息体的数据进行消息头的长度的调整,这样就可以只得到消息体的数据,这里的lengthAdjustment指定的就是消息头的长度; * initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占用的字节. */
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
// Add a byte length field to the request header
pipeline.addLast(new LengthFieldPrepender(2));
// 将上一步解码后的数据转码为Message实例
pipeline.addLast(new MessageDecodeHandler());
// 对发送客户端的数据进行编码,Insufficient length in bytes0
pipeline.addLast(new MessageEncodeHandler());
// 对数据进行最终处理
pipeline.addLast(new ServerListenerHandler());
}
}

总结

造成TCPProtocol sticky package/The reason for the unpacking problem isTCPProtocol data transfer is基于字节流的,它No message is included、数据包等概念,是无界的,需要应用层协议自己设计消息的边界,即消息帧(Message Framing).If the application layer protocol does not use length-based or delimiter-based(终结符)processing by demarcating borders, etc,则会导致多个消息的粘包和拆包.

copyright:author[crocodile],Please bring the original link to reprint, thank you. https://en.javamana.com/2022/328/202211242053414209.html