利用Java搭建个简单的Netty通信实例教程

前言

看过dubbo源码的同学应该都清楚,使用dubbo协议的底层通信是使用的netty进行交互,而最近看了dubbo的Netty部分后,自己写了个简单的Netty通信例子。

准备

工程截图

模块详解

  • rpc-common

rpc-common作为各个模块都需使用的模块,工程中出现的是一些通信时请求的参数以及返回的参数,还有一些序列化的工具。

  • rpc-client

rpc-client中目前只是单单的一个NettyClient启动类。

  • rpc-server

rpc-client中目前也只是单单的一个NettyServer服务启动类。

需要的依赖

目前所有的依赖项都出现在 rpc-common 下的 pom.xml中。

<dependencies>
  <!-- Netty -->
  <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.10.Final</version>
  </dependency>

  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
  </dependency>

  <!-- Protostuff -->
  <dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.9</version>
  </dependency>

  <dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.9</version>
  </dependency>

  <!-- Objenesis -->
  <dependency>
    <groupId>org.objenesis</groupId>
    <artifactId>objenesis</artifactId>
    <version>2.1</version>
  </dependency>

  <!-- fastjson -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.38</version>
  </dependency>
</dependencies>

实现

首先我们在common中先定义本次的Request和Response的基类对象。

public class Request {

  private String requestId;

  private Object parameter;

  public String getRequestId() {
    return requestId;
  }

  public void setRequestId(String requestId) {
    this.requestId = requestId;
  }

  public Object getParameter() {
    return parameter;
  }

  public void setParameter(Object parameter) {
    this.parameter = parameter;
  }
}

public class Response {

  private String requestId;

  private Object result;

  public String getRequestId() {
    return requestId;
  }

  public void setRequestId(String requestId) {
    this.requestId = requestId;
  }

  public Object getResult() {
    return result;
  }

  public void setResult(Object result) {
    this.result = result;
  }
}

使用fastJson进行本次序列化

Netty对象的序列化转换很好懂, ByteToMessageDecoder 和 MessageToByteEncoder 分别只要继承它们,重写方法后,获取到Object和Byte,各自转换就OK。

不过如果是有要用到生产上的同学,建议不要使用 fastJson,因为它的漏洞补丁真的是太多了,可以使用google的 protostuff。

public class RpcDecoder extends ByteToMessageDecoder {

  // 目标对象类型进行解码
  private Class<?> target;

  public RpcDecoder(Class target) {
    this.target = target;
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 4) {  // 不够长度丢弃
      return;
    }
    in.markReaderIndex();  // 标记一下当前的readIndex的位置
    int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4

    if (in.readableBytes() < dataLength) { // 读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
      in.resetReaderIndex();
      return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);

    Object obj = JSON.parseObject(data, target);  // 将byte数据转化为我们需要的对象
    out.add(obj);
  }
}

public class RpcEncoder extends MessageToByteEncoder {

  //目标对象类型进行编码
  private Class<?> target;

  public RpcEncoder(Class target) {
    this.target = target;
  }

  @Override
  protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    if (target.isInstance(msg)) {
      byte[] data = JSON.toJSONBytes(msg);  // 使用fastJson将对象转换为byte
      out.writeInt(data.length); // 先将消息长度写入,也就是消息头
      out.writeBytes(data);  // 消息体中包含我们要发送的数据
    }
  }

}

NetyServer

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Request request = (Request) msg;

    System.out.println("Client Data:" + JSON.toJSONString(request));

    Response response = new Response();
    response.setRequestId(request.getRequestId());
    response.setResult("Hello Client !");

    // client接收到信息后主动关闭掉连接
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
  }
}

public class NettyServer {

  private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

  private String ip;
  private int port;

  public NettyServer(String ip, int port) {
    this.ip = ip;
    this.port = port;
  }

  public void server() throws Exception {

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {

      final ServerBootstrap serverBootstrap = new ServerBootstrap();

      serverBootstrap.group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG, 1024)
          .option(ChannelOption.SO_SNDBUF, 32 * 1024)
          .option(ChannelOption.SO_RCVBUF, 32 * 1024)
          .option(ChannelOption.SO_KEEPALIVE, true)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
              socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
                  .addLast(new RpcEncoder(Response.class))
                  .addLast(new NettyServerHandler());
            }
          });

      serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 开启长连接

      ChannelFuture future = serverBootstrap.bind(ip, port).sync();

//      if (future.isSuccess()) {
//
//        new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);
//      }

      future.channel().closeFuture().sync();
    } finally {
      bossGroup.shutdownGracefully();
      workerGroup.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws Exception {
    new NettyServer("127.0.0.1", 20000).server();
  }
}

关键名词:

  • EventLoopGroup

    • workerGroup
    • bossGroup
      Server端的EventLoopGroup分为两个,一般workerGroup作为处理请求,bossGroup作为接收请求。
  • ChannelOption
    • SO_BACKLOG
    • SO_SNDBUF
    • SO_RCVBUF
    • SO_KEEPALIVE
      以上四个常量作为TCP连接中的属性。
  • ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

NettyServerHandler中出现的 ChannelFutureListener.CLOSE ,作为Server端主动关闭与Client端的通信,如果没有主动Close,那么NettyClient将会一直处于阻塞状态,得不到NettyServer的返回信息。

NettyClient

public class NettyClient extends SimpleChannelInboundHandler<Response> {

  private final String ip;
  private final int port;
  private Response response;

  public NettyClient(String ip, int port) {
    this.ip = ip;
    this.port = port;
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
  }

  @Override
  protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
    this.response = response;
  }

  public Response client(Request request) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();

    try {

      // 创建并初始化 Netty 客户端 Bootstrap 对象
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(group);
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel channel) throws Exception {
          ChannelPipeline pipeline = channel.pipeline();

          pipeline.addLast(new RpcDecoder(Response.class));
          pipeline.addLast(new RpcEncoder(Request.class));
          pipeline.addLast(NettyClient.this);
        }
      });
      bootstrap.option(ChannelOption.TCP_NODELAY, true);

//      String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");

      // 连接 RPC 服务器
      ChannelFuture future = bootstrap.connect(ip, port).sync();

      // 写入 RPC 请求数据并关闭连接
      Channel channel = future.channel();

      channel.writeAndFlush(request).sync();
      channel.closeFuture().sync();

      return response;
    } finally {
      group.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws Exception {
    Request request = new Request();
    request.setRequestId(UUID.randomUUID().toString());
    request.setParameter("Hello Server !");
    System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request)));
  }
}

测试

如果以上所有内容都准备就绪,那么就可以进行调试了。

启动顺序,先启动NettyServer,再启动NettyClient。

总结

记得刚出来工作时,有工作很多年的同事问我了不了解Netty,当时工作太短,直说听过Putty,现在回想起来真的挺丢人的,哈哈。😋

Netty作为通信框架,如果你了解TCP,而且项目中有类似传输信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺实用的。

参考资料:

本项目Github地址:Netty-RPC

到此这篇关于利用Java搭建个简单的Netty通信的文章就介绍到这了,更多相关Java搭建Netty通信内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java NIO框架Netty简单使用的示例

    之前写了一篇文章:Java 网络IO编程总结(BIO.NIO.AIO均含完整实例代码),介绍了如何使用Java原生IO支持进行网络编程,本文介绍一种更为简单的方式,即Java NIO框架. Netty是业界最流行的NIO框架之一,具有良好的健壮性.功能.性能.可定制性和可扩展性.同时,它提供的十分简单的API,大大简化了我们的网络编程. 同Java IO介绍的文章一样,本文所展示的例子,实现了一个相同的功能. 1.服务端 Server: package com.anxpp.io.calculat

  • 简单了解Java Netty Reactor三种线程模型

    1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接: 2)作为NIO客户端,向服务端发起TCP连接: 3)读取通信对端的请求或者应答消息: 4)向通信对端发送消息请求或者应答消息. Reactor单线程模型示意图如下所示: Reactor单线程模型 由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处

  • Java Netty实现心跳机制过程解析

    netty心跳机制示例,使用Netty实现心跳机制,使用netty4,IdleStateHandler 实现.Netty心跳机制,netty心跳检测,netty,心跳 本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用.我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的.Netty中自带了一个IdleStateHandler 可以用来实现心跳检测. 心跳检测的逻辑 本文中我们

  • 利用Java搭建个简单的Netty通信实例教程

    前言 看过dubbo源码的同学应该都清楚,使用dubbo协议的底层通信是使用的netty进行交互,而最近看了dubbo的Netty部分后,自己写了个简单的Netty通信例子. 准备 工程截图 模块详解 rpc-common rpc-common作为各个模块都需使用的模块,工程中出现的是一些通信时请求的参数以及返回的参数,还有一些序列化的工具. rpc-client rpc-client中目前只是单单的一个NettyClient启动类. rpc-server rpc-client中目前也只是单单的

  • Kafka利用Java实现数据的生产和消费实例教程

    前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能. 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输. 支持Kafka Serv

  • java实现一个简单的Web服务器实例解析

    Web服务器也称为超文本传输协议服务器,使用http与其客户端进行通信,基于java的web服务器会使用两个重要的类, java.net.Socket类和java.net.ServerSocket类,并基于发送http消息进行通信. 这个简单的Web服务器会有以下三个类: *HttpServer *Request *Response 应用程序的入口在HttpServer类中,main()方法创建一个HttpServer实例,然后调用其await()方法,顾名思义,await()方法会在指定端口上

  • Java多线程中线程间的通信实例详解

    Java多线程中线程间的通信 一.使用while方式来实现线程之间的通信 package com.ietree.multithread.sync; import java.util.ArrayList; import java.util.List; public class MyList { private volatile static List list = new ArrayList(); public void add() { list.add("apple"); } publ

  • 利用Java编写24点小游戏的实例代码

    话不多说直接给大家上代码 package com.company; import java.util.*; /** * 24点小游戏 * 游戏规则:系统自动生成4个1-10的随机整数,玩家通过加减乘除操作,得到结果为24,每个数字只能使用一次 */ public class Game24Player { final String[] patterns = {"nnonnoo", "nnonono", "nnnoono", "nnnono

  • 利用python实现简单的情感分析实例教程

    目录 1 数据导入及预处理 1.1 数据导入 1.2 数据描述 1.3 数据预处理 2 情感分析 2.1 情感分 2.2 情感分直方图 2.3 词云图 2.4 关键词提取 3 积极评论与消极评论 3.1 积极评论与消极评论占比 3.2 消极评论分析 总结 python实现简单的情感分析 1 数据导入及预处理 1.1 数据导入 # 数据导入 import pandas as pd data = pd.read_csv('../data/京东评论数据.csv') data.head() 1.2 数据

  • Java实现AOP面向切面编程的实例教程

    介绍 众所周知,AOP(面向切面编程)是Spring框架的特色功能之一.通过设置横切关注点(cross cutting concerns),AOP提供了极高的扩展性.那AOP在Spring中是怎样运作的呢?当你只能使用core java,却需要AOP技术时,这个问题的解答变得极为关键.不仅如此,在高级技术岗位的面试中,此类问题也常作为考题出现.这不,我的朋友最近参加了一个面试,就被问到了这样一个棘手的问题--如何在不使用Spring及相关库,只用core Java的条件下实现AOP.因此,我将在

  • Java设计模式开发中使用观察者模式的实例教程

    观察者模式是软件设计模式中的一种,使用也比较普遍,尤其是在GUI编程中.关于设计模式的文章,网络上写的都比较多,而且很多文章写的也不错,虽然说有一种重复早轮子的嫌疑,但此轮子非彼轮子,侧重点不同,思路也不同,讲述方式也不近相同. 关键要素 主题: 主题是观察者观察的对象,一个主题必须具备下面三个特征. 持有监听的观察者的引用 支持增加和删除观察者 主题状态改变,通知观察者 观察者: 当主题发生变化,收到通知进行具体的处理是观察者必须具备的特征. 为什么要用这种模式 这里举一个例子来说明,牛奶送奶

  • php实现最简单的MVC框架实例教程

    本文以一个实例的形式讲述了PHP实现MVC框架的过程,比较浅显易懂.现分享给大家供大家参考之用.具体分析如下: 首先,在学习一个框架之前,基本上我们都需要知道什么是mvc,即model-view-control,说白了就是数据控制以及页面的分离实现,mvc就是这样应运而生的,mvc分为了三个层次,而且三个层次各司其职,互不干扰,首先简单介绍下,各个层次:view即是视图,也就是web页面,control即是控制器 向系统发出指令的工具,model 简单说是从数据库中取出数据进行处理. MVC的工

  • 利用C语言的Cairo图形库绘制太极图实例教程

    前言 可能许多人对直接用C语言绘图仍然停留在Turbo C的graphics.h年代,或许也有教育老化的原因,毕竟曾经的经典早已成往事,与其想尽各种办法寻找与其兼容的图形库,不如顺势拥抱灿烂的明天.Cario(http://cairographics.org/)是一个非常出色的2D图形库,著名的GTK+3.0完全采用Cario作为绘图引擎,由此可见它的强大和吸引力. Cario支持X Window,Quartz,Win32,image.buffers,PostScript,PDF和SVG文件等多

随机推荐