dubbo服务注册到nacos的过程剖析

目录
  • 前言
  • 简述过程
  • 源码剖析具体实现
  • 服务注册
  • 服务订阅
  • 结语

前言

前面聊到到了我们的dubbo服务从redis迁移到nacos注册中心,迁移后发现,会时不时的抛一个异常 ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:, 所以有了这个剖析过程,当然最后查明异常是我们的SLB网络映射问题,和nacos没有关系。

  • dubbo版本:2.7.4.1
  • nacos client版本:1.0.0
  • nacos server版本:1.1.3

简述过程

  • dubbo侧:dubbo通过nacos注册中心实现,注册服务到nacos,同时添加心跳任务,心跳任务每隔5s发送一次服务健康心跳。同时每隔1s查询nacos服务列表是否有更新,如果有更新触发服务实例更新通知,更新dubbo本地服务列表
  • nacos侧:nacos接收到心跳后,如果此时服务实例不存在,则新建一个服务实例,如果此时服务实例不健康,则设置为健康状态,并主动推送状态到客户端。nacos内部有一个检查服务状态的任务,如果15s没有健康心跳上报,则设置服务实例不健康,如果30s没有健康心跳上报,则下线这个服务实例,并推送状态到客户端。

源码剖析具体实现

在dubbo的registry包下,针对服务注册行为定义了四个接口,所有的服务注册(zookeeper、nacos、redis、etcd等)支持都是这些接口的实现

  • NotifyListener:服务变更通知监听的接口定义,在实现注册中心时不需关心实现,对接具体监听器往下传递这个实例就好
  • RegistryService:服务注册、取消注册、定义、取消订阅、服务查找的接口定义,是最核心的一个接口,包含了注册中心实现的核心功能
  • Registry:对RegistryService、Node的包装,多了检测服务是否可用,服务销毁下线的方法,一般直接实现Registry接口
  • RegistryFactory:通过注册中心URL获取注册中心实现的接口定义,dubbo的spi设计,针对每个具体实现,映射了一个注册中心协议头,如nacos实现对应了nacos:// 新对接一个注册中心,并不需要直接实现Registry接口,可直接继承FailbackRegistry抽象类,实现相关的do方法即可。dubbo针对服务注册的抽象和nacos服务注册的抽象非常契合,大部分接口可以直接对接使用,只有服务订阅监听器的定义不一样,稍微包装转换下即可,所以实现起来就非常简单了。

服务注册

org.apache.dubbo.registry.nacos.NacosRegistry:152

@Override
    public void doRegister(URL url) {
        final String serviceName = getServiceName(url);
        final Instance instance = createInstance(url);
        execute(namingService -> namingService.registerInstance(serviceName, instance));
    }

dubbo中,所以的服务都被封装成了URL,对应nacos中的服务实例Instance,所以服务注册时,只需要简单的将URL转换成Instance就可以注册到nacos中,下面看看namingService中的具体注册行为。

com.alibaba.nacos.client.naming.NacosNamingService:283

@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

如上代码,除了注册实例外,还判断了instance实例是否是临时实例,如果是临时实例,则加入了beatReactor的心跳列表。这是因为,nacos将服务分成了两类,一类是临时性的服务, 像dubbo、spring cloud这种,需要通过心跳来保活,如果心跳没有及时发送,服务端会自动下线这个instance。一类是永久性服务,如数据库、缓存服务等, 客户端不会也没法发送心跳,这类服务就由服务端通过TCP端口检测等方式反向探活。下面看看临时实例的心跳是怎么发送的。

com.alibaba.nacos.client.naming.NacosNamingService:104

private int initClientBeatThreadCount(Properties properties) {
        if (properties == null) {
            return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
        }
        return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
            UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }
    //可通过配置dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10设置维护心跳的线程数

