Nacos客户端配置中心缓存动态更新实现源码

目录
  • 客户端配置缓存更新
  • 长轮训任务启动入口
  • ClientWorker
  • checkConfigInfo
  • LongPollingRunnable.run
    • checkLocalConfig
    • checkListenerMd5
  • 检查服务端配置
    • checkUpdateDataIds
    • checkUpdateConfigStr
  • 客户端缓存配置长轮训机制总结
  • 服务端配置更新的推送
  • doPollingConfig
  • addLongPollingClient
  • ClientLongPolling
    • allSubs
    • LongPollingService
    • DataChangeTask
  • 原理总结

Nacos 作为配置中心,当应用程序去访问Nacos动态获取配置源之后,会缓存到本地内存以及磁盘中。
由于Nacos作为动态配置中心,意味着后续配置变更之后需要让所有相关的客户端感知,并更新本地内存!

那么这个功能是在哪里实现的呢? 以及它是采用什么样的方式来实现配置的更新的呢? 我们一起来探索一下源码的实现!

客户端配置缓存更新

当客户端拿到配置后,需要动态刷新,从而保证数据和服务器端是一致的,这个过程是如何实现的呢?在这一小节中我们来做一个详细分析。

Nacos采用长轮训机制来实现数据变更的同步,原理如下!

整体工作流程如下:

  • 客户端发起长轮训请求
  • 服务端收到请求以后,先比较服务端缓存中的数据是否相同,如果不通,则直接返回
  • 如果相同,则通过schedule延迟29.5s之后再执行比较
  • 为了保证当服务端在29.5s之内发生数据变化能够及时通知给客户端,服务端采用事件订阅的方式来监听服务端本地数据变化的事件,一旦收到事件,则触发DataChangeTask的通知,并且遍历allStubs队列中的ClientLongPolling,把结果写回到客户端,就完成了一次数据的推送
  • 如果 DataChangeTask 任务完成了数据的 “推送” 之后,ClientLongPolling 中的调度任务又开始执行了怎么办呢?
    很简单,只要在进行 “推送” 操作之前,先将原来等待执行的调度任务取消掉就可以了,这样就防止了推送操作写完响应数据之后,调度任务又去写响应数据,这时肯定会报错的。所以,在ClientLongPolling方法中,最开始的一个步骤就是删除订阅事件

长轮训任务启动入口

在NacosConfigService的构造方法中,当这个类被实例化以后,有做一些事情

  • 初始化一个HttpAgent,这里又用到了装饰起模式,实际工作的类是ServerHttpAgent, MetricsHttpAgent内部也是调用了ServerHttpAgent的方法,增加了监控统计的信息
  • ClientWorker, 客户端的一个工作类,agent作为参数传入到clientworker,可以基本猜测到里面会用到agent做一些远程相关的事情
public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties); //
    this.configFilterChainManager = new ConfigFilterChainManager(properties);
    //初始化网络通信组件
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start();
    //初始化ClientWorker
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}

ClientWorker

在上述初始化代码中,我们重点需要关注ClientWorker这个类,它的构造方法如下

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager; //初始化配置过滤管理器
    // Initialize the timeout parameter
    init(properties); //初始化配置
    //初始化一个定时调度的线程池,重写了threadfactory方法
    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
     //初始化一个定时调度的线程池,从里面的name名字来看,似乎和长轮训有关系。而这个长轮训应该是和nacos服务端的长轮训
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    //设置定时任务的执行频率,并且调用checkConfigInfo这个方法,猜测是定时去检测配置是否发生了变化
        //首次执行延迟时间为1毫秒、延迟时间为10毫秒
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}

可以看到 ClientWorker 除了将 HttpAgent 维持在自己内部,还创建了两个线程池:

第一个线程池是只拥有一个线程用来执行定时任务的 executor,executor 每隔 10ms 就会执行一次 checkConfigInfo() 方法,从方法名上可以知道是每 10 ms 检查一次配置信息。

第二个线程池是一个普通的线程池,从 ThreadFactory 的名称可以看到这个线程池是做长轮询的。

checkConfigInfo

