SpringCloud微服务剔除下线功能实现原理分析

目录
  • 一、前言
  • 二、微服务剔除下线源码解析
    • 1、EurekaBootStrap#contextInitialized()
      • 1.1、初始化注册中心上下文
      • 1.2、openForTraffic()逻辑
      • 1.3、postInit()执行任务
      • 1.4、剔除任务
    • 2、服务剔除下线
      • 2.1、AbstractInstanceRegistry#evict()逻辑
      • 2.1、判断是否过期
      • 2.2、从本地列表异常下线处理

一、前言

上一篇SpringCloud微服务续约源码解析已经分析了心跳机制是什么、底层实现、客户端发送心跳的主要代码、注册中心处理心跳的过程,这节跟它是紧密关联的。联系的枢纽就是lastUpdateTimestamp最后更新时间戳,它是Lease租约类的一个用volatile关键字修饰的对其他线程透明可见的字段。那么Eureka是如何使用该字段判断服务是否过期的?然后进行服务的剔除下线?需要借助什么机制?该机制是什么时候能触发的?带着这些问题,我们下面来探究一番:

二、微服务剔除下线源码解析

EurekaBootStrap是Eureka项目里面的,用于启动Eureka服务器的类:

Eureka 服务器使用类路径中eureka.server.props指定的EurekaServerConfig进行配置。Eureka客户端组件也是通过使用eureka.client.props指定的配置 EurekaInstanceConfig初始化的。如果服务器在AWS云中运行,则eureka服务器将其绑定到指定的弹性ip。

1、EurekaBootStrap#contextInitialized()

    @Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            initEurekaEnvironment();
            // 初始化注册中心上下文
            initEurekaServerContext();
            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

它这里也使用了事件机制,但是不是基于Spring的,感兴趣的可以去了解下。初始化注册中心上下文,即下面的处理逻辑:

1.1、初始化注册中心上下文

    protected void initEurekaServerContext() throws Exception {
        EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
        logger.info("Initializing the eureka client...");
        logger.info(eurekaServerConfig.getJsonCodecName());
        ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
        ApplicationInfoManager applicationInfoManager = null;
        if (eurekaClient == null) {
            EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                    ? new CloudInstanceConfig()
                    : new MyDataCenterInstanceConfig();
            applicationInfoManager = new ApplicationInfoManager(
                    instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
            EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
            eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
        } else {
            applicationInfoManager = eurekaClient.getApplicationInfoManager();
        }
        PeerAwareInstanceRegistry registry;
        if (isAws(applicationInfoManager.getInfo())) {
            registry = new AwsInstanceRegistry(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
            awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
            awsBinder.start();
        } else {
            registry = new PeerAwareInstanceRegistryImpl(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
        }
        PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                registry,
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                applicationInfoManager
        );
        serverContext = new DefaultEurekaServerContext(
                eurekaServerConfig,
                serverCodecs,
                registry,
                peerEurekaNodes,
                applicationInfoManager
        );
        EurekaServerContextHolder.initialize(serverContext);
        serverContext.initialize();
        logger.info("Initialized server context");
        // Copy registry from neighboring eureka node
        int registryCount = registry.syncUp();
        registry.openForTraffic(applicationInfoManager, registryCount);
        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

做一些初始化工作,重点关注registry.openForTraffic(applicationInfoManager, registryCount);的调用,进入下面处理逻辑:

1.2、openForTraffic()逻辑

    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        // 更新每30秒发生一次,一分钟应该是2倍。
        this.expectedNumberOfClientsSendingRenews = count;
        updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            primeAwsReplicas(applicationInfoManager);
        }
        // 更改服务实例状态为UP
        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // 调用父类初始化
        super.postInit();
    }

更改服务实例状态为UP,调用父类初始化。

1.3、postInit()执行任务

    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }

终于来到剔除任务了,前面说了什么,就是一些初始化的工作。它这里的执行器是Timer,跟Nacos不一样,区别的话感兴趣的就自行去搞个明白。我们进入下面的分析:

1.4、剔除任务

EvictionTask是TimerTask类型任务。