先看一段获取心跳beatReactor线程池线程数量的初始化代码,传入的Properties是配置dubbo注册中心时的参数列表,如果配置了namingClientBeatThreadCount,则取配置的值, 默认维护心跳的线程池大小为:如果是单核的,就是一个线程,多核的就CPU核心数一半的线程。继续心跳逻辑

com.alibaba.nacos.client.naming.beat.BeatReactor:78

class BeatProcessor implements Runnable {
        @Override
        public void run() {
            try {
                for (Map.Entry entry : dom2Beat.entrySet()) {
                    BeatInfo beatInfo = entry.getValue();
                    if (beatInfo.isScheduled()) {
                        continue;
                    }
                    beatInfo.setScheduled(true);
                    executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
            } finally {
                executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
            }
        }
    }
    class BeatTask implements Runnable {
        BeatInfo beatInfo;
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }
        @Override
        public void run() {
            long result = serverProxy.sendBeat(beatInfo);
            beatInfo.setScheduled(false);
            if (result > 0) {
                clientBeatInterval = result;
            }
        }
    }

dom2Beat是一个存放需要心跳上报的临时实例的map容器,NacosNamingService.registerInstance中通过判断临时节点添加到心跳列表的逻辑, 最终添加到了这个map里。BeatReactor初始化后会触发BeatProcessor线程的调用,BeatProcessor线程是一个不断自我触发调用的线程,前一次 心跳上报逻辑执行完后,间隔5S触发下一次心跳上报。间隔时间由变量clientBeatInterval控制,受nacos服务端返回的心跳结果值的影响 心跳间隔可能会改变,nacos服务端从instance的元数据中寻找key为preserved.heart.beat.interval的值返回,如果为空则返回5S。 这个功能在dubbo2.7.4.1的版本里还不成熟,只能通过注解元素指定,如@Reference(parameters = "preserved.heart.beat.interval,10000"), 后面如果能够直接在注册中心的url参数配置就算成熟了,所以这个功能暂时不推荐使用,可以作为实验功能试试。

服务订阅

org.apache.dubbo.registry.nacos.NacosRegistry:399

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
            throws NacosException {
        if (!nacosListeners.containsKey(serviceName)) {
            EventListener eventListener = event -> {
                if (event instanceof NamingEvent) {
                    NamingEvent e = (NamingEvent) event;
                    notifySubscriber(url, listener, e.getInstances());
                }
            };
            namingService.subscribe(serviceName, eventListener);
            nacosListeners.put(serviceName, eventListener);
        }
    }

nacos的服务监听是EventListener,所以dubbo的服务订阅只需要将NotifyListener的处理包装进onEvent中处理即可, 通过namingService.subscribe添加nacos的订阅。最终EventListener对象会被添加到事件调度器的监听器列表中,见如下代码:

com.alibaba.nacos.client.naming.core.EventDispatcher:

public class EventDispatcher {
    private ExecutorService executor = null;
    private BlockingQueuechangedServices = new LinkedBlockingQueue();
    private ConcurrentMap observerMap = new ConcurrentHashMap();
    public EventDispatcher() {
        executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                thread.setDaemon(true);
                return thread;
            }
        });
        executor.execute(new Notifier());
    }
    public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
        NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
        Listobservers = Collections.synchronizedList(new ArrayList());
        observers.add(listener);
        observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
        if (observers != null) {
            observers.add(listener);
        }
        serviceChanged(serviceInfo);
    }
    public void removeListener(String serviceName, String clusters, EventListener listener) {
        NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
        Listobservers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
        if (observers != null) {
            Iteratoriter = observers.iterator();
            while (iter.hasNext()) {
                EventListener oldListener = iter.next();
                if (oldListener.equals(listener)) {
                    iter.remove();
                }
            }
            if (observers.isEmpty()) {
                observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
            }
        }
    }
    public ListgetSubscribeServices() {
        ListserviceInfos = new ArrayList();
        for (String key : observerMap.keySet()) {
            serviceInfos.add(ServiceInfo.fromKey(key));
        }
        return serviceInfos;
    }
    public void serviceChanged(ServiceInfo serviceInfo) {
        if (serviceInfo == null) {
            return;
        }
        changedServices.add(serviceInfo);
    }
    private class Notifier implements Runnable {
        @Override
        public void run() {
            while (true) {
                ServiceInfo serviceInfo = null;
                try {
                    serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                } catch (Exception ignore) {
                }
                if (serviceInfo == null) {
                    continue;
                }
                try {
                    Listlisteners = observerMap.get(serviceInfo.getKey());
                    if (!CollectionUtils.isEmpty(listeners)) {
                        for (EventListener listener : listeners) {
                            Listhosts = Collections.unmodifiableList(serviceInfo.getHosts());
                            listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
                        }
                    }
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] notify error for service: "
                        + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
                }
            }
        }
    }
    public void setExecutor(ExecutorService executor) {
        ExecutorService oldExecutor = this.executor;
        this.executor = executor;
        oldExecutor.shutdown();
    }
}