ClientWorker构造初始化中,启动了一个定时任务去执行checkConfigInfo()方法,这个方法主要是定时检查本地配置和服务器上的配置的变更情况,这个方法定义如下.

public void checkConfigInfo() {
    // Dispatch tasks.
    int listenerSize = cacheMap.size(); //
    // Round up the longingTaskCount.
     // 向上取整为批数,监听的配置数量除以3000,得到一个整数,代表长轮训任务的数量
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
     //currentLongingTaskCount表示当前的长轮训任务数量,如果小于计算的结果,则可以继续创建
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

这个方法主要的目的是用来检查服务端的配置信息是否发生了变化。如果有变化,则触发listener通知

cacheMap: AtomicReference<Map<String, CacheData>> cacheMap 用来存储监听变更的缓存集合。key是根据dataID/group/tenant(租户) 拼接的值。Value是对应存储在nacos服务器上的配置文件的内容。

默认情况下,每个长轮训LongPullingRunnable任务默认处理3000个监听配置集。如果超过3000, 则需要启动多个LongPollingRunnable去执行。

currentLongingTaskCount保存已启动的LongPullingRunnable任务数

executorService就是在ClientWorker构造方法中初始化的线程池

LongPollingRunnable.run

LongPollingRunnable长轮训任务的实现逻辑,代码比较长,我们分段来分析。

第一部分主要有两个逻辑

  • 对任务按照批次分类
  • 检查当前批次的缓存和本地文件的数据是否一致,如果发生了变化,则触发监听。
class LongPollingRunnable implements Runnable {
    private final int taskId; //表示当前任务批次id
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // 遍历CacheMap,把CacheMap中和当前任务id相同的缓存,保存到cacheDatas
            // 通过checkLocalConfig方法
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) { //这里表示数据有变化,需要通知监听器
                            cacheData.checkListenerMd5(); //通知所有针对当前配置设置了监听的监听器
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }
           //省略部分
        } catch (Throwable e) {
            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); //出现异常,到下一次taskPenaltyTime后重新执行任务
        }
    }
}

checkLocalConfig

检查本地配置,这里面有三种情况

  • 如果isUseLocalConfigInfo为false,表示不使用本地配置,但是本地缓存路径的文件是存在的,于是把isUseLocalConfigInfo设置为true,并且更新cacheData的内容以及文件的更新时间
  • 如果isUseLocalConfigInfo为true,表示使用本地配置文件,但是本地缓存文件不存在,则设置为false,不通知监听器。
  • 如果isUseLocalConfigInfo为true,并且本地缓存文件也存在,但是缓存的的时间和文件的更新时间不一致,则更新cacheData中的内容,并且isUseLocalConfigInfo设置为true。
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
    // 没有 -> 有
    if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);

        LOGGER.warn(
                "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }
     // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
    // If use local config info, then it doesn't notify business listener and notify after getting from server.
    if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
        cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                dataId, group, tenant);
        return;
    }
     // 有变更
    if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
            .lastModified()) {
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        LOGGER.warn(
                "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
    }
}

checkListenerMd5

遍历用户自己添加的监听器,如果发现数据的md5值不同,则发送通知

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}

检查服务端配置

在LongPollingRunnable.run中,先通过本地配置的读取和检查来判断数据是否发生变化从而实现变化的通知

接着,当前的线程还需要去远程服务器上获得最新的数据,检查哪些数据发生了变化

  • 通过checkUpdateDataIds获取远程服务器上数据变更的dataid
  • 遍历这些变化的集合,然后调用getServerConfig从远程服务器获得对应的内容
  • 更新本地的cache,设置为服务器端返回的内容
  • 最后遍历cacheDatas,找到变化的数据进行通知
