1. TCP的拆包与粘包
TCP是一个流协议,在TCP这一层实际上没有什么“包”的概念(实际上没有所谓的拆包粘包,这都是对上层应用而言的)。
而上层应用却有“包”的概念,所以需要在上层处理好“包”的划分。
1.1. 拆包与粘包的说明
TCP协议的两端是配有缓冲区的,缓冲区的大小可配置(Java中可配置SO_SNDBUF
和SO_RCVBUF
),不过根据官方文档,这个配置某些操作系统会动态调整。
假设A
要给B
发送P1
,P2
两个数据包:
A
独立发送P1
,P2
,且B
及时读取了它们——不会发生拆包、粘包A
发送P1
,P2
,B
没有及时读取,之后读取读到2个数据包的所有数据——发生粘包A
发送P1
,P2
,但某个数据包比较大,导致B
一次读取只能读到数据包的一部分,需要多次读取才能读取完全——发送拆包
1.2. 拆包、粘包的原因
拆包的常见原因如下:
- 发送的数据大于缓冲(两端的)大小
- 发送的数据大于MSS,TCP会拆成多个IP报文
- 发送的数据,陷入传输层后,数据报的大小大于MTU,IP层会拆成多个片段
粘包的常见原因如下:
- 发送的数据小于发送端缓冲大小,客户端多次写入缓冲区后才发送出去
- 接收端的缓冲区较大,且没有及时读取缓冲区的数据
1.3. 解决方法
常见的解决方法有如下几个,都是比较简单的:
- 给数据包头部添加一个长度标识
- 设置数据包的长度必须是定长(不足补0)
- 数据包之间设置特殊符号作为边界
当然也可以自己实现一个复杂的协议解决拆包粘包的问题
而Netty自身提供了以上3种的解码器,以解决拆包粘包的问题,它们对应的是:
LengthFieldBasedFrameDecoder
FixedLengthFrameDecoder
DelimiterBasedFrameDecoder
,LineBasedFrameDecoder
通常第一种方式能解决大部分的需求,下面将会分析LengthFieldBasedFrameDecoder
是如何处理拆包粘包的问题。
2. Netty的LengthFieldBasedFrameDecoder
该拆包器是ChannelInboundHandler
,所以配置客户端/服务端是需要加到pipeline
的头部。
该拆包器是基于长度的策略。
2.1. 构造方法
public LengthFieldBasedFrameDecoder(
ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
// ...
this.byteOrder = byteOrder; // 长度域的大小端
this.maxFrameLength = maxFrameLength; // 数据包最大长度
this.lengthFieldOffset = lengthFieldOffset; // 长度域在数据包的偏移量
this.lengthFieldLength = lengthFieldLength; // 长度域的长度
this.lengthAdjustment = lengthAdjustment; // 长度域的调整值
lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
this.initialBytesToStrip = initialBytesToStrip; // 构造好数据包后,截掉指定长度的数据
this.failFast = failFast; // 若为true,只要长度域的值超过最大长度,就报异常;否则读完整个数据包后再校验长度,若超过才抛异常
}
构造方法的参数说明都在上面的注释上。
2.2. 实现逻辑
主要在下面的方法中实现,代码说明如下:
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 1. 过长数据包的丢弃模式的处理,后面说明
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
// 2. 获取数据包长度域的值
// 若读不到,返回null,可能发生了拆包
// **这里即处理了拆包的现象,需要更多的数据进来**
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 获取未调整的长度域的值
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
// 3. 长度校验
// 若原始长度小于0,报错
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
// 调整长度值(调整为整个数据包大小的值)
frameLength += lengthAdjustment + lengthFieldEndOffset;
// 若小于lengthFieldEndOffset,说明之后的数据长度是负数,报错
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
// 若长度大于上限,进入丢弃模式,并可能抛出异常
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
// 若可读的长度小于长度域的值,现不处理,返回null
// **这里也处理了拆包的现象**
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
return null;
}
// 4. 跳过指定字节长度,若跳过的字节数过大,报错
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// 5. 抽取数据包,简单调用retainedSlice无拷贝复制而已
// **这里可以解决粘包的问题,分割了两个不同的数据包**
int readerIndex = in.readerIndex();
int actualFrameLength = frameLengthInt - initialBytesToStrip;
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
大体的逻辑上面有说明。
不过还是有一点需要补充的——过长数据的丢弃。
首先是第3步长度校验的过长数据丢弃:
private void exceededFrameLength(ByteBuf in, long frameLength) {
// 计算长度域的长度和可读长度的差值
long discard = frameLength - in.readableBytes();
// 记录一些这个过长数据的长度
tooLongFrameLength = frameLength;
if (discard < 0) {
// 若可读长度大于长度域的长度,则可跳过长度域的长度,保留后面的
// 因为可能之后的内容是合法的
in.skipBytes((int) frameLength);
} else {
// 若可读长度小于长度域的长度,则需要全部丢弃
// 设置discard模式标识为true,进入丢弃模式
discardingTooLongFrame = true;
// 这里记录这个差值
// 下一次decode的时候,先要跳过这部分长度的内容
bytesToDiscard = discard;
// 跳过本次的整个数据
in.skipBytes(in.readableBytes());
}
// 报错
failIfNecessary(true);
}
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
if (bytesToDiscard == 0) {
// 下一次decode不需要丢弃了
// 则重置/退出丢弃模式
long tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
discardingTooLongFrame = false;
// 如果没有设置快速失败
// 或者设置了快速失败并且是第一次检测到大包错误
// 抛出异常
if (!failFast ||
failFast && firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
} else {
// 这里还是在丢弃模式中,即下一次decode需要丢弃数据
// 如果设置了快速失败,并且是第一次检测到打包错误,抛出异常
if (failFast && firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
}
}
然后就是开头第1步的丢弃数据了。当上一次decode
检测到过大数据包的时候,下一次decode
时,接收的数据可能还是上一次decode
所在的数据包中的,所以还需要再丢弃一次。即下面这段代码:
if (discardingTooLongFrame) {
// 若处于丢弃模式,首先需要丢弃数据
discardingTooLongFrame(in);
}
跳转进去后:
private void discardingTooLongFrame(ByteBuf in) {
long bytesToDiscard = this.bytesToDiscard;
// 取可读字节数和丢弃字节数的最小值
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
// 丢弃这段数据
in.skipBytes(localBytesToDiscard);
// 更新还需要跳过/丢弃的长度
bytesToDiscard -= localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
// 报错,这里设为false,因此不会抛出异常(除非failFast为false)
// 这里不是第一次检测到对应的过大数据包
// 对应数据包的检测是在上一次decode中检测到的
failIfNecessary(false);
}
2.3. 上层如何调用
这里见其父类ByteToMessageDecoder
的channelRead
方法实现,下面截取关键的代码,并有解释:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
// 若是接收到的第一个数据,则之间是data
if (first) {
cumulation = data;
} else {
// 否则就将数据添加到cumulation之后
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 然后解码整个cumulation的所有数据
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
// ...Handling discard and release cumulation datas...
// 释放readerIndex之前的数据,保留readerIndex和writerIndex的数据.
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
这里有cumulation
的累积ByteBuf
,作为一个容器,存储收到的数据流。解码(解决拆包和粘包)都会处理这个ByteBuf
。
它有2个实现:
MERGE_CUMULATOR
:内存连续,累积时会拷贝COMPOSITE_CUMULATOR
:内存不连续,使用CompositeByteBuf
实现,累积时不拷贝,但是由于内部索引的复杂性,可能某些场景会比MERGE_CUMULATOR
慢
然后就是调用节码的流程,即代码callDecode(ctx, cumulation, out)
:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
// 若有以处理好的数据包,传播给后面的handler
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// 这里真正decode待处理的数据
decodeRemovalReentryProtection(ctx, in, out);
// ...
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
// 数据包不完整,直接返回,等待累计后下一次decode
break;
} else {
continue;
}
}
// ...
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
处理数据包关键在于下面的decodeRemovalReentryProtection
方法,它很简单,仅仅调用decode
,然后记录状态而已:
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 调用LengthFieldBasedFrameDecoder的处理实现
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
2.4. 总结
Netty对于基于长度的拆包粘包处理的实现总结如下:
- 上层需要有一个容器保存接收到的数据流
- 然后读取事件触发后,累积数据到容器中,这可以保证这个容器数据的头就是某个数据包的头
- 对这个容器进行
decode
:- 若长度不足,就不处理,等待下次处理
- 若数据包完整,就截取数据(无拷贝),并传播处理好的数据包到下一个
handler
- 若长度过大/过小,则需要丢弃数据包数据,并抛异常
- 释放已经处理过数据流的内存