TCP的拆包、粘包与Netty的处理

Posted by keys961 on September 21, 2019

1. TCP的拆包与粘包

TCP是一个流协议,在TCP这一层实际上没有什么“包”的概念(实际上没有所谓的拆包粘包,这都是对上层应用而言的)。

而上层应用却有“包”的概念,所以需要在上层处理好“包”的划分。

1.1. 拆包与粘包的说明

TCP协议的两端是配有缓冲区的,缓冲区的大小可配置(Java中可配置SO_SNDBUFSO_RCVBUF),不过根据官方文档,这个配置某些操作系统会动态调整。

假设A要给B发送P1,P2两个数据包:

  • A独立发送P1,P2,且B及时读取了它们——不会发生拆包、粘包
  • A发送P1,P2B没有及时读取,之后读取读到2个数据包的所有数据——发生粘包
  • A发送P1,P2,但某个数据包比较大,导致B一次读取只能读到数据包的一部分,需要多次读取才能读取完全——发送拆包

1.2. 拆包、粘包的原因

拆包的常见原因如下:

  • 发送的数据大于缓冲(两端的)大小
  • 发送的数据大于MSS,TCP会拆成多个IP报文
  • 发送的数据,陷入传输层后,数据报的大小大于MTU,IP层会拆成多个片段

粘包的常见原因如下:

  • 发送的数据小于发送端缓冲大小,客户端多次写入缓冲区后才发送出去
  • 接收端的缓冲区较大,且没有及时读取缓冲区的数据

img1

1.3. 解决方法

常见的解决方法有如下几个,都是比较简单的:

  • 给数据包头部添加一个长度标识
  • 设置数据包的长度必须是定长(不足补0)
  • 数据包之间设置特殊符号作为边界

当然也可以自己实现一个复杂的协议解决拆包粘包的问题

而Netty自身提供了以上3种的解码器,以解决拆包粘包的问题,它们对应的是:

  • LengthFieldBasedFrameDecoder
  • FixedLengthFrameDecoder
  • DelimiterBasedFrameDecoder, LineBasedFrameDecoder

通常第一种方式能解决大部分的需求,下面将会分析LengthFieldBasedFrameDecoder是如何处理拆包粘包的问题。

2. Netty的LengthFieldBasedFrameDecoder

该拆包器是ChannelInboundHandler,所以配置客户端/服务端是需要加到pipeline的头部。

该拆包器是基于长度的策略。

2.1. 构造方法

public LengthFieldBasedFrameDecoder(
        ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
        int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
    // ...
    this.byteOrder = byteOrder; // 长度域的大小端
    this.maxFrameLength = maxFrameLength; // 数据包最大长度
    this.lengthFieldOffset = lengthFieldOffset; // 长度域在数据包的偏移量
    this.lengthFieldLength = lengthFieldLength; // 长度域的长度
    this.lengthAdjustment = lengthAdjustment; // 长度域的调整值
    lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
    this.initialBytesToStrip = initialBytesToStrip; // 构造好数据包后,截掉指定长度的数据
    this.failFast = failFast; // 若为true,只要长度域的值超过最大长度,就报异常;否则读完整个数据包后再校验长度,若超过才抛异常
}

构造方法的参数说明都在上面的注释上。

2.2. 实现逻辑

主要在下面的方法中实现,代码说明如下:

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    // 1. 过长数据包的丢弃模式的处理,后面说明
    if (discardingTooLongFrame) {
        discardingTooLongFrame(in);
    }
    // 2. 获取数据包长度域的值
    // 若读不到,返回null,可能发生了拆包
    // **这里即处理了拆包的现象,需要更多的数据进来**
    if (in.readableBytes() < lengthFieldEndOffset) {
        return null;
    }
    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
    // 获取未调整的长度域的值
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    // 3. 长度校验
    // 若原始长度小于0,报错
    if (frameLength < 0) {
        failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
    }
    // 调整长度值(调整为整个数据包大小的值)
    frameLength += lengthAdjustment + lengthFieldEndOffset;
	// 若小于lengthFieldEndOffset,说明之后的数据长度是负数,报错
    if (frameLength < lengthFieldEndOffset) {
        failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
    }
	// 若长度大于上限,进入丢弃模式,并可能抛出异常
    if (frameLength > maxFrameLength) {
        exceededFrameLength(in, frameLength);
        return null;
    }
	// 若可读的长度小于长度域的值,现不处理,返回null
    // **这里也处理了拆包的现象**
    int frameLengthInt = (int) frameLength;
    if (in.readableBytes() < frameLengthInt) {
        return null;
    }
	// 4. 跳过指定字节长度,若跳过的字节数过大,报错
    if (initialBytesToStrip > frameLengthInt) {
        failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
    }
    in.skipBytes(initialBytesToStrip);
    // 5. 抽取数据包,简单调用retainedSlice无拷贝复制而已
    // **这里可以解决粘包的问题,分割了两个不同的数据包**
    int readerIndex = in.readerIndex();
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    in.readerIndex(readerIndex + actualFrameLength);
    return frame;
}

大体的逻辑上面有说明。

