Netty实现自定义协议编解码器
目录
- 为什么要自定义协议
- 自定义协议设计
- 请求头
- 请求体
- 自定义协议实现
- 1:创建一个Maven项目,引入Netty依赖,完整的依赖如下
- 2:实现我们的协议请求头
- 3:实现我们的协议请求体
- 4:实现我们的协议请求类
- 5:实现自定义编码器
- 6:实现自定义解码器
- 7:Netty Server端
- 8:Netty Server端处理器
- 9:Netty Client端处理器
- 10:Netty Client端
- 11:测试
- 完整代码
为什么要自定义协议
Netty自带了一些编解码器没,比如 StringDecode,StringEncoder,在实际业务中,协议往往需要携带一些我们自定义的属性,比如版本号,imei号,appId等,这时候Netty提供的编解码器就无法满足我们的需求,所以我们需要自定义协议和自定义的编解码器
自定义协议设计
我们可以仿造HTTP协议,比如 请求头 + 请求体 的格式
请求头
HTTP协议的请求头有 请求方法(GET,POST),版本号等,既然是自定义协议,那么肯定是要满足自己实际业务需求的,所以我们的请求头包含以下信息,也可以根据自己的业务去添加一些自定义的属性
commond: 指令,比如说你发送给Netty的消息是【登录】还是【单聊消息】
或者是【群发消息】又或者是【踢人下线】的请求.
version:版本号,在后期如果升级版本的话,要兼容老版本,我们可以做判断,如果是老版本的就走A逻辑分支,新版本就走B逻辑分支
clientType:客户端访问我们的IM系统是通过WEb端,还是IOS,或者是Android端
messageType:将客户端发送的数据解析成哪种格式,比如JSON,Protobuf,还是Xml格式
imeiLen:imei号的长度(imei号在请求体中)
appId:我们的IM是以服务的方式提供出去的,我们需要知道这个请求是从哪个服务进来的,每个服务都有一个自定义唯一的appId
bodyLen:我们的数据长度
请求体
imei号:登录设备的唯一标识,虽然有了clientType来判断是从WEB端还是IOS端访问的,
但是并不知道是从哪台设备登录的,后期我们要做踢人下线,比如一个账号只能一台设备登录,
或者是一个账号能同时登录WEB,或者是IOS端或者是Android端,我们就需要跟clientType一起判断
data:我们要发送的数据
自定义协议实现
1:创建一个Maven项目,引入Netty依赖,完整的依赖如下
<dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.69.Final</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.24</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.51</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.32</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.0.M2</version> </dependency> </dependencies>
2:实现我们的协议请求头
package com.chat.model; import lombok.Data; import java.io.Serializable; @Data public class MessageHead implements Serializable { /** * 指令 */ private Integer commond; /** * 版本号 */ private Integer version; /** * clientType(WEB,IOS,Android) */ private Integer clientType; /** * 数据解析类型 和具体业务无关,后续根据解析类型解析data数据 0x0:Json,0x1:ProtoBuf,0x2:Xml,默认:0x0 */ private Integer messageType = 0x0; /** * imei号长度 */ private Integer imeiLen; /** * appId */ private Integer appId; /** * bodyLen,数据长度 */ private Integer bodyLen; }
3:实现我们的协议请求体
package com.chat.model; import lombok.Data; import java.io.Serializable; @Data public class MessageBody implements Serializable { /** * imei号 */ private String imei; /** * 数据 */ private Object data; }
4:实现我们的协议请求类
package com.chat.model; import lombok.Data; import java.io.Serializable; // Message就是我们Netty服务接收到的完整的(请求头+请求体)数据包 @Data public class Message implements Serializable { private MessageHead messageHead; private MessageBody messageBody; }
5:实现自定义编码器
package com.chat.codec; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.chat.model.Message; import com.chat.model.MessageBody; import com.chat.model.MessageHead; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 自定义编码器 */ public class MessageDecoder extends ByteToMessageDecoder { /** * 协议格式:请求头 +imei号 + 请求体 * 请求头: 指令(commond) + 版本号 + clientType + 消息解析类型 + imei长度 + appId + bodyLen * 指令:这条消息是做什么的,比如是登录,还是群发消息,还是单聊消息,还是踢人下线.... * 版本号:协议的版本号,对于版本升级有帮助,比如A版本的走A逻辑,B版本的走B逻辑 * clientType:web端,IOS,Android * 消息解析类型:把这条消息解析成什么样的类型,有JSON,还是String等 * imei:虽然有clientType来标识出该用户是从WEB访问的还是IOS或者Android端登录的,但是这时候有二台IOS手机登录你就分辨不了了 * 所以imei号是设备的唯一标识,这样可以在用户多端登录的时候踢人下线,来实现一个账号只能一台设备登录 * appId:如果我们的IM系统是以服务方式提供的,appId表示的是哪个服务来访问的 * bodyLen:数据长度 * 所以请求头的长度是:7 * 4 = 28字节 */ @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception { //我们的请求头有7个属性,每个属性都是int型,所以占4个字节,如果小于28个字节说明这个请求数据是有问题的, if(in.readableBytes() < 28) { return; } //拿到指令 int command = in.readInt(); //拿到版本号 int version = in.readInt(); //拿到clientType int clientType = in.readInt(); //拿到消息解析类型 int messageType = in.readInt(); //拿到imei号的长度 int imeiLen = in.readInt(); //拿到appId int appId = in.readInt(); //拿到数据内容长度 int bodyLen = in.readInt(); //我们的数据是以流的形式读取的,当读取到的数据长度小于 imei号长度+data长度,说明还没有获取到完整的请求数据,需要重新再次读取接下来TCP发送过来的数据,直到等于了就代表 //我们已经读取到一条完整的数据了,其实这也是一种解决TCP粘包和拆包的问题 if(in.readableBytes() < (bodyLen + imeiLen)) { //表示读取的数据还不够 in.resetReaderIndex(); return; } //通过imei号长度读取imei号 byte[] imeiData = new byte[imeiLen]; in.readBytes(imeiData); String imei = new String(imeiData); //通过bodyLen读取数据内容 byte[] bodyData = new byte[bodyLen]; in.readBytes(bodyData); /** * 设置请求头 */ MessageHead messageHead = new MessageHead(); messageHead.setCommond(command); messageHead.setAppId(appId); messageHead.setBodyLen(bodyData.length); messageHead.setImeiLen(imeiData.length); messageHead.setVersion(version); messageHead.setClientType(clientType); messageHead.setMessageType(messageType); /** * 设置请求体 */ MessageBody messageBody = new MessageBody(); messageBody.setImei(imei); Message message = new Message(); message.setMessageHead(messageHead); /** * 根据messageType来封装请求数据 */ if(messageType == 0x0) { //解析成JSON格式 String body = new String(bodyData); com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject(); jsonObject.put("body",body); messageBody.setData(jsonObject); }else if(messageType == 0x1) { //解析成Protobuf }else if(messageType == 0x2) { //解析成Xml } message.setMessageBody(messageBody); //更新读索引 in.markReaderIndex(); //最后通过管道写出去 out.add(message); } }
6:实现自定义解码器
package com.chat.codec; import com.chat.model.Message; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.nio.charset.Charset; public class MessageEncoder extends MessageToByteEncoder<Message> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { out.writeInt(message.getMessageHead().getCommond()); out.writeInt(message.getMessageHead().getVersion()); out.writeInt(message.getMessageHead().getClientType()); out.writeInt(message.getMessageHead().getMessageType()); out.writeInt(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")).length); out.writeInt(message.getMessageHead().getAppId()); out.writeInt(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")).length); out.writeBytes(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8"))); out.writeBytes(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8"))); } }
7:Netty Server端
package com.chat.server; import com.chat.codec.MessageDecoder; import com.chat.codec.MessageEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { public static void main(String[] args) throws Exception{ // 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍 // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成 EventLoopGroup bossGroup = new NioEventLoopGroup(3); EventLoopGroup workerGroup = new NioEventLoopGroup(8); try { // 创建服务器端的启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); // 使用链式编程来配置参数 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 // 使用NioServerSocketChannel作为服务器的通道实现 .channel(NioServerSocketChannel.class) // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。 // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理 .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageDecoder()); ch.pipeline().addLast(new MessageEncoder()); ch.pipeline().addLast(new MyServerHandler()); //ch.pipeline().addLast(new ServerHandler()); } }); System.out.println("netty server start。。"); // 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况 // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕 ChannelFuture cf = bootstrap.bind(9000).sync(); // 给cf注册监听器,监听我们关心的事件 /*cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口9000成功"); } else { System.out.println("监听端口9000失败"); } } });*/ // 等待服务端监听端口关闭,closeFuture是异步操作 // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法 cf.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
8:Netty Server端处理器
package com.chat.server; import cn.hutool.json.JSONUtil; import com.chat.model.Message; import com.chat.model.MessageBody; import com.chat.model.MessageHead; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyServerHandler extends SimpleChannelInboundHandler<Message> { private final static Logger logger = LoggerFactory.getLogger(MyServerHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception { System.out.println("这是客户端发送的消息" + JSONUtil.toJsonPrettyStr(message)); Message messageResponse = new Message(); MessageHead messageHead = new MessageHead(); messageHead.setCommond(9988); messageHead.setMessageType(0x0); messageHead.setClientType(1); messageHead.setVersion(2); messageHead.setAppId(3); String msg = "这是服务端发送给你的消息"; messageHead.setBodyLen(msg.getBytes().length); String imei = "12-euri-1234"; messageHead.setImeiLen(imei.getBytes().length); MessageBody messageBody = new MessageBody(); messageBody.setImei(imei); messageBody.setData(msg); messageResponse.setMessageHead(messageHead); messageResponse.setMessageBody(messageBody); ctx.writeAndFlush(messageResponse); } }
9:Netty Client端处理器
package com.chat.client; import cn.hutool.json.JSONUtil; import com.chat.model.Message; import com.chat.model.MessageBody; import com.chat.model.MessageHead; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ClientHandler extends SimpleChannelInboundHandler<Message> { /** * 当客户端连接服务器完成就会触发该方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) { for(int i = 0; i < 20; i ++) { Message message = new Message(); MessageHead messageHead = new MessageHead(); messageHead.setCommond(9988); messageHead.setMessageType(0x0); messageHead.setClientType(1); messageHead.setVersion(2); messageHead.setAppId(3); String msg = "hello-" + i; messageHead.setBodyLen(msg.getBytes().length); String imei = "12-euri"; messageHead.setImeiLen(imei.getBytes().length); MessageBody messageBody = new MessageBody(); messageBody.setImei(imei); messageBody.setData(msg); message.setMessageHead(messageHead); message.setMessageBody(messageBody); ctx.writeAndFlush(message); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } //当通道有读取事件时会触发,即服务端发送数据给客户端 @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception { System.out.println(JSONUtil.toJsonPrettyStr(message)); } }
10:Netty Client端
package com.chat.client; import com.chat.codec.MessageDecoder; import com.chat.codec.MessageEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class Client { public static void main(String[] args) throws Exception{ //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是ServerBootstrap而是Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //加入处理器 ch.pipeline().addLast(new MessageDecoder()); ch.pipeline().addLast(new MessageEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); System.out.println("netty client start。。"); //启动客户端去连接服务器端 ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync(); //对通道关闭进行监听 cf.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
11:测试
1: 先启动Server端的main方法
2:再启动Client端的main方法
3:查看控制台
服务端控制台:
客户端控制台:
完整代码
全部代码就是下图这几个类,上面已经贴出每个类的全部代码,直接复制就行了
以上就是Netty实现自定义协议编解码器的详细内容,更多关于Netty自定义协议编解码器的资料请关注我们其它相关文章!