class EvictionTask extends TimerTask {
        private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
        @Override
        public void run() {
            try {
                long compensationTimeMs = getCompensationTimeMs();
                logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
                evict(compensationTimeMs);
            } catch (Throwable e) {
                logger.error("Could not run the evict task", e);
            }
        }
        /**
         * 计算一个补偿时间,该时间定义为自上一次迭代以来该任务的实际执行时间,与配置的执行时间量相比较。
         * 这对于时间变化(例如由于时钟偏差或 gc)导致实际的驱逐任务根据配置的周期在所需时间之后执行的情况
         * 非常有用。
         */
        long getCompensationTimeMs() {
            long currNanos = getCurrentTimeNano();
            long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
            if (lastNanos == 0l) {
                return 0l;
            }
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0l ? 0l : compensationTime;
        }
        long getCurrentTimeNano() {  // for testing
            return System.nanoTime();
        }
    }

主要逻辑:

计算一个补偿时间,该时间定义为自上一次迭代以来该任务的实际执行时间,与配置的执行时间量相比较。这对于时间变化(例如由于时钟偏差或 gc)导致实际的驱逐任务根据配置的周期在所需时间之后执行的情况非常有用。

调用evict(compensationTimeMs)剔除处理,下面分析:

2、服务剔除下线

2.1、AbstractInstanceRegistry#evict()逻辑

public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
        if (!isLeaseExpirationEnabled()) {
            // DS: 租约到期目前已禁用。
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        // 我们首先收集所有过期的物品,以随机的顺序驱逐它们。对于大型驱逐集,如果我们不这样做,
        // 我们可能会在自我保护启动之前删除整个应用程序。通过随机化,影响应该均匀地分布在所有应用程序中。
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    // 判断租约是否过期
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        // 添加到过期续租集合
                        expiredLeases.add(lease);
                    }
                }
            }
        }
        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        // 为了补偿 GC 暂停或漂移的本地时间,我们需要使用当前的注册表大小作为触发自我保存的基础。
        // 没有这个,我们就会清除整个注册表。
        // 获取注册表租约总数
        int registrySize = (int) getLocalRegistrySize();
        // 计算注册表租约的阈值 (总数乘以 续租百分比 默认85%),得出要续租的数量
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        // 理论要剔除的数量 = 总数-要续租的数量
        int evictionLimit = registrySize - registrySizeThreshold;
        // 实际剔除的数量 =  min(实际租期到期服务实例个数,理论剔除数量)
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        // 将要剔除数量大于0,把它们下线处理,从本地注册表移除掉以保证高可用
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // 选择一个随机的项目(Knuth 洗牌算法)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                // 注册表: {}/{}的租约已过期
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                // 服务下线
                internalCancel(appName, id, false);
            }
        }
    }

主要逻辑:

  • 判断租约到期是否禁用,如果禁用return。默认启用
  • 首先收集所有过期的租约,以随机的顺序剔除它们。对于大型剔除集,如果不这样做,可能会在自我保护启动之前删除整个应用程序。通过随机化,影响应该均匀地分布在所有应用程序中。判断租约是否过期,如果过期添加到过期租约集合,继续遍历到。
  • 为了补偿 GC 暂停或漂移的本地时间,需要使用当前的注册表大小作为触发自我保存的基础。没有这个,就会清除整个注册表。1)获取注册表租约总数;2)计算注册表租约的阈值 (总数乘以 续租百分比 默认85%),得出要续租的数量;3)理论要剔除的数量 = 总数-要续租的数量;4)实际剔除的数量 = min(实际租期到期服务实例个数,理论剔除数量);
  • 将要剔除数量大于0,把它们下线处理,从本地注册表移除掉以保证高可用:选择一个随机的项目(Knuth 洗牌算法),调用internalCancel(appName, id, false)下线处理。

2.1、判断是否过期

    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

如果是cancel()处理前面的值就大于0,一般是判断后面部分逻辑:如果当前系统时间戳小于后面的时间戳之和,则没有过期;否则大于就是过期了。

duration的值也可以通过配置文件更改,通过yml配置文件中eureka:instance:lease-expiration-duration-in-seconds:指定,不过必须大于eureka:instance:lease-renewal-interval-in-seconds默认值或指定值。设置duration太长可能意味着即使实例不存在,流量也可能被路由到该实例。将此值设置得太小可能意味着,由于临时网络故障,该实例可能会从流量中删除。因此duration的值要设置为至少高于eureka:instance:lease-renewal-interval-in-seconds中默认的或指定的值。

2.2、从本地列表异常下线处理

