SpringBoot使用Netty实现远程调用的示例

前言

众所周知我们在进行网络连接的时候,建立套接字连接是一个非常消耗性能的事情,特别是在分布式的情况下,用线程池去保持多个客户端连接,是一种非常消耗线程的行为。那么我们该通过什么技术去解决上述的问题呢,那么就不得不提一个网络连接的利器——Netty.

正文 Netty

Netty是一个NIO客户端服务器框架:

  • 它可快速轻松地开发网络应用程序,例如协议服务器和客户端。
  • 它极大地简化和简化了网络编程,例如TCPUDP套接字服务器。

NIO是一种非阻塞IO ,它具有以下的特点

  • 单线程可以连接多个客户端。
  • 选择器可以实现但线程管理多个Channel,新建的通道都要向选择器注册。
  • 一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。
  • selector进行select()操作可能会产生阻塞,但是可以设置阻塞时间,并且可以用wakeup()唤醒selector,所以NIO是非阻塞IO

Netty模型selector模式

它相对普通NIO的在性能上有了提升,采用了:

  • NIO采用多线程的方式可以同时使用多个selector
  • 通过绑定多个端口的方式,使得一个selector可以同时注册多个ServerSocketServer
  • 单个线程下只能有一个selector,用来实现Channel的匹配及复用

半包问题

TCP/IP在发送消息的时候,可能会拆包,这就导致接收端无法知道什么时候收到的数据是一个完整的数据。在传统的BIO中在读取不到数据时会发生阻塞,但是NIO不会。为了解决NIO的半包问题,NettySelector模型的基础上,提出了reactor模式,从而解决客户端请求在服务端不完整的问题。

netty模型reactor模式

selector的基础上解决了半包问题。

上图,简单地可以描述为"boss接活,让work干":manReactor用来接收请求(会与客户端进行握手验证),而subReactor用来处理请求(不与客户端直接连接)。

SpringBoot使用Netty实现远程调用

maven依赖

<!--lombok-->
<dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 <version>1.18.2</version>
 <optional>true</optional>
</dependency>

<!--netty-->
<dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.1.17.Final</version>
</dependency>

服务端部分

NettyServer.java:服务启动监听器

@Slf4j
public class NettyServer {
  public void start() {
    InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
    //new 一个主线程组
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    //new 一个工作线程组
    EventLoopGroup workGroup = new NioEventLoopGroup(200);
    ServerBootstrap bootstrap = new ServerBootstrap()
        .group(bossGroup, workGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ServerChannelInitializer())
        .localAddress(socketAddress)
        //设置队列大小
        .option(ChannelOption.SO_BACKLOG, 1024)
        // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
        .childOption(ChannelOption.SO_KEEPALIVE, true);
    //绑定端口,开始接收进来的连接
    try {
      ChannelFuture future = bootstrap.bind(socketAddress).sync();
      log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
      future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
      log.error("服务器开启失败", e);
    } finally {
      //关闭主线程组
      bossGroup.shutdownGracefully();
      //关闭工作线程组
      workGroup.shutdownGracefully();
    }
  }
}

ServerChannelInitializer.java:netty服务初始化器

/**
* netty服务初始化器
**/
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
  @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    //添加编解码
    socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
    socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
    socketChannel.pipeline().addLast(new NettyServerHandler());
  }
}

NettyServerHandler.java:netty服务端处理器

/**
* netty服务端处理器
**/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
  /**
   * 客户端连接会触发
   */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    log.info("Channel active......");
  }

  /**
   * 客户端发消息会触发
   */
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.info("服务器收到消息: {}", msg.toString());
    ctx.write("你也好哦");
    ctx.flush();
  }

  /**
   * 发生异常触发
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

RpcServerApp.java:SpringBoot启动类

/**
* 启动类
*
*/
@Slf4j
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class RpcServerApp extends SpringBootServletInitializer {
  @Override
  protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
    return application.sources(RpcServerApp.class);
  }

  /**
   * 项目的启动方法
   *
   * @param args
   */
  public static void main(String[] args) {
    SpringApplication.run(RpcServerApp.class, args);
    //开启Netty服务
    NettyServer nettyServer =new NettyServer ();
    nettyServer.start();
    log.info("======服务已经启动========");
  }
}

客户端部分

NettyClientUtil.java:NettyClient工具类

/**
* Netty客户端
**/
@Slf4j
public class NettyClientUtil {

  public static ResponseResult helloNetty(String msg) {
    NettyClientHandler nettyClientHandler = new NettyClientHandler();
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap()
        .group(group)
        //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
        .option(ChannelOption.TCP_NODELAY, true)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast("decoder", new StringDecoder());
            socketChannel.pipeline().addLast("encoder", new StringEncoder());
            socketChannel.pipeline().addLast(nettyClientHandler);
          }
        });
    try {
      ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
      log.info("客户端发送成功....");
      //发送消息
      future.channel().writeAndFlush(msg);
      // 等待连接被关闭
      future.channel().closeFuture().sync();
      return nettyClientHandler.getResponseResult();
    } catch (Exception e) {
      log.error("客户端Netty失败", e);
      throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);
    } finally {
      //以一种优雅的方式进行线程退出
      group.shutdownGracefully();
    }
  }
}

NettyClientHandler.java:客户端处理器

/**
* 客户端处理器
**/
@Slf4j
@Setter
@Getter
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

  private ResponseResult responseResult;

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    log.info("客户端Active .....");
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.info("客户端收到消息: {}", msg.toString());
    this.responseResult = ResponseResult.success(msg.toString(), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc());
    ctx.close();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

验证

测试接口

@RestController
@Slf4j
public class UserController {

  @PostMapping("/helloNetty")
  @MethodLogPrint
  public ResponseResult helloNetty(@RequestParam String msg) {
    return NettyClientUtil.helloNetty(msg);
  }
}

访问测试接口

服务端打印信息

客户端打印信息

源码

项目源码可从的我的github中获取:github源码地址

到此这篇关于SpringBoot使用Netty实现远程调用的示例的文章就介绍到这了,更多相关SpringBoot Netty远程调用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot2.0 整合 Dubbo框架实现RPC服务远程调用方法

    一.Dubbo框架简介 1.框架依赖 图例说明: 1)图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互. 2)图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点. 3)图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用. 4)图中只包含 RPC

  • SpringBoot使用Netty实现远程调用的示例

    前言 众所周知我们在进行网络连接的时候,建立套接字连接是一个非常消耗性能的事情,特别是在分布式的情况下,用线程池去保持多个客户端连接,是一种非常消耗线程的行为.那么我们该通过什么技术去解决上述的问题呢,那么就不得不提一个网络连接的利器--Netty. 正文 Netty Netty是一个NIO客户端服务器框架: 它可快速轻松地开发网络应用程序,例如协议服务器和客户端. 它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器. NIO是一种非阻塞IO ,它具有以下的特点 单线程可以连接多个客户

  • 在SpringBoot中,如何使用Netty实现远程调用方法总结

    Netty Netty是一个NIO客户端服务器框架: 它可快速轻松地开发网络应用程序,例如协议服务器和客户端. 它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器. NIO是一种非阻塞IO ,它具有以下的特点 单线程可以连接多个客户端. 选择器可以实现单线程管理多个Channel,新建的通道都要向选择器注册. 一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系. selector进行select()操作可能会产生阻塞,但是可以设置阻塞时间,并且可

  • Springboot整合Netty实现RPC服务器的示例代码

    一.什么是RPC? RPC(Remote Procedure Call)远程过程调用,是一种进程间的通信方式,其可以做到像调用本地方法那样调用位于远程的计算机的服务.其实现的原理过程如下: 本地的进程通过接口进行本地方法调用. RPC客户端将调用的接口名.接口方法.方法参数等信息利用网络通信发送给RPC服务器. RPC服务器对请求进行解析,根据接口名.接口方法.方法参数等信息找到对应的方法实现,并进行本地方法调用,然后将方法调用结果响应给RPC客户端. 二.实现RPC需要解决那些问题? 1. 约

  • springBoot使用openfeign来远程调用的实现

    目录 使用openfeign来远程调用 springBoot使用openfeign 使用openfeign来远程调用 1.客户端调用方 导入依赖 org.springframework.cloud spring-cloud-starter-openfeign 2.启动类 @EnableFeignClients 3. 4. springBoot使用openfeign 1.首先需要把两个不同模块都加入到nacos注册中心中 2.引入openfeign 依赖 <dependency>     <

  • SpringBoot+WebSocket+Netty实现消息推送的示例代码

    上一篇文章讲了Netty的理论基础,这一篇讲一下Netty在项目中的应用场景之一:消息推送功能,可以满足给所有用户推送,也可以满足给指定某一个用户推送消息,创建的是SpringBoot项目,后台服务端使用Netty技术,前端页面使用WebSocket技术. 大概实现思路: 前端使用webSocket与服务端创建连接的时候,将用户ID传给服务端 服务端将用户ID与channel关联起来存储,同时将channel放入到channel组中 如果需要给所有用户发送消息,直接执行channel组的writ

  • 使用Spring Cloud Feign远程调用的方法示例

    在Spring Cloud Netflix栈中,各个微服务都是以HTTP接口的形式暴露自身服务的,因此在调用远程服务时就必须使用HTTP客户端.我们可以使用JDK原生的URLConnection.Apache的Http Client.Netty的异步HTTP Client, Spring的RestTemplate.但是,用起来最方便.最优雅的还是要属Feign了. Feign简介 Feign是一个声明式的Web服务客户端,使用Feign可使得Web服务客户端的写入更加方便. 它具有可插拔注释支持

  • SpringBoot整合Dubbo框架,实现RPC服务远程调用

    一.Dubbo框架简介 1.框架依赖 图例说明: 1)图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互. 2)图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点. 3)图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用. 4)图中只包含 RPC

  • SpringBoot整合Netty实现WebSocket的示例代码

    目录 一.pom.xml依赖配置 二.代码 2.1.NettyServer 类 2.2.SocketHandler 类 2.3.ChannelHandlerPool 类 2.4.Application启动类 三.测试 一.pom.xml依赖配置 <!-- netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <v

  • SpringBoot Http远程调用的方法

    本文实例为大家分享了SpringBoot Http远程调用的具体代码,供大家参考,具体内容如下 一.在实现远程调用时可以使用feign与http远程调用,两者的关系有一下几点: feign.http,有时候在调用第三方api的时候.使用httpclient,别人的接口不可能提供它的配置,自己项目框架是spring的,使用feign相互配置,都是okhttpclient的方式.Feign是一个接口声明式调用框架,实现了一个抽象层的逻辑,没有真正实现底层http请求,提供了一个client接口用于实

  • go语言net包rpc远程调用的使用示例

    rpc 包提供了一个方法来通过网络或者其他的I/O连接进入对象的外部方法. 一个server注册一个对象, 标记它成为可见对象类型名字的服务.注册后,对象的外部方法就可以远程调用了.一个server可以注册多个 不同类型的对象,但是却不可以注册多个相同类型的对象. 只有满足这些标准的方法才会被远程调用视为可见:其他的方法都会被忽略: - 方法是外部可见的. - 方法有两个参数,参数的类型都是外部可见的. - 方法的第二个参数是一个指针. - 方法有返回类型错误 一.基于http的RPC 服务端:

随机推荐