不过还是有一点需要补充的——过长数据的丢弃

首先是第3步长度校验的过长数据丢弃:

private void exceededFrameLength(ByteBuf in, long frameLength) {
    // 计算长度域的长度和可读长度的差值
    long discard = frameLength - in.readableBytes();
    // 记录一些这个过长数据的长度
    tooLongFrameLength = frameLength;

    if (discard < 0) {
        // 若可读长度大于长度域的长度,则可跳过长度域的长度,保留后面的
        // 因为可能之后的内容是合法的
        in.skipBytes((int) frameLength);
    } else {
        // 若可读长度小于长度域的长度,则需要全部丢弃
        // 设置discard模式标识为true,进入丢弃模式
        discardingTooLongFrame = true;
        // 这里记录这个差值
        // 下一次decode的时候,先要跳过这部分长度的内容
        bytesToDiscard = discard;
        // 跳过本次的整个数据
        in.skipBytes(in.readableBytes());
    }
    // 报错
    failIfNecessary(true);
}
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
    if (bytesToDiscard == 0) {
        // 下一次decode不需要丢弃了
        // 则重置/退出丢弃模式
        long tooLongFrameLength = this.tooLongFrameLength;
        this.tooLongFrameLength = 0;
        discardingTooLongFrame = false;
        // 如果没有设置快速失败
        // 或者设置了快速失败并且是第一次检测到大包错误
        // 抛出异常
        if (!failFast ||
            failFast && firstDetectionOfTooLongFrame) {
            fail(tooLongFrameLength);
        }
    } else {
        // 这里还是在丢弃模式中,即下一次decode需要丢弃数据
        // 如果设置了快速失败,并且是第一次检测到打包错误,抛出异常
        if (failFast && firstDetectionOfTooLongFrame) {
            fail(tooLongFrameLength);
        }
    }
}

然后就是开头第1步的丢弃数据了。当上一次decode检测到过大数据包的时候,下一次decode时,接收的数据可能还是上一次decode所在的数据包中的,所以还需要再丢弃一次。即下面这段代码:

if (discardingTooLongFrame) {
    // 若处于丢弃模式,首先需要丢弃数据
    discardingTooLongFrame(in);
}

跳转进去后:

private void discardingTooLongFrame(ByteBuf in) {
    long bytesToDiscard = this.bytesToDiscard;
    // 取可读字节数和丢弃字节数的最小值
    int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
    // 丢弃这段数据
    in.skipBytes(localBytesToDiscard);
    // 更新还需要跳过/丢弃的长度
    bytesToDiscard -= localBytesToDiscard;
    this.bytesToDiscard = bytesToDiscard;
    // 报错,这里设为false,因此不会抛出异常(除非failFast为false)
    // 这里不是第一次检测到对应的过大数据包
    // 对应数据包的检测是在上一次decode中检测到的
    failIfNecessary(false);
}

2.3. 上层如何调用

这里见其父类ByteToMessageDecoderchannelRead方法实现,下面截取关键的代码,并有解释:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            // 若是接收到的第一个数据,则之间是data
            if (first) {
                cumulation = data;
            } else {
                // 否则就将数据添加到cumulation之后
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 然后解码整个cumulation的所有数据
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            // ...Handling discard and release cumulation datas...
            // 释放readerIndex之前的数据,保留readerIndex和writerIndex的数据.
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

这里有cumulation的累积ByteBuf,作为一个容器,存储收到的数据流。解码(解决拆包和粘包)都会处理这个ByteBuf

它有2个实现:

  • MERGE_CUMULATOR:内存连续,累积时会拷贝
  • COMPOSITE_CUMULATOR:内存不连续,使用CompositeByteBuf实现,累积时不拷贝,但是由于内部索引的复杂性,可能某些场景会比MERGE_CUMULATOR

然后就是调用节码的流程,即代码callDecode(ctx, cumulation, out)

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    // 若有以处理好的数据包,传播给后面的handler
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
                int oldInputLength = in.readableBytes();
                // 这里真正decode待处理的数据
                decodeRemovalReentryProtection(ctx, in, out);                
                // ...
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        // 数据包不完整,直接返回,等待累计后下一次decode
                        break;
                    } else {
                        continue;
                    }
                }
                // ...
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

处理数据包关键在于下面的decodeRemovalReentryProtection方法,它很简单,仅仅调用decode,然后记录状态而已:

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        // 调用LengthFieldBasedFrameDecoder的处理实现
        decode(ctx, in, out);
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            handlerRemoved(ctx);
        }
    }
}

2.4. 总结

Netty对于基于长度的拆包粘包处理的实现总结如下:

  • 上层需要有一个容器保存接收到的数据流
  • 然后读取事件触发后,累积数据到容器中,这可以保证这个容器数据的头就是某个数据包的头
  • 对这个容器进行decode
    • 若长度不足,就不处理,等待下次处理
    • 若数据包完整,就截取数据(无拷贝),并传播处理好的数据包到下一个handler
    • 若长度过大/过小,则需要丢弃数据包数据,并抛异常
  • 释放已经处理过数据流的内存