// check server config
//从服务端获取发生变化的数据的DataID列表,保存在List<String>集合中
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
//遍历发生了变更的配置项
for (String groupKey : changedGroupKeys) {
    String[] key = GroupKey.parseKey(groupKey);
    String dataId = key[0];
    String group = key[1];
    String tenant = null;
    if (key.length == 3) {
        tenant = key[2];
    }
    try {
        //逐项根据这些配置项获取配置信息
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
        //把配置信息保存到CacheData中
        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
        cache.setContent(response.getContent());
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        if (null != response.getConfigType()) {
            cache.setType(response.getConfigType());
        }
        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
    } catch (NacosException ioe) {
        String message = String
            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ioe);
    }
}
//再遍历CacheData这个集合,找到发生变化的数据进行通知
for (CacheData cacheData : cacheDatas) {
    if (!cacheData.isInitializing() || inInitializingCacheList
        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
        cacheData.checkListenerMd5();
        cacheData.setInitializing(false);
    }
}
inInitializingCacheList.clear();
 //继续传递当前线程进行轮询
executorService.execute(this);

checkUpdateDataIds

这个方法主要是向服务器端发起检查请求,判断自己本地的配置和服务端的配置是否一致。

  • 首先从cacheDatas集合中找到isUseLocalConfigInfo为false的缓存
  • 把需要检查的配置项,拼接成一个字符串,调用checkUpdateConfigStr进行验证
/**
 * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
 */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) { //把需要检查的配置项,拼接成一个字符串
        if (!cacheData.isUseLocalConfigInfo()) { //找到isUseLocalConfigInfo=false的缓存
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {//
                // cacheData 首次出现在cacheMap中&首次check更新
                inInitializingCacheList
                    .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

checkUpdateConfigStr

从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    //拼接参数和header
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout", "" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {//判断可能发生变更的字符串是否为空,如果是,则直接返回。
        return Collections.emptyList();
    }
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        // 设置readTimeoutMs,也就是本次请求等待响应的超时时间,默认是30s
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        //发起远程调用
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        if (result.ok()) { //如果响应成功
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData()); //解析并更新数据,返回的是确实发生了数据变更的字符串:tenant/group/dataid。
        } else {//如果响应失败
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

客户端缓存配置长轮训机制总结

整体实现的核心点就一下几个部分

对本地缓存的配置做任务拆分,每一个批次是3000条

针对每3000条创建一个线程去执行

先把每一个批次的缓存和本地磁盘文件中的数据进行比较,

  • 如果和本地配置不一致,则表示该缓存发生了更新,直接通知客户端监听
  • 如果本地缓存和磁盘数据一致,则需要发起远程请求检查配置变化

先以tenent/groupId/dataId拼接成字符串,发送到服务端进行检查,返回发生了变更的配置

客户端收到变更配置列表,再逐项遍历发送到服务端获取配置内容。

服务端配置更新的推送

分析完客户端之后,随着好奇心的驱使,服务端是如何处理客户端的请求的?那么同样,我们需要思考几个问题

  • 服务端是如何实现长轮训机制的
  • 客户端的超时时间为什么要设置30s

客户端发起的请求地址是:/v1/cs/configs/listener,于是找到这个接口进行查看,代码如下。

//# ConfigController.java
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    Map<String, String> clientMd5Map;
    try {
        //解析客户端传递过来的可能发生变化的配置项目,转化为Map集合(key=dataId,value=md5)
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    // 开始执行长轮训。
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

doPollingConfig

这个方法主要是用来做长轮训和短轮询的判断

  • 如果是长轮训,直接走addLongPollingClient方法
  • 如果是短轮询,直接比较服务端的数据,如果存在md5不一致,直接把数据返回。
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    // 判断当前请求是否支持长轮训。()
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    //如果是短轮询,走下面的请求,下面的请求就是把客户端传过来的数据和服务端的数据逐项进行比较,保存到changeGroups中。
    // Compatible with short polling logic.
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);

    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    // Before 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    Loggers.AUTH.info("new content:" + newResult);
    // Disable cache.
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}

addLongPollingClient

把客户端的请求,保存到长轮训的执行引擎中。

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {
    //获取客户端长轮训的超时时间
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    //不允许断开的标记
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    //应用名称
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    //
    String tag = req.getHeader("Vipserver-Tag");
    //延期时间,默认为500ms
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    // 提前500ms返回一个响应,避免客户端出现超时
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        //通过md5判断客户端请求过来的key是否有和服务器端有不一致的,如果有,则保存到changedGroups中。
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) { //如果发现有变更,则直接把请求返回给客户端
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { //如果noHangUpFlag为true,说明不需要挂起客户端,所以直接返回。
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    //获取请求端的ip
    String ip = RequestUtil.getRemoteIp(req);
    // Must be called by http thread, or send response.
    //把当前请求转化为一个异步请求(意味着此时tomcat线程被释放,也就是客户端的请求,需要通过asyncContext来手动触发返回,否则一直挂起)
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L); //设置异步请求超时时间,
    //执行长轮训请求
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

ClientLongPolling

接下来我们来分析一下,clientLongPolling到底做了什么操作。或者说我们可以先猜测一下应该会做什么事情

  • 这个任务要阻塞29.5s才能执行,因为立马执行没有任何意义,毕竟前面已经执行过一次了
  • 如果在29.5s+之内,数据发生变化,需要提前通知。需要有一种监控机制

基于这些猜想,我们可以看看它的实现过程

从代码粗粒度来看,它的实现似乎和我们的猜想一致,在run方法中,通过scheduler.schedule实现了一个定时任务,它的delay时间正好是前面计算的29.5s。在这个任务中,会通过MD5Util.compareMd5来进行计算

那另外一个,当数据发生变化以后,肯定不能等到29.5s之后才通知呀,那怎么办呢?我们发现有一个allSubs的东西,它似乎和发布订阅有关系。那是不是有可能当前的clientLongPolling订阅了数据变化的事件呢?

class ClientLongPolling implements Runnable {
    @Override
    public void run() {
        //构建一个异步任务,延后29.5s执行
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run() { //如果达到29.5s,说明这个期间没有做任何配置修改,则自动触发执行
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                    // Delete subsciber's relations.
                    allSubs.remove(ClientLongPolling.this); //移除订阅关系
                    if (isFixedPolling()) { //如果是固定间隔的长轮训
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        //比较变更的key
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {//如果大于0,表示有变更,直接响应
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null); //否则返回null
                        }
                    } else {
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        sendResponse(null);
                    }
                } catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                }
            }
        }, timeoutTime, TimeUnit.MILLISECONDS);
        allSubs.add(this);  //把当前线程添加到订阅事件队列中
    }
}