EventDispatcher中维护了一个监听器列表observerMap,同时维护了一个事件变更的阻塞队列changedServices,监听调度器初始化后,会触发一个线程消费阻塞队列的 数据,当注册服务发生变化时,将变更数据入队,就能唤醒线程更新dubbo内存中的服务列表了。上面已经聊到,nacos client会以1s的频次拉取注册的实例,当拉取到的实例和本地内存的 有出入时,就会触发入队操作,如:

com.alibaba.nacos.client.naming.core.HostReactor:296

public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private String clusters;
        private String serviceName;
        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }
        @Override
        public void run() {
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                if (serviceObj == null) {
                    updateServiceNow(serviceName, clusters);
                    executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                    return;
                }
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateServiceNow(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }
                executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
                lastRefTime = serviceObj.getLastRefTime();
            } catch (Throwable e) {
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            }
        }
    }

DEFAULT_DELAY值为1s,同时,nacos也会主动的推送数据变更事件,当遇到nacos主动推送时,serviceInfoMap中的serviceObj会被更新,那么下次 nacos client拉取的时间间隔会被设置成10S之后,具体的和本地列表比对的逻辑都在updateServiceNow方法内,这里就不展开讲述了。

结语

dubbo注册服务到nacos以及订阅服务是一个比较复杂的过程,在剖析的过程中,带着疑问去看源码会有事半功倍的效果,比如博主在看源码前, 首先是为了寻找nacos的心跳异常,然后对nacos如何实现事件监听比较好奇。然后层层剖析渐进明朗恍然大悟。当然在剖析dubbo注册服务到nacos时,也需要了解 nacos服务端的处理逻辑,nacos服务端非常核心的两个类ClientBeatCheckTask、ClientBeatProcessor,包含了心跳处理、健康检测和事件推送的逻辑, 有兴趣可以看看

以上就是dubbo服务注册到nacos的过程剖析的详细内容,更多关于dubbo服务注册到nacos的资料请关注我们其它相关文章!

(0)