cancel(String,String,boolean)方法被PeerAwareInstanceRegistry重写了,因此每个取消请求都被复制到对等点。然而,对于在远程对等点中被视为有效取消的过期,这是不需要的,因此自我保存模式不会启用。

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        // 加锁
        read.lock();
        try {
            CANCEL.increment(isReplication);
            // 根据appName从本地注册表获取租约服务实例
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                // 根据唯一ID从本地移除服务实例,下线
                leaseToCancel = gMap.remove(id);
            }
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                // 下线失败,因为租约信息中不存在该服务实例
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                // 通过更新剔除时间取消租约。
                leaseToCancel.cancel();
                // 从租约获取服务实例
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                // 使特定应用程序的缓存失效
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            }
        } finally {
            // 释放锁
            read.unlock();
        }
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews.
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }
        return true;
    }

主要逻辑:

  • 获取锁后,根据appName从本地注册表获取租约服务实例
  • 根据唯一ID从本地移除服务实例,下线
  • 如果需下线租约信息为空,则下线失败,因为租约信息中不存在该服务实例,return假;否则可能通过更新剔除时间取消租约,从租约获取服务实例以便使特定应用程序的缓存失效
  • 释放锁

到此这篇关于SpringCloud微服务剔除下线功能实现原理分析的文章就介绍到这了,更多相关SpringCloud微服务剔除下线内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringCloud微服务续约实现源码分析详解

    目录 一.前言 二.客户端续约 1.入口 构造初始化 initScheduledTasks()调度执行心跳任务 2.TimedSupervisorTask组件 构造初始化 TimedSupervisorTask#run()任务逻辑 3.心跳任务 HeartbeatThread私有内部类 发送心跳 4.发送心跳到注册中心 构建请求数据发送心跳 三.服务端处理客户端续约 1.InstanceRegistry#renew()逻辑 2.PeerAwareInstanceRegistryImpl#rene

  • SpringCloud微服务中跨域配置的方法详解

    跨域,指的是浏览器不能执行其他网站的脚本.它是由浏览器的同源策略造成的,是浏览器对 javascript 施加的安全限制. 同源策略,指的是协议,域名,端口都要相同,其中有一个不同都会产生跨域. 跨域相关含义: Access-Control-Allow-Origin:服务器允许请求的源: Access-Control-Allow-Headers: 服务器允许使用的头: Access-Control-Allow-Methods: 真实请求允许的方法: Access-Control-Allow-Cr

  • SpringCloud超详细讲解微服务网关Zuul基础

    目录 一.Zuul的简介 1.Zuul是怎么工作的 2.Zuul能干嘛 二.Zuul的使用 1.配置Pom.xml 2.配置Application.yml 3.撰写启动类 4.效果图 三.学会SpringCloud的感触 一.Zuul的简介 1.Zuul是怎么工作的 Zull包含了对请求的路由(用来跳转的)和过滤两个最主要功能: 其中路由功能负责将外部请求转发到具体的微服务实例上,是实现外部访问统一入口的基础,而过滤器功能则负责对请求的处理过程进行干预,是实现请求校验,服务聚合等功能的基础.Zu

  • SpringCloud微服务的调用与远程调用测试示例

    目录 微服务相关简述 微服务架构中的重要角色 微服务架构 分布式远程调用 RESTFUL接口 资源 表现层 状态转化 RPC CAP原理 springCloud简述 服务注册与发现 负载均衡 熔断 链路追踪 API网关 测试模拟远程调用 创建父工程remote_call模块 创建子模块server_a 创建子模块server_b 微服务相关简述 微服务架构中的重要角色 服务调用者:可以暂时认为是与用户交互的角色(因为存在微服务之间的调用),可以根据该用户的类型将其赋予不同的服务调用权限,通过一次

  • SpringCloud微服务架构实战之微服务治理功能的实现

    微服务治理 Spring Cloud 工具套件为微服务治理提供了全面的技术支持.这些治理工具主要包括服务的注册与发现.负载均衡管理.动态路由.服务降级和故障转移.链路跟踪.服务监控等.微服务治理的主要功能组件如下: 注册管理服务组件Eureka,提供服务注册和发现的功能. 负载均衡服务组件Ribbon,提供负载均衡调度管理的功能. 边缘代理服务组件Zuul,提供网关服务和动态路由的功能. 断路器组件Hystrix,提供容错机制.服务降级.故障转移等功能. 聚合服务事件流组件Turbine,可用来

  • SpringCloud微服务基础简介

    一.什么是Spring Cloud? SpringCloud 对常见的分布式系统模式提供了简单易用的编程模型,帮助开发者构建弹性.可靠.协调的应用程序. SpringCloud 是在SpringBoot的基础上构建的,使开发者可以轻松入门并快速提高工作效率. SpringCloud 提供了一套微服务解决方案,包括服务注册与发现,配置中心,全链路监控,服务网关,负载均衡,熔断器等组件,除了基于NetFlix的开源组件做高度抽象封装之外,还有一些选型中立的开源组件. SpringCloud 为开发人

  • SpringCloud微服务之Hystrix组件实现服务熔断的方法

    一.熔断器简介 微服务架构特点就是多服务,多数据源,支撑系统应用.这样导致微服务之间存在依赖关系.如果其中一个服务故障,可能导致系统宕机,这就是所谓的雪崩效应. 1.服务熔断 微服务架构中某个微服务发生故障时,要快速切断服务,提示用户,后续请求,不调用该服务,直接返回,释放资源,这就是服务熔断. 熔断生效后,会在指定的时间后调用请求来测试依赖是否恢复,依赖的应用恢复后关闭熔断. 2.服务降级 服务器高并发下,压力剧增的时候,根据当业务情况以及流量,对一些服务和页面有策略的降级(可以理解为关闭不必

  • springcloud微服务之Eureka配置详解

    Eureka注册中心/服务发现框架 Eureka是Netflix开发的服务发现框架,本身是一个基于REST的服务,主要用于定位运行在AWS域中的中间层服务,以达到负载均衡和中间层服务故障转移的目的.SpringCloud将它集成在其子项目spring-cloud-netflix中,以实现SpringCloud的服务发现功能. Eureka包含两个组件:Eureka Server和Eureka Client. Eureka Server提供服务注册服务,各个节点启动后,会在Eureka Serve

  • SpringCloud 微服务最佳开发实践

    现在基于SpringCloud的微服务开发日益流行,网上各种开源项目层出不穷.我们在实际工作中可以参考开源项目实现很多开箱即用的功能,但是必须要遵守一定的约定和规范. 本文结合我们实际的开发中遇到的一些问题整理出了一份微服务开发的实践规范,欢迎各位大佬拍砖指点. Maven规范 1.所有项目必须要有一个统一的parent模块 所有微服务工程都依赖这个parent,parent用于管理依赖版本,maven仓库,jar版本的统一升级维护 在parent下层可以有 core,starter,rate-

  • SpringCloud 微服务数据权限控制的实现

    目录 一. 整体架构 二. 实现流程 三. 实现步骤 1. 注解实现 2. 注解使用 3. 实现AuthStoreSupplier 4. 实现AuthQuerySupplier 5. 开启数据权限 四. 综述 五.源代码 举个例子: 有一批业务员跟进全国的销售订单.他们被按城市进行划分,一个业务员跟进3个城市的订单,为了保护公司的业务数据不能被所有人都掌握,故每个业务员只能看到自己负责城市的订单数据.所以从系统来讲每个业务员都有访问销售订单的功能,然后再需要配置每个业务员负责的城市,以此对订单数

  • 教你创建springcloud微服务的基础子服务的超详细过程

    目录 一.创建父项目 1. 选择Spring Initializr 2. Type选择为Maven POM,Java Version选择为8 3. 勾选一些基本的依赖,如lombok和spring-boot-starter-web 4. 创建好了的父项目结构如下: 二.创建二级项目 1. 选择新建Module 2. 选择Maven 3. 填写Name 4. 修改openfeign pom.xml文件,并删除掉该项目下的src文件 5. 创建完成的项目结构如下 三.创建子项目feign-provi

  • SpringCloud微服务应用config配置中心详解

    目录 前言 一.传统应用配置痛点 二.Config 配置中心介绍 三.服务端Config Server搭建 1.pom依赖 2.application启动类配置 3.application.yml配置 4.test-dev.xml(客户端应读取的配置) 5.项目结构 四.客户端Config Client搭建 1.pom依赖 2.application启动类配置 3.bootstrap.yml配置 4.application.yml配置 5.测试controller 6.项目结构 五.动态刷新 六

  • SpringCloud微服务熔断器Hystrix使用详解

    目录 什么是Hystrix Hystrix实战 总结 什么是Hystrix 在日常生活用电中,如果我们的电路中正确地安置了保险丝,那么在电压异常升高时,保险丝就会熔断以便切断电流,从而起到保护电路安全运行的作用. 在货船中,为了防止漏水和火灾的扩散,一般会将货仓进行分割,避免了一个货仓出事导致整艘船沉没的悲剧,这就是舱壁保护机制. Hystrix提供的熔断器也类似,在调用某个服务提供者时,当一定时间内请求总数超过配置的阈值,且窗口期内错误率过高,那Hystrix就会对调用请求熔断,后续的请求直接

随机推荐