allSubs

allSubs是一个队列,队列里面放了ClientLongPolling这个对象。这个队列似乎和配置变更有某种关联关系。

那么这里必须要实现的是,当用户在nacos 控制台修改了配置之后,必须要从这个订阅关系中取出关注的客户端长连接,然后把变更的结果返回。于是我们去看LongPollingService的构造方法查找订阅关系

/**
 * 长轮询订阅关系
 */
final Queue<ClientLongPolling> allSubs;
allSubs.add(this);

LongPollingService

在LongPollingService的构造方法中,使用了一个NotifyCenter订阅了一个事件,其中不难发现,如果这个事件的实例是LocalDataChangeEvent,也就是服务端数据发生变更的时间,就会执行一个DataChangeTask的线程。

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    //注册LocalDataChangeEvent订阅事件
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) { //如果触发了LocalDataChangeEvent,则执行下面的代码
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
}

DataChangeTask

数据变更事件线程,代码如下

class DataChangeTask implements Runnable {
    @Override
    public void run() {
        try {
            ConfigCacheService.getContentBetaMd5(groupKey); //
            //遍历所有订阅事件表
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next(); //得到ClientLongPolling
                //判断当前的ClientLongPolling中,请求的key是否包含当前修改的groupKey
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // If published tag is not in the beta list, then it skipped.
                    if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) { //如果是beta方式且betaIps不包含当前客户端ip,直接返回
                        continue;
                    }
                    // If published tag is not in the tag list, then it skipped.
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {//如果配置了tag标签且不包含当前客户端的tag,直接返回
                        continue;
                    }
					//
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Delete subscribers' relationships. 移除当前客户端的订阅关系
                    LogUtil.CLIENT_LOG
                            .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                    RequestUtil
                                            .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                    "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                    clientSub.sendResponse(Arrays.asList(groupKey)); //响应客户端请求。
                }
            }
        } catch (Throwable t) {
            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
        }
    }
}

原理总结

以上就是Nacos客户端配置中心缓存动态更新实现源码的详细内容,更多关于Nacos客户端配置中心缓存动态更新的资料请关注我们其它相关文章!

(0)

相关推荐

  • Nacos源码阅读方法

    为什么我会经常阅读源码呢,因为阅读源码能让你更加接近大佬,哈哈,这是我瞎扯的. 这篇文章将会带大家阅读Nacos源码 以及 教大家阅读源码的技巧,我们正式开始吧! 先给大家献上一张我梳理的高清源码图,方便大家对nacos的源码有一个整体上的认识. 有了这张图,我们就很容易去看nacos源码了. 如何找切入点 首先我们得要找一个切入点进入到nacos源码中,那么就从nacos依赖入手 <dependency> <groupId>com.alibaba.cloud</groupI

  • Nacos配置中心集群原理及源码分析

    目录 Nacos集群工作原理 配置变更同步入口 AsyncNotifyService AsyncTask 目标节点接收请求 NacosDelayTaskExecuteEngine ProcessRunnable processTasks DumpProcessor.process Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢? 下面这个图,表示Nacos集群的部署图. Nacos集群工作原理 Nacos作为配置中心的集群结构中,是一种无中心化节点的设计

  • Docker安装Nacos服务的实现

    目录 拉取镜像 查看镜像 启动容器中的nacos 查看启动日志 修改Nacos的配置 拉取镜像 docker pull nacos/nacos-server 查看镜像 docker images 启动容器中的nacos docker run --env MODE=standalone --name mynacos -d -p 8848:8848 docker.io/nacos/nacos-server –name:设置容器的名称. -d:表示在后台运行容器. -p:指定端口映射.左边的8848是

  • nacos客户端一致性hash负载需求实现

    目录 1.首先创建NacosClient,监听对应的服务: 2.创建Nodelistener,主要处理构建hash环等逻辑: 3.负载均衡器: 思考: 最近接到一个需求,由于文件服务器上传文件后,不同节点之间共享文件需要延迟,上游上传文件后立刻去下载,如果负载到其他节点上可能会找不到文件,所以使用文件服务器接入nacos根据相同的trace_id路由到一个节点上,这样保证上传后立刻下载的请求都能路由到同一个节点上,研究了两天nacos原生的client发现并没有提供相关功能,于是便产生了一个想法

  • nacos配置注册中心时指定命名空间不起作用的问题

    目录 nacos配置注册中心时指定命名空间不起作用 因为命名空间不适用于nacos 0.8 nacos命名空间的使用 我们为什么要用命名空间呢 创建命名空间 最终读取 nacos配置注册中心时指定命名空间不起作用 因为命名空间不适用于nacos 0.8 Nacos客户端服务注册不划分命名空间,默认注册位于公共命名空间 下. 他被阿里设计的起初原因是:namespace主要是进行业务隔离的,多个业务或者部门可以用一个Nacos,但是数据都相互屏蔽. 因此并不是做服务和开发环境隔离的,目前没看到官方

  • Nginx实现Nacos反向代理的项目实践

    目录 1.win10安装Nginx 1.1 windows系统启动和停止的命令 2.win10安装nacos 2.1 搭建三台nacos步骤 1.win10安装Nginx nginx下载地址 nginx: download 下载后解压,进入bin目录,根据你的系统执行相应的命令 1.1 windows系统启动和停止的命令 启动 start nginx.exe 终止 nginx.exe -s stop //停止nginx nginx.exe -s reload //重新加载nginx nginx.

  • Nacos客户端配置中心缓存动态更新实现源码

    目录 客户端配置缓存更新 长轮训任务启动入口 ClientWorker checkConfigInfo LongPollingRunnable.run checkLocalConfig checkListenerMd5 检查服务端配置 checkUpdateDataIds checkUpdateConfigStr 客户端缓存配置长轮训机制总结 服务端配置更新的推送 doPollingConfig addLongPollingClient ClientLongPolling allSubs Lon

  • SpringCloud Nacos作为配置中心超详细讲解

    目录 前言 Nacos配置模型 配置介绍 实际演练 前言 在单体架构的时候我们可以将配置写在配置文件中,但有⼀个缺点就是每次修改配置都需要重启服务才能生效. 当应用程序实例比较少的时候还可以维护.如果转向微服务架构有成百上千个实例,每修改⼀次配置要将全部实例重启,不仅增加了系统的不稳定性,也提高了维护的成本. 那么如何能够做到服务不重启就可以修改配置?所有就产生了四个基础诉求: 需要支持动态修改配置 需要动态变更有多实时 变更快了之后如何管控控制变更风险,如灰度.回滚等 敏感配置如何做安全配置

  • SpringCloud安装Nacos完成配置中心

    目录 1. Nacos介绍 2. docker安装Nacos 2.1 docker-compose.yaml 2.2 启动后访问控制台 3.Springboot集成Nacos 3.1 pom依赖 3.2 yaml配置 3.3 测试配置动态化 3.4 测试日志 eureka注册中心 1. Nacos介绍 官网说明:Nacos 致力于帮助您发现.配置和管理微服务.Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现.服务配置.服务元数据及流量管理. 2. docker安装Nacos 基

  • linux下SVN配置实现项目目录自动更新以及源码安装的操作方法

    配置钩子文件自动更新 开发环境提交更新至服务器时会出现每次在服务器端项目目录下必须手动更新SVN up才可以访问最新更新,通过钩子文件配置 则可以实现自动更新 新建文件: vim /usr/local/svn/demo/hooks/post-commit 添加如下文字: #!/bin/sh export LANG=en_US.UTF-8 /usr/bin/svn update /var/www/html/demo --username dev1 --password 123456 再添加post

  • Mango Cache缓存管理库TinyLFU源码解析

    目录 介绍 整体架构 初始化流程 读流程 写流程 事件处理机制 主流程 write 清理工作 缓存管理 什么是LRU? 什么是SLRU? 什么是TinyLFU? mango Cache中的TinyLFU counter counter的初始化 counter的使用 lruCache slruCache filter TinyLFU的初始化 TinyLFU写入 TinyLFU访问 增加entry的访问次数 估计entry访问次数 总结 介绍 据官方所述,mango Cache是对Guava Cac

  • Postgres中UPDATE更新语句源码分析

    目录 PG中UPDATE源码分析 整体流程分析 解析部分——生成语法解析树UpdateStmt 解析部分——生成查询树Query 优化器——生成执行计划 执行器 事务 总结 PG中UPDATE源码分析 本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本. 整体流程分析 以update dtea set id = 1;这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程. SQL流程如下: parse

  • Spring Cloud Alibaba Nacos Config配置中心实现

    什么是 Nacos Config 在分布式系统中,由于服务数量巨多,为了方便服务 配置文件统一管理,实时更新,所以需要分布式配置中心组件. Spring Cloud Alibaba Nacos Config 是 Spring Cloud Config 的替代方案. Nacos Config 的存储配置功能为分布式系统中的外部化配置提供服务器端和客户端支持,可以在 Nacos 中集中管理 Spring Cloud 应用的外部属性配置. 引入依赖 在 pom.xml 中添加 spring-cloud

  • 浅谈Vuejs中nextTick()异步更新队列源码解析

    vue官网关于此解释说明如下: vue2.0里面的深入响应式原理的异步更新队列 官网说明如下: 只要观察到数据变化,Vue 将开启一个队列,并缓冲在同一事件循环中发生的所有数据改变.如果同一个 watcher 被多次触发,只会一次推入到队列中.这种在缓冲时去除重复数据对于避免不必要的计算和 DOM 操作上非常重要.然后,在下一个的事件循环"tick"中,Vue 刷新队列并执行实际(已去重的)工作.Vue 在内部尝试对异步队列使用原生的 Promise.then 和 MutationOb

  • CentOS 6.3 安装配置Apache2.2.6的方法(源码编译安装)

    安装说明 安装环境:CentOS-6.3 安装方式:源码编译安装 软件:httpd-2.2.6.tar.gz | pcre-8.32.tar.gz | apr-1.4.6.tar.gz | apr-util-1.5.1.tar.gz 下载地址:http://mirror.bjtu.edu.cn/apache/httpd/ http://apr.apache.org/download.cgi http://jaist.dl.sourceforge.net/project/pcre/pcre 安装位

随机推荐