相关推荐

  • springboot与dubbo的版本匹配问题

    目录 springboot与dubbo的版本匹配 项目里原始版本 升级到2.6.7后错误日志 springboot+dubbo版本对应关系 背景 对应关系 springboot与dubbo的版本匹配 官方链接 参考: 项目里原始版本 springboot:2.1.6.RELEASE dubbo-spring-boot-starter : 0.2.1.RELEASE dubbo: 2.6.5 上面这么写是可以正常运行的.但是当把dubbo升级到2.6.7时候,却报错了,异常如下: 升级到2.6.7

  • Springcloud-nacos实现配置和注册中心的方法

    最近,阿里开源的nacos比较火,可以和springcloud和dubbo共用,对dubbo升级到springcloud非常的方便.这里学习一下他的配置和注册中心.我主要记录一下它的使用方式和踩得坑. nacos简单介绍 Nacos 致力于帮助您发现.配置和管理微服务.Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现.服务配置.服务元数据及流量管理. Nacos 帮助您更敏捷和容易地构建.交付和管理微服务平台. Nacos 是构建以"服务"为中心的现代应用架构 (例如

  • dubbo的配置文件详解(推荐)

    一.dubbo常用配置 <dubbo:service/> 服务配置,用于暴露一个服务,定义服务的元信息,一个服务可以用多个协议暴露,一个服务也可以注册到多个注册中心. eg.<dubbo:service ref="demoService" interface="com.unj.dubbotest.provider.DemoService" /> <dubbo:reference/> 引用服务配置,用于创建一个远程服务代理,一个引用

  • spring cloud alibaba Nacos 注册中心搭建过程详解

    这篇文章主要介绍了spring cloud alibaba Nacos 注册中心搭建过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 nacos下载地址 什么是 Nacos? nacos主要起到俩个作用一个是注册中心,另外一个是配置中心. 下面图 是nacos的功能结构图 运行环境 JDK 1.8+: Maven 3.2.x+: 下载 你可以通过源码和发行包两种方式来获取 Nacos. nacos发行包下载地址 选择版本解压 unzip

  • 升级dubbo2.7.4.1版本平滑迁移到注册中心nacos

    目录 前言 为什么升级到2.7.4.1? 为什么迁移注册中心到nacos? 两种升级方案 方案一:魔改官方的starter组件 注解兼容 配置兼容 方案二:直接使用官方的starter组件-最终采用的方案 第一步:引入maven依赖 第二步:改造相关的注解 第三步:修改dubbo的配置 平滑迁移到nacos注册中心 结语 前言 dubbo是一款非常优秀的服务治理型RPC框架,dubbo的优秀在于,庞大的架构体系.精湛的模块设计.灵活的SPI设计.丰富的组件实现,博主做微服务技术选型考察dubbo

  • dubbo服务注册到nacos的过程剖析

    目录 前言 简述过程 源码剖析具体实现 服务注册 服务订阅 结语 前言 前面聊到到了我们的dubbo服务从redis迁移到nacos注册中心,迁移后发现,会时不时的抛一个异常 ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:, 所以有了这个剖析过程,当然最后查明异常是我们的SLB网络映射问题,和nacos没有关系. dubbo版本:2.7.4.1 nacos client版本:1.0.0 naco

  • SpringCloud 服务注册和消费实现过程

    系统架构 在没有微服务之前有已经有跨服务调用了,比如ServiceB去调用ServiceA中的服务 , 传统模式可以直接在ServiceB中写ServiceA中的服务但是这样是写死了的,不够灵活. 下图就是传统的调用 微服务下的跨系统调用应该是这样的: 此时服务的调用应该是分两个步骤的: ServiceB 去服务中心拿到ServiceA的地址,如果ServiceA是单机部署,那么这个地址就只有一个,如果ServiceA是集群是集群环境部署,那么发现的地址就是多个. 拿到了ServiceA的地址后

  • SpringCloud 服务注册中的nacos实现过程

    如下图,org.springframework.cloud.spring-cloud-commons包下定义了一系列接口,其中就包括serviceregistry的系列规范,并通过SPI机制去调用接口实现. 在该包的META-INF/spring.factories文件中,可以找到EnableAutoConfiguration.class为key的value中有AutoServiceRegistrationAutoConfiguration.class这一项. 来看看这个AutoServiceR

  • 聊聊SpringBoot使用Nacos进行服务注册发现与配置管理问题

    前提 最近由于业务发展,需要调研一套完善和主流的基础架构,进行中台化(微服务)的实施,考虑到技术栈切换到SOFAStack.既然整个体系都切换到蚂蚁金服的技术栈,那么自然考虑一些基础组件如服务注册发现.配置管理等都切换为阿里的技术栈.考虑到目前比较热的服务发现组件是Nacos,需要调研SpringBoot服务接入Nacos的可行性,为以后强制要求新服务使用SOFAStack + Nacos的技术栈进行服务开发打下基础. Nacos简介 Nacos英文全称Dynamic Naming and Co

  • Nacos框架服务注册实现流程

    目录 rpc远程调用 微服务服务治理 分布式注册中心 Nacos 介绍 安装 基本使用 rpc远程调用 可能存在的问题 超时的问题. 安全的问题. 服务与服务之间URL地址管理. 在我们的微服务架构通讯,服务之间依赖关系非常大,如果通过传统的方式管理我们服务的 url 地址的情况下,-一旦地址发生变化的情况下,还需要人工修改rpc远程调用地址. 微服务服务治理 在RPC远程调用过程中,服务与服务之间依赖关系非常大,服务Url地址管理非常复杂,所以这时候需要对我们服务的url实现治理,通过服务治理

  • java开发分布式服务框架Dubbo服务引用过程详解

    目录 大致流程 服务引用策略 服务引用的三种方式 服务引入流程解析 总结 大致流程 Provider将服务暴露出来并且注册到注册中心,而Consumer通过注册中心获取Provider的信息,之后将自己封装成一个调用类去与Provider进行交互. 首先需要将所有调用转化为Dubbo中我们熟悉的Invoker,再通过代理类去远程获取服务. 大致流程如下: 服务引用策略 服务的引用和服务的暴露原理相似,都是Spring自定义标签机制解析生成对应的Bean,在之前服务暴露使用到的Provider S

  • dubbo服务使用redis注册中心的系列异常解决

    目录 前言 1.不支持带密码,设置indexdb的reids 解决方法: 二,集群容错模式异常 三,jedis连接池连接的坑 四,服务超过8个应用启动卡死 文末结语 前言 dubbo支持zookeeper,reids,multicast等注册中心注册服务信息,使用redis作为注册中心时,因为reids作为注册中心使用并不广泛,早期reids由于定位内网访问,使用密码验证也不怎么重视,导致框架本身设计缺陷,会有很多坑,如1.没有考虑到带密码验证的redis,2.集群容错模式判断错误 3.不可以设

  • Spring Cloud Alibaba Nacos服务治理平台,服务注册、RestTemplate实现微服务之间访问负载均衡访问的问题

    目录 Nacos简介 ☘Spring Cloud 组件依赖版本 ☘Nacos部署 ☘访问Nacos平台 Nacos服务注册.微服务访问.负载均衡实现 nacos-consumer微服务创建 ☘nacos-provider微服务创建 测试 Nacos简介 Github:https://github.com/alibaba/nacos官网文档:https://nacos.io/zh-cn/docs/what-is-nacos.htmlNacos 提供了发现.配置和管理微服务能力,能快速实现动态服务发

  • SpringCloud Eureka 服务注册实现过程

    一.将服务注册到Eureka 一个SpringBoot应用如果要注册到Spring Cloud环境(Greenwich.SR3版本),步骤很简单: pom.xml中添加启动器:spring-cloud-starter-netflix-eureka-client: 增加配置:eureka.client.serviceUrl.defaultZone: http://localhost:8100/eureka/: 启动应用: 如果注册中心正常,此时就能在注册中心发现这个应用了,如下图红框所示: 按照s

  • springCloud服务注册Eureka实现过程图解

    介绍 Eureka 是Netfix开发的,一个基于Rest服务的,服务注册与发现的组件. 主要包括两个组件:Eureka Server和Eureka Client Eureka Server:注册中心,提供服务注册与发现 Eureka Client:java客户端(通常就是微服务中的客户端和服务端) 上图简要描述了Eureka的基本架构,由3个角色组成: 1.Eureka Server(注册中心,相当于中介) 2.Service Provider(服务提供方,相当于房东) 3.Service C

随机推荐