跳到主要内容

Netty如何处理粘包、半包

为什么会出现粘包、拆包

TCP是一种面向流的协议,它不保证应用程序发送的数据包在接收端以同样的数据包边界进行接收。操作系统在发送TCP数据时,会根据TCP缓冲区的大小、网络MTU(最大传输单元)等因素对数据进行分段或合并,这就导致了粘包和拆包问题的产生。

粘包(Packet Threading): 发送方连续发送的多个小数据包,在接收方的TCP缓冲区中可能被合并成一个大的数据包进行接收。应用程序一次读取到的数据可能包含多个逻辑上的数据包。

拆包(Packet Splitting): 发送方发送的一个大数据包,可能被拆分成多个小的数据段进行传输,接收方需要多次接收才能获得一个完整的逻辑数据包。

如果应用程序不处理这些问题,

固定长度解码器 FixedLengthFrameDecoder

FixedLengthFrameDecoder是最简单的解决方案。它假设每个消息的长度都是固定的。如果不足指定长度,会自动补齐

  • 使用场景:适用于每个消息都具有相同长度的协议,例如,某些金融或工业控制协议。

分隔符解码器 DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder通过在消息的末尾添加特殊的分隔符来界定消息的边界。

  • 使用场景:适用于文本类协议,例如,每条消息以回车换行符(\r\n)结尾的场景。

LineBasedFrameDecoder 行解码器

LineBasedFrameDecoder是DelimiterBasedFrameDecoder的一个特例,它以换行符(\n或\r\n)作为消息的分隔符。

  • 使用场景:非常适合处理基于行的文本协议,如日志、简单的命令行协议等。

LengthFieldBasedFrameDecoder 长度域解码器

LengthFieldBasedFrameDecoder是一种非常通用和强大的解码器,它通过读取消息头中的长度字段来确定一个完整消息的长度。这是业界解决粘包、拆包问题最主流的方式。

构造方法

    public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
this(
maxFrameLength,
lengthFieldOffset, lengthFieldLength, lengthAdjustment,
initialBytesToStrip, true);
}
  • maxFrameLength:单个数据包的最大长度。

  • lengthFieldOffset:长度字段的偏移量。

  • lengthFieldLength:长度字段本身的长度。

  • lengthAdjustment:长度字段调整值。该值加上长度字段的值,等于消息体的长度。

  • initialBytesToStrip:解码后需要跳过的初始字节数,通常是长度字段的长度。

RocketMQ中的的实现: 基于LengthFieldBasedFrameDecoder的自定义协议

RocketMQ的远程通信协议(RemotingCommand)格式如下:

总长度(4字节) + Header长度(4字节) + Header数据(JSON序列化) + Body数据

总长度:整个消息的总长度。

Header长度:Header数据的长度。

Header数据:使用JSON序列化的RemotingCommand的头部信息,包含了请求/响应类型、请求ID等元数据。

Body数据:实际的业务数据,例如消息本身。

在org.apache.rocketmq.remoting.netty.NettyDecoder类中,其decode方法的核心逻辑如下:

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
// 标记当前的读索引
in.markReaderIndex();
// 读取总长度
int length = in.readInt();

// 如果可读字节数小于消息长度,说明是半包,重置读索引并等待更多数据
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}

// 读取完整的消息并进行解码
ByteBuf frame = in.readBytes(length);
Object decoded = decode(ctx, frame);
out.add(decoded);
}
}

private Object decode(ChannelHandlerContext ctx, ByteBuf in) {
// ... 省略了从ByteBuf中解析Header和Body的详细过程
// 这里的实现与RemotingCommand的协议格式相对应
return RemotingCommand.decode(in.nioBuffer());
}

kafka:二进制的长度前缀协议

消息大小(4字节) + 消息内容

  • 实现类: org.apache.kafka.common.network.NetworkReceive

总结

无论是RocketMQ还是Kafka,它们在处理粘包和拆包问题上都选择了业界最成熟、最高效的**“长度字段”**方案。这充分证明了LengthFieldBasedFrameDecoder所代表的设计思想在构建高性能、高可靠性网络应用中的重要地位。对于广大开发者而言,在进行网络应用开发时,优先考虑使用基于长度的协议设计,并善用Netty提供的解码器,将能极大地简化开发工作,并提升系统的稳定性和性能。