Nacos配置中心集群原理及源码分析

目录
  • Nacos集群工作原理
  • 配置变更同步入口
  • AsyncNotifyService
  • AsyncTask
  • 目标节点接收请求
  • NacosDelayTaskExecuteEngine
  • ProcessRunnable
  • processTasks
  • DumpProcessor.process

Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?

下面这个图,表示Nacos集群的部署图。

Nacos集群工作原理

Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。

Nacos的数据存储分为两部分

  • Mysql数据库存储,所有Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来解决,从而保证数据的可靠性。
  • 每个节点的本地磁盘,会保存一份全量数据,具体路径:/data/program/nacos-1/data/config-data/${GROUP}.

在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。

这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

当配置发生变更时:

  • Nacos会把变更的配置保存到数据库,然后再写入本地文件。
  • 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。

另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件

配置变更同步入口

当配置发生修改、删除、新增操作时,通过发布一个notifyConfigChange事件。

@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
   //省略..
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    }//省略
    return true;
}

AsyncNotifyService

配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。

@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表
                // 构建NotifySingleTask,并添加到队列中。
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) { //遍历集群中的每个节点
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                //异步执行任务 AsyncTask
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

AsyncTask

@Override
public void run() {
    executeAsyncInvoke();
}
private void executeAsyncInvoke() {
    while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空
        NotifySingleTask task = queue.poll(); //获取task
        String targetIp = task.getTargetIP(); //获取目标ip

        if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip
            // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
            //判断目标ip的健康状态
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
            if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。
                // target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                //构建header
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                //通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}

目标节点接收请求

数据同步的请求地址为,task.url=http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        //
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

dumpService.dump用来实现配置的更新,代码如下

当前任务会被添加到DumpTaskMgr中管理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

TaskManager.addTask, 先调用父类去完成任务添加。

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。

NacosDelayTaskExecuteEngine

TaskManager的父类是NacosDelayTaskExecuteEngine,

这个类中有一个成员属性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,专门来保存延期执行的任务类型AbstractDelayTask.

在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

ProcessRunnable

private class ProcessRunnable implements Runnable {
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}

processTasks

protected void processTasks() {
    //获取所有的任务
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        //获取任务处理器,这里返回的是DumpProcessor
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            //执行具体任务
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

DumpProcessor.process

读取数据库的最新数据,然后更新本地缓存和磁盘

以上就是Nacos配置中心集群原理及源码分析的详细内容,更多关于Nacos配置中心集群原理的资料请关注我们其它相关文章!

(0)

相关推荐

  • Nacos注册中心的部署与用法示例详解

    目录 一.什么是注册中心: 1.什么是注册中心: 2.注册中心的核心功能: 3.注册中心解决的问题: 4.服务的发现与注册的实现模式: 5.服务注册表: 二.主流服务注册中心的对比: 三.Nacos 注册中心的部署与使用: 1.Nacos 注册中心的搭建: 1.1.Windows 环境: 1.2.Linux 环境: 2.SpringBoot 整合 Nacos 进行服务注册发现: 2.1.创建服务提供者 cloud-producer-server: 2.2.创建服务消费者 cloud-consum

  • Nacos源码阅读方法

    为什么我会经常阅读源码呢,因为阅读源码能让你更加接近大佬,哈哈,这是我瞎扯的. 这篇文章将会带大家阅读Nacos源码 以及 教大家阅读源码的技巧,我们正式开始吧! 先给大家献上一张我梳理的高清源码图,方便大家对nacos的源码有一个整体上的认识. 有了这张图,我们就很容易去看nacos源码了. 如何找切入点 首先我们得要找一个切入点进入到nacos源码中,那么就从nacos依赖入手 <dependency> <groupId>com.alibaba.cloud</groupI

  • nacos配置注册中心时指定命名空间不起作用的问题

    目录 nacos配置注册中心时指定命名空间不起作用 因为命名空间不适用于nacos 0.8 nacos命名空间的使用 我们为什么要用命名空间呢 创建命名空间 最终读取 nacos配置注册中心时指定命名空间不起作用 因为命名空间不适用于nacos 0.8 Nacos客户端服务注册不划分命名空间,默认注册位于公共命名空间 下. 他被阿里设计的起初原因是:namespace主要是进行业务隔离的,多个业务或者部门可以用一个Nacos,但是数据都相互屏蔽. 因此并不是做服务和开发环境隔离的,目前没看到官方

  • nacos配置中心远程调用读取不到配置文件的解决

    目录 nacos配置中心远程调用读取不到配置文件 下面引用官方文档的一段内容 读取不到nacos配置中心内容的坑 这是出错的配置 这是正确的配置 nacos配置中心远程调用读取不到配置文件 1.由于公司要求,对nacos做了命名空间的配置.(这个开始我步入了迷途....) 为了更好的区分配置文件,我们创建了两个命名空间,dev和test,在学习的时候,我们并未创建这些命名空间,所以一直正常使用,创建命名空间后我们无法读取到配置文件. 那么添加命名空间后需要在配置文件中支持命名空间,类似你创建了g

  • Nginx实现Nacos反向代理的项目实践

    目录 1.win10安装Nginx 1.1 windows系统启动和停止的命令 2.win10安装nacos 2.1 搭建三台nacos步骤 1.win10安装Nginx nginx下载地址 nginx: download 下载后解压,进入bin目录,根据你的系统执行相应的命令 1.1 windows系统启动和停止的命令 启动 start nginx.exe 终止 nginx.exe -s stop //停止nginx nginx.exe -s reload //重新加载nginx nginx.

  • Nacos配置中心集群原理及源码分析

    目录 Nacos集群工作原理 配置变更同步入口 AsyncNotifyService AsyncTask 目标节点接收请求 NacosDelayTaskExecuteEngine ProcessRunnable processTasks DumpProcessor.process Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢? 下面这个图,表示Nacos集群的部署图. Nacos集群工作原理 Nacos作为配置中心的集群结构中,是一种无中心化节点的设计

  • Flink部署集群整体架构源码分析

    目录 概览 部署模式 Application mode 客户端提交请求 服务端启动&提交Application session mode Cluster架构 Cluster的启动流程 DispatcherResourceManagerComponent Runner代码 HA代码框架 总结 概览 本篇我们来了解Flink的部署模式和Flink集群的整体架构 部署模式 Flink支持如下三种运行模式 运行模式 描述 Application Mode Flink Cluster只执行提交的整个job

  • Nacos客户端配置中心缓存动态更新实现源码

    目录 客户端配置缓存更新 长轮训任务启动入口 ClientWorker checkConfigInfo LongPollingRunnable.run checkLocalConfig checkListenerMd5 检查服务端配置 checkUpdateDataIds checkUpdateConfigStr 客户端缓存配置长轮训机制总结 服务端配置更新的推送 doPollingConfig addLongPollingClient ClientLongPolling allSubs Lon

  • 深入理解框架背后的原理及源码分析

    目录 问题1 问题2 总结 近期团队中同学遇到几个问题,想在这儿跟大家分享一波,虽说不是很有难度,但是背后也折射出一些问题,值得思考. 开始之前先简单介绍一下我所在团队的技术栈,基于这个背景再展开后面将提到的几个问题,将会有更深刻的体会. 控制层基于SpringMvc,数据持久层基于JdbcTemplate自己封装了一套类MyBatis的Dao框架,视图层基于Velocity模板技术,其余组件基于SpringCloud全家桶. 问题1 某应用发布以后开始报数据库连接池不够用异常,日志如下: co

  • java编程Reference核心原理示例源码分析

    带着问题,看源码针对性会更强一点.印象会更深刻.并且效果也会更好.所以我先卖个关子,提两个问题(没准下次跳槽时就被问到). 我们可以用ByteBuffer的allocateDirect方法,申请一块堆外内存创建一个DirectByteBuffer对象,然后利用它去操作堆外内存.这些申请完的堆外内存,我们可以回收吗?可以的话是通过什么样的机制回收的? 大家应该都知道WeakHashMap可以用来实现内存相对敏感的本地缓存,为什么WeakHashMap合适这种业务场景,其内部实现会做什么特殊处理呢?

  • java并发容器CopyOnWriteArrayList实现原理及源码分析

    CopyOnWriteArrayList是Java并发包中提供的一个并发容器,它是个线程安全且读操作无锁的ArrayList,写操作则通过创建底层数组的新副本来实现,是一种读写分离的并发策略,我们也可以称这种容器为"写时复制器",Java并发包中类似的容器还有CopyOnWriteSet.本文会对CopyOnWriteArrayList的实现原理及源码进行分析. 实现原理 我们都知道,集合框架中的ArrayList是非线程安全的,Vector虽是线程安全的,但由于简单粗暴的锁同步机制,

  • SpringBoot2入门自动配置原理及源码分析

    目录 SpringBoot自动配置 一.@SpringBootApplication 1. @SpringBootConfiguration 2. @ComponentScan 3. @EnableAutoConfiguration 二.自动配置示例 1. 未生效的自动配置 2. 生效的自动配置 三.小结 SpringBoot自动配置 之前为什么会去了解一些底层注解,其实就是为了后续更好的了解 springboot 底层的一些原理,比如自动配置原理. 一.@SpringBootApplicati

  • 阿里面试Nacos配置中心交互模型是push还是pull原理解析

    目录 引言 配置中心 长轮询 Nacos初识 几个概念 客户端源码分析 结束语 引言 对于Nacos大家应该都不太陌生,出身阿里名声在外,能做动态服务发现.配置管理,非常好用的一个工具.然而这样的技术用的人越多面试被问的概率也就越大,如果只停留在使用层面,那面试可能要吃大亏. 比如我们今天要讨论的话题,Nacos在做配置中心的时候,配置数据的交互模式是服务端推过来还是客户端主动拉的? 这里我先抛出答案:客户端主动拉的! 接下来咱们扒一扒Nacos的源码,来看看它具体是如何实现的? 配置中心 聊N

  • Java并发编程之ReentrantLock实现原理及源码剖析

    目录 一.ReentrantLock简介 二.ReentrantLock使用 三.ReentrantLock源码分析 1.非公平锁源码分析 2.公平锁源码分析 前面<Java并发编程之JUC并发核心AQS同步队列原理剖析>介绍了AQS的同步等待队列的实现原理及源码分析,这节我们将介绍一下基于AQS实现的ReentranLock的应用.特性.实现原理及源码分析. 一.ReentrantLock简介 ReentrantLock位于Java的juc包里面,从JDK1.5开始出现,是基于AQS同步队列

  • 深入解析spring AOP原理及源码

    目录 @EnableAspectJAutoProxy 找切面 代理对象的创建 代理方法的执行 ExposeInvocationInterceptor#invoke 环绕通知的执行 前置通知的执行 后置通知的执行 返回后通知的执行 异常通知的执行 @EnableAspectJAutoProxy @EnableAspectJAutoProxy注解用于开启AOP功能,那么这个注解底层到底做了什么呢? 查看@EnableAspectJAutoProxy的源码,发现它使用@Import注解向Spring容

随机推荐