Netty解决 TCP 粘包拆包的方法

什么是粘包/拆包

一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。

我们都知道TCP是基于字节流的传输协议。

那么数据在通信层传播其实就像河水一样并没有明显的分界线,而数据具体表示什么意思什么地方有句号什么地方有分号这个对于TCP底层来说并不清楚。应用层向TCP层发送用于网间传输的、用8位字节表示的数据流,然后TCP把数据流分区成适当长度的报文段,之后TCP把结果包传给IP层,由它来通过网络将包传送给接收端实体的TCP层。

所以对于这个数据拆分成大包小包的问题就是我们今天要讲的粘包和拆包的问题。

1、TCP粘包拆包问题说明

粘包和拆包这两个概念估计大家还不清楚,通过下面这张图我们来分析一下:

假设客户端分别发送两个数据包D1,D2个服务端,但是发送过程中数据是何种形式进行传播这个并不清楚,分别有下列4种情况:

  • 服务端一次接受到了D1和D2两个数据包,两个包粘在一起,称为粘包;
  • 服务端分两次读取到数据包D1和D2,没有发生粘包和拆包;
  • 服务端分两次读到了数据包,第一次读到了D1和D2的部分内容,第二次读到了D2的剩下部分,这个称为拆包;
  • 服务器分三次读到了数据部分,第一次读到了D1包,第二次读到了D2包的部分内容,第三次读到了D2包的剩下内容。

2、TCP粘包产生原因

我们知道在TCP协议中,应用数据分割成TCP认为最适合发送的数据块,这部分是通过“MSS”(最大数据包长度)选项来控制的,通常这种机制也被称为一种协商机制,MSS规定了TCP传往另一端的最大数据块的长度。这个值TCP协议在实现的时候往往用MTU值代替(需要减去IP数据包包头的大小20Bytes和TCP数据段的包头20Bytes)所以往往MSS为1460。通讯双方会根据双方提供的MSS值得最小值确定为这次连接的最大MSS值。

tcp为提高性能,发送端会将需要发送的数据发送到缓冲区,等待缓冲区满了之后,再将缓冲中的数据发送到接收方。同理,接收方也有缓冲区这样的机制,来接收数据。

发生粘包拆包的原因主要有以下这些:

  • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小将发生拆包;
  • 进行MSS大小的TCP分段。MSS是TCP报文段中的数据字段的最大长度,当TCP报文长度-TCP头部长度>mss的时候将发生拆包;
  • 应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,将发生粘包;
  • 数据包大于MTU的时候将会进行切片。MTU即(Maxitum Transmission Unit) 最大传输单元,由于以太网传输电气方面的限制,每个以太网帧都有最小的大小64bytes最大不能超过1518bytes,刨去以太网帧的帧头14Bytes和帧尾CRC校验部分4Bytes,那么剩下承载上层协议的地方也就是Data域最大就只能有1500Bytes这个值我们就把它称之为MTU。这个就是网络层协议非常关心的地方,因为网络层协议比如IP协议会根据这个值来决定是否把上层传下来的数据进行分片。

3、如何解决TCP粘包拆包

我们知道tcp是无界的数据流,且协议本身无法避免粘包,拆包的发生,那我们只能在应用层数据协议上,加以控制。通常在制定传输数据时,可以使用如下方法:

  1. 设置定长消息,服务端每次读取既定长度的内容作为一条完整消息;
  2. 使用带消息头的协议、消息头存储消息开始标识及消息长度信息,服务端获取消息头的时候解析出消息长度,然后向后读取该长度的内容;
  3. 设置消息边界,服务端从网络流中按消息边界分离出消息内容。比如在消息末尾加上换行符用以区分消息结束。

当然应用层还有更多复杂的方式可以解决这个问题,这个就属于网络层的问题了,我们还是用java提供的方式来解决这个问题。我们先看一个例子看看粘包是如何发生的。

服务端:

public class HelloWordServer {
    private int port;

    public HelloWordServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                                    .channel(NioServerSocketChannel.class)
                                    .childHandler(new ServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HelloWordServer server = new HelloWordServer(7788);
        server.start();
    }
}

服务端Initializer:

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 字符串解码 和 编码
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 自己的逻辑Handler
        pipeline.addLast("handler", new HelloWordServerHandler());
    }
}

服务端handler:

public class HelloWordServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

客户端:

public class HelloWorldClient {
    private  int port;
    private  String address;

    public HelloWorldClient(int port,String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());

        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        HelloWorldClient client = new HelloWorldClient(7788,"127.0.0.1");
        client.start();
    }
}

客户端Initializer:

public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 客户端的逻辑
        pipeline.addLast("handler", new HelloWorldClientHandler());
    }
}

客户端handler:

public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {
    private byte[] req;
    private int counter;

    public BaseClientHandler() {
        req = ("Unless required by applicable law or agreed to in writing, software\n" +
                "  distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
                "  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
                "  See the License for the specific language governing permissions and\n" +
                "  limitations under the License.This connector uses the BIO implementation that requires the JSSE\n" +
                "  style configuration. When using the APR/native implementation, the\n" +
                "  penSSL style configuration is required as described in the APR/native\n" +
                "  documentation.An Engine represents the entry point (within Catalina) that processes\n" +
                "  every request.  The Engine implementation for Tomcat stand alone\n" +
                "  analyzes the HTTP headers included with the request, and passes them\n" +
                "  on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software\n" +
                "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
                "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
                "# See the License for the specific language governing permissions and\n" +
                "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log\n" +
                "# each component that extends LifecycleBase changing state:\n" +
                "#org.apache.catalina.util.LifecycleBase.level = FINE"
                ).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;

        //将上面的所有字符串作为一个消息体发送出去
        message = Unpooled.buffer(req.length);
        message.writeBytes(req);
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String buf = (String)msg;
        System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

运行客户端和服务端我们能看到:

我们看到这个长长的字符串被截成了2段发送,这就是发生了拆包的现象。同样粘包我们也很容易去模拟,我们把BaseClientHandler中的channelActive方法里面的:

message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);

这几行代码是把我们上面的一长串字符转成的byte数组写进流里发送出去,那么我们可以在这里把上面发送消息的这几行循环几遍这样发送的内容增多了就有可能在拆包的时候把上一条消息的一部分分配到下一条消息里面了,修改如下:

for (int i = 0; i < 3; i++) {
    message = Unpooled.buffer(req.length);
    message.writeBytes(req);
    ctx.writeAndFlush(message);
}

改完之后我们再运行一下,输出太长不好截图,我们在输出结果中能看到循环3次之后的消息服务端收到的就不是之前的完整的一条了,而是被拆分了4次发送。

对于上面出现的粘包和拆包的问题,Netty已有考虑,并且有实施的方案:LineBasedFrameDecoder。
我们重新改写一下ServerChannelInitializer:

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new LineBasedFrameDecoder(2048));
        // 字符串解码 和 编码
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 自己的逻辑Handler
        pipeline.addLast("handler", new BaseServerHandler());
    }
}

新增:pipeline.addLast(new LineBasedFrameDecoder(2048))。同时,我们还得对上面发送的消息进行改造BaseClientHandler:

public class BaseClientHandler extends ChannelInboundHandlerAdapter {
    private byte[] req;
    private int counter;

    req = ("Unless required by applicable dfslaw or agreed to in writing, software" +
                "  distributed under the License is distributed on an \"AS IS\" BASIS," +
                "  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
                "  See the License for the specific language governing permissions and" +
                "  limitations under the License.This connector uses the BIO implementation that requires the JSSE" +
                "  style configuration. When using the APR/native implementation, the" +
                "  penSSL style configuration is required as described in the APR/native" +
                "  documentation.An Engine represents the entry point (within Catalina) that processes" +
                "  every request.  The Engine implementation for Tomcat stand alone" +
                "  analyzes the HTTP headers included with the request, and passes them" +
                "  on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software" +
                "# distributed under the License is distributed on an \"AS IS\" BASIS," +
                "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
                "# See the License for the specific language governing permissions and" +
                "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log" +
                "# each component that extends LifecycleBase changing state:" +
                "#org.apache.catalina.util.LifecycleBase.level = FINE\n"
                ).getBytes();  

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;

        message = Unpooled.buffer(req.length);
        message.writeBytes(req);
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String buf = (String)msg;
        System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

去掉所有的”\n”,只保留字符串末尾的这一个。原因稍后再说。channelActive方法中我们不必再用循环多次发送消息了,只发送一次就好(第一个例子中发送一次的时候是发生了拆包的),然后我们再次运行,大家会看到这么长一串字符只发送了一串就发送完毕。程序输出我就不截图了。下面来解释一下LineBasedFrameDecoder。

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf 中的可读字节,判断看是否有”\n” 或者” \r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器。支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。这个对于我们确定消息最大长度的应用场景还是很有帮助。

对于上面的判断看是否有”\n” 或者” \r\n”以此作为结束的标志我们可能回想,要是没有”\n” 或者” \r\n”那还有什么别的方式可以判断消息是否结束呢。别担心,Netty对于此已经有考虑,还有别的解码器可以帮助我们解决问题,

到此这篇关于Netty解决 TCP 粘包拆包的方法的文章就介绍到这了,更多相关Netty解决 TCP 粘包拆包内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用Netty解决TCP粘包和拆包问题过程详解

    前言 上一篇我们介绍了如果使用Netty来开发一个简单的服务端和客户端,接下来我们来讨论如何使用解码器来解决TCP的粘包和拆包问题 TCP为什么会粘包/拆包 我们知道,TCP是以一种流的方式来进行网络转播的,当tcp三次握手简历通信后,客户端服务端之间就建立了一种通讯管道,我们可以想象成自来水管道,流出来的水是连城一片的,是没有分界线的. TCP底层并不了解上层的业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分. 所以对于我们应用层而言.我们直观是发送一个个连续完整TCP数据包的,

  • Netty解决 TCP 粘包拆包的方法

    什么是粘包/拆包 一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据.TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在.处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要. 我们都知道TCP是基于字节流的传输协议. 那么数据在通信层传播其实就像河水一样并没有明显的分界线,而数据具体表示什么意思什么地方有句

  • Golang TCP粘包拆包问题的解决方法

    什么是粘包问题 最近在使用Golang编写Socket层,发现有时候接收端会一次读到多个数据包的问题.于是通过查阅资料,发现这个就是传说中的TCP粘包问题.下面通过编写代码来重现这个问题: 服务端代码 server/main.go func main() { l, err := net.Listen("tcp", ":4044") if err != nil { panic(err) } fmt.Println("listen to 4044")

  • Netty粘包拆包及使用原理详解

    目录 为什么使用Netty框架 Netty框架介绍 Netty实战 Netty编写服务器端 Netty客户端 粘包与拆包 为什么使用Netty框架 NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector.ServerSocketChannel.SocketChannel.ByteBuffer等. 需要具备其他的额外技能做铺垫,例如熟悉Java多线程编程.这是因为NIO编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序. 可靠性能力补齐,工

  • Golang通过包长协议处理TCP粘包的问题解决

    tcp粘包产生的原因这里就不说了,因为大家能搜索TCP粘包的处理方法,想必大概对TCP粘包有了一定了解,所以我们直接从处理思路开始讲起 tcp粘包现象代码重现 首先,我们来重现一下TCP粘包,然后再此基础之上解决粘包的问题,这里给出了client和server的示例代码如下 /* 文件名:client.go client客户端的示例代码(未处理粘包问题) 通过无限循环无时间间隔发送数据给server服务器 server将会不间断的出现TCP粘包问题 */ package main import

  • C#中TCP粘包问题的解决方法

    一.TCP粘包产生的原理 1.TCP粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾.出现粘包现象的原因是多方面的,它既可能由发送方造成,也可能由接收方造成. 2.发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据.若连续几次发送的数据都很少,通常TCP会根据优化算法把这些数据合成一包后一次发送出去,这样接收方就收到了粘包数据.接收方引起的粘包是由于接收方用户进程不及时接收数据,从

  • Netty粘包拆包问题解决方案

    TCP黏包拆包 TCP是一个流协议,就是没有界限的一长串二进制数据.TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,所以在业务上认为是一个完整的包,可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题. 怎么解决? • 消息定长度,传输的数据大小固定长度,例如每段的长度固定为100字节,如果不够空位补空格 • 在数据包尾部添加特殊分隔符,比如下划线,中划线等 • 将消息分为消息头和

  • 6行代码快速解决golang TCP粘包问题

    前言 什么是TCP粘包问题以及为什么会产生TCP粘包,本文不加讨论.本文使用golang的bufio.Scanner来实现自定义协议解包. 下面话不多说了,来一起看看详细的介绍吧. 协议数据包定义 本文模拟一个日志服务器,该服务器接收客户端传到的数据包并显示出来 type Package struct { Version [2]byte // 协议版本,暂定V1 Length int16 // 数据部分长度 Timestamp int64 // 时间戳 HostnameLength int16

  • GO语言如何手动处理TCP粘包详解

    前言 一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据.TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在.处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要.在应用中处理粘包的基础方法主要有两种分别是以4节字描述消息大小或以结束符,实际上也有两者相结合的如HTTP,redis的通讯协议等. 应用场景 大

  • golang网络socket粘包问题的解决方法

    本文实例讲述了golang网络socket粘包问题的解决方法.分享给大家供大家参考,具体如下: 看到很多人问这个问题, 今天就写了个例子, 希望能帮助大家 首先说一下什么是粘包:百度上比较通俗的说法是指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾. 解决方案如下: 服务端: 复制代码 代码如下: package main import (     "bytes"     "encoding/binary&quo

随机推荐