深入浅析Netty 在 Dubbo 中是如何应用的

众所周知,国内知名框架 Dubbo 底层使用的是 Netty作为网络通信,那么内部到底是如何使用的呢?今天我们就来一探究竟。

1. dubbo 的 Consumer 消费者如何使用 Netty

注意:此次代码使用了从 github 上 clone 的 dubbo 源码中的 dubbo-demo 例子。

代码如下:

System.setProperty("java.net.preferIPv4Stack", "true");
 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
 context.start();
 // @1
 DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
 int a = 0;
 while (true) {
  try {
   Thread.sleep(1000);
   System.err.println( ++ a + " ");

   String hello = demoService.sayHello("world"); // call remote method
   System.out.println(hello); // get result

  } catch (Throwable throwable) {
   throwable.printStackTrace();
  }
 }

当代码执行到 @1 的时候,会调用 Spring 容器的 getBean 方法,而 dubbo 扩展了 FactoryBean,所以,会调用 getObject 方法,该方法会创建代理对象。

这个过程中会调用 DubboProtocol 实例的 getClients(URL url) 方法,当这个给定的 URL 的 client 没有初始化则创建,然后放入缓存,代码如下:

这个 initClient 方法就是创建 Netty 的 client 的。

最终调用的就是抽象父类 AbstractClient 的构造方法,构造方法中包含了创建 Socket 客户端,连接客户端等行为。

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
 doOpen();
 connect();
}

doOpent 方法用来创建 Netty 的 bootstrap :

protected void doOpen() throws Throwable {
 NettyHelper.setNettyLoggerFactory();
 bootstrap = new ClientBootstrap(channelFactory);
 bootstrap.setOption("keepAlive", true);
 bootstrap.setOption("tcpNoDelay", true);
 bootstrap.setOption("connectTimeoutMillis", getTimeout());
 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  public ChannelPipeline getPipeline() {
   NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
   ChannelPipeline pipeline = Channels.pipeline();
   pipeline.addLast("decoder", adapter.getDecoder());
   pipeline.addLast("encoder", adapter.getEncoder());
   pipeline.addLast("handler", nettyHandler);
   return pipeline;
  }
 });
}

connect 方法用来连接提供者:

protected void doConnect() throws Throwable {
 long start = System.currentTimeMillis();
 ChannelFuture future = bootstrap.connect(getConnectAddress());
 boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
 if (ret && future.isSuccess()) {
  Channel newChannel = future.getChannel();
  newChannel.setInterestOps(Channel.OP_READ_WRITE);
 }
}

上面的代码中,调用了 bootstrap 的 connect 方法,熟悉的 Netty 连接操作。当然这里使用的是  jboss 的 netty3,稍微有点区别。点击这篇:教你用 Netty 实现一个简单的 RPC。当连接成功后,注册写事件,准备开始向提供者传递数据。

当 main 方法中调用 demoService.sayHello(“world”) 的时候,最终会调用 HeaderExchangeChannel 的 request 方法,通过 channel 进行请求。

public ResponseFuture request(Object request, int timeout) throws RemotingException {
 Request req = new Request();
 req.setVersion("2.0.0");
 req.setTwoWay(true);
 req.setData(request);
 DefaultFuture future = new DefaultFuture(channel, req, timeout);
 channel.send(req);
 return future;
}

send 方法中最后调用 jboss  Netty 中继承了  NioSocketChannel 的 NioClientSocketChannel 的 write 方法。完成了一次数据的传输。

2. dubbo 的 Provider 提供者如何使用 Netty

Provider demo 代码:

System.setProperty("java.net.preferIPv4Stack", "true");
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
context.start();
System.in.read(); // press any key to exit

Provider 作为被访问方,肯定是一个 Server 模式的 Socket。如何启动的呢?

当 Spring 容器启动的时候,会调用一些扩展类的初始化方法,比如继承了 InitializingBean,ApplicationContextAware,ApplicationListener 。

而 dubbo 创建了 ServiceBean 继承了一个监听器。Spring 会调用他的 onApplicationEvent 方法,该类有一个 export 方法,用于打开 ServerSocket 。

然后执行了 DubboProtocol 的 createServer 方法,然后创建了一个NettyServer 对象。NettyServer 对象的 构造方法同样是  doOpen 方法和。

代码如下:

protected void doOpen() throws Throwable {
 NettyHelper.setNettyLoggerFactory();
 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
 ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
 bootstrap = new ServerBootstrap(channelFactory);

 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
 channels = nettyHandler.getChannels();
 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  public ChannelPipeline getPipeline() {
   NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
   ChannelPipeline pipeline = Channels.pipeline();
   pipeline.addLast("decoder", adapter.getDecoder());
   pipeline.addLast("encoder", adapter.getEncoder());
   pipeline.addLast("handler", nettyHandler);
   return pipeline;
  }
 });
 channel = bootstrap.bind(getBindAddress());
}

该方法中,看到了熟悉的 boss 线程,worker 线程,和 ServerBootstrap,在添加了编解码 handler  之后,添加一个 NettyHandler,最后调用 bind 方法,完成绑定端口的工作。和我们使用 Netty 是一摸一样。

3. 总结

可以看到,dubbo 使用 Netty 还是挺简单的,消费者使用 NettyClient,提供者使用 NettyServer,Provider  启动的时候,会开启端口监听,使用我们平时启动 Netty 一样的方式。

而 Client 在 Spring getBean 的时候,会创建 Client,当调用远程方法的时候,将数据通过 dubbo 协议编码发送到 NettyServer,然后 NettServer 收到数据后解码,并调用本地方法,并返回数据,完成一次完美的 RPC 调用。

到此这篇关于深入浅析Netty 在 Dubbo 中是如何应用的的文章就介绍到这了,更多相关Netty 在 Dubbo中应用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot2.0 整合 Dubbo框架实现RPC服务远程调用方法

    一.Dubbo框架简介 1.框架依赖 图例说明: 1)图中小方块 Protocol, Cluster, Proxy, Service, Container, Registry, Monitor 代表层或模块,蓝色的表示与业务有交互,绿色的表示只对 Dubbo 内部交互. 2)图中背景方块 Consumer, Provider, Registry, Monitor 代表部署逻辑拓扑节点. 3)图中蓝色虚线为初始化时调用,红色虚线为运行时异步调用,红色实线为运行时同步调用. 4)图中只包含 RPC

  • Spring Boot集成netty实现客户端服务端交互示例详解

    前言 Netty 是一个高性能的 NIO 网络框架,本文主要给大家介绍了关于SpringBoot集成netty实现客户端服务端交互的相关内容,下面来一起看看详细的介绍吧 看了好几天的netty实战,慢慢摸索,虽然还没有摸着很多门道,但今天还是把之前想加入到项目里的 一些想法实现了,算是有点信心了吧(讲真netty对初学者还真的不是很友好......) 首先,当然是在SpringBoot项目里添加netty的依赖了,注意不要用netty5的依赖,因为已经废弃了 <!--netty--> <

  • Java NIO框架Netty简单使用的示例

    之前写了一篇文章:Java 网络IO编程总结(BIO.NIO.AIO均含完整实例代码),介绍了如何使用Java原生IO支持进行网络编程,本文介绍一种更为简单的方式,即Java NIO框架. Netty是业界最流行的NIO框架之一,具有良好的健壮性.功能.性能.可定制性和可扩展性.同时,它提供的十分简单的API,大大简化了我们的网络编程. 同Java IO介绍的文章一样,本文所展示的例子,实现了一个相同的功能. 1.服务端 Server: package com.anxpp.io.calculat

  • SpringBoot+Dubbo+Seata分布式事务实战详解

    前言 Seata 是 阿里巴巴开源的分布式事务中间件,以高效并且对业务0侵入的方式,解决微服务场景下面临的分布式事务问题. 事实上,官方在GitHub已经给出了多种环境下的Seata应用示例项目,地址:https://github.com/seata/seata-samples. 为什么笔者要重新写一遍呢,主要原因有两点: 官网代码示例中,依赖太多,分不清哪些有什么作用 Seata相关资料较少,笔者在搭建的过程中,遇到了一些坑,记录一下 一.环境准备 本文涉及软件环境如下: SpringBoot

  • 深入浅析Netty 在 Dubbo 中是如何应用的

    众所周知,国内知名框架 Dubbo 底层使用的是 Netty作为网络通信,那么内部到底是如何使用的呢?今天我们就来一探究竟. 1. dubbo 的 Consumer 消费者如何使用 Netty 注意:此次代码使用了从 github 上 clone 的 dubbo 源码中的 dubbo-demo 例子. 代码如下: System.setProperty("java.net.preferIPv4Stack", "true"); ClassPathXmlApplicati

  • 基于dubbo中Listener的实现方法

    这里继续dubbo的源码旅程,在过程中学习它的设计和技巧,看优秀的代码,我想对我们日程编码必然有帮助的.而那些开源的代码正是千锤百炼的东西,希望和各位共勉. 拿ProtocolListenerWrapper为例子,看源码的时候发现它是一个装饰类的标准实现有一个自身的复制构造函数,把被包装者复制进来,然后结合装饰部分的操作.看下ProtocolListenerWrapper类有这样的代码: public class ProtocolListenerWrapper implements Protoc

  • 浅析jQuery 3.0中的Data

    jQuery 3.0 在6月9日正式发布了,3.0 也被称为下一代的 jQuery.这个版本从14年10月开始,其中发布过一次beta 版(2016/1/14,)和候选版(2016/05/20).一路走来,颇为不易. 一.Data浅析 jQuery 3.0 中的 Data 是内部使用的,定义为一个"类".一共用它创建了两个对象,dataPriv 和 dataUser.Data 有 1 个对象属性(expando)和类属性(uid),有 6 个方法,如下 下面分别解读 1.Data.ui

  • dubbo中zookeeper请求超时问题:mybatis+spring连接mysql8.0.15的配置

    这两天准备复习一下java,所以写一个采用dubbo的商场项目练练手,却卡第一个测试上,启动provider服务和Consumer服务,请求接口却始终报zookeeper请求超时错误(dubbo+zookeeper服务端重复调用三次),经过排查,我的问题是出在dao层与数据库连接的问题上(而且provider方还不报错,我也是R-此处省略一万字,其他都是正常的,如果你不是出在dao层连接数据库的问题,此文可能对你帮助不大).dao层采用mybatis+spring连接mysql数据库版本8.0.

  • 浅析python标准库中的glob

    glob 文件名模式匹配,不用遍历整个目录判断每个文件是不是符合. 1.通配符 星号(*)匹配零个或多个字符 import glob for name in glob.glob('dir/*'): print (name) dir/file.txt dir/file1.txt dir/file2.txt dir/filea.txt dir/fileb.txt dir/subdir 列出子目录中的文件,必须在模式中包括子目录名: import glob #用子目录查询文件 print ('Name

  • 浅析VSCode tasks.json中的各种替换变量的意思 ${workspaceFolder} ${file} ${fileBasename} ${fileDirname}等

    When authoring tasks configurations, it is often useful to have a set of predefined common variables. VS Code supports variable substitution inside strings in the tasks.json file and has the following predefined variables: ${workspaceFolder} the path

  • 浅析Java 并发编程中的synchronized

    synchronized关键字,我们一般称之为"同步锁",用它来修饰需要同步的方法和需要同步代码块,默认是当前对象作为锁的对象.在用synchronized修饰类时(或者修饰静态方法),默认是当前类的Class对象作为锁的对象,故存在着方法锁.对象锁.类锁这样的概念. 一.没有设置线程同步的情况 先给出以下代码感受下代码执行的时候为什么需要同步?代码可能比较枯燥,配上业务理解起来就会舒服很多,学生军训,有三列,每列5人,需要报数,每个线程负责每一列报数. class Synchroni

  • 浅析Alibaba Nacos注册中心源码剖析

    Nacos&Ribbon&Feign核心微服务架构图 架构原理 微服务系统在启动时将自己注册到服务注册中心,同时外发布 Http 接口供其它系统调用(一般都是基于Spring MVC) 服务消费者基于 Feign 调用服务提供者对外发布的接口,先对调用的本地接口加上注解@FeignClient,Feign会针对加了该注解的接口生成动态代理,服务消费者针对 Feign 生成的动态代理去调用方法时,会在底层生成Http协议格式的请求,类似 /stock/deduct?productId=100

  • 深入浅析JSON在java中的使用

    一.javaBean和json的互转 JavaBean类 public class Person { private Integer id; private String name; public Person() { } public Person(Integer id, String name) { this.id = id; this.name = name; } public Integer getId() { return id; } public void setId(Integer

  • 详解SPI在Dubbo中的应用

    目录 一.概述 二.JDK自带SPI 2.1.代码示例 2.2.简单分析 三.SPI与双亲委派 3.1.SPI加载到何处 3.2.SPI是否破坏了双亲委派 四.Dubbo SPI 4.1.基本概念 4.2.代码示例 4.3.源码分析 五.总结 一.概述 SPI 全称为 Service Provider Interface,是一种模块间组件相互引用的机制.其方案通常是提供方将接口实现类的全名配置在classPath下的指定文件中,由调用方读取并加载.这样需要替换某个组件时,只需要引入新的JAR包并

随机推荐