SpringCloud微服务续约实现源码分析详解
目录
- 一、前言
- 二、客户端续约
- 1、入口
- 构造初始化
- initScheduledTasks()调度执行心跳任务
- 2、TimedSupervisorTask组件
- 构造初始化
- TimedSupervisorTask#run()任务逻辑
- 3、心跳任务
- HeartbeatThread私有内部类
- 发送心跳
- 4、发送心跳到注册中心
- 构建请求数据发送心跳
- 三、服务端处理客户端续约
- 1、InstanceRegistry#renew()逻辑
- 2、PeerAwareInstanceRegistryImpl#renew()逻辑
- 3、AbstractInstanceRegistry#renew()逻辑
一、前言
微服务续约都有通用的设计,就是(微服务)客户端使用心跳机制向注册中心报告自己还活着(可以提供服务),它们的心跳机制略有不同。而Eureka Client客户端会每隔 30 秒发送一次心跳来续约,通过续约来告知 Eureka Server注册中心该 Eureka Client客户端正常运行,没有出现问题。那么心跳机制是什么呢、底层基于什么的?客户端发送心跳的代码在哪里?注册中心怎么处理的?
二、客户端续约
真正触发的还是SpringBoot的自动装配,这里不会过多赘述,下面直奔主题:
1、入口
构造初始化
private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor heartbeatExecutor; @Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { ......此处省略n行代码..... try { // default size of 2 - 1 each for heartbeat and cacheRefresh心跳和缓存刷新的默认大小分别为2-1 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 心跳执行者 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff ......此处省略n行代码..... // 最后,初始化调度任务(例如,集群解析器、 heartbeat、 instanceInfo replicator、 fetch initScheduledTasks(); ......此处省略n行代码..... }
主要逻辑:
scheduler和heartbeatExecutor都是DiscoveryClient的私有成员变量,并且是final的,故在构造方法中必须初始化。而DiscoveryClient的构造初始化前面也讲了,是在SpringBoot的自动装配过程调用的。构造方法中:
1)scheduler是交给jdk的Executors工具类创建的,核心线程数为2(心跳和缓存刷新需用到)。
2)直接调用ThreadPoolExecutor原生构造方法初始化,核心线程数为1,使用SynchronousQueue队列。
3)最后,初始化调度任务(例如,集群解析器、 心跳、 服务实例复制、 刷新),进入下面逻辑分析
initScheduledTasks()调度执行心跳任务
private void initScheduledTasks() { if (clientConfig.shouldRegisterWithEureka()) { /* LeaseInfo: public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30; // Client settings private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL; */ // 默认30 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer心跳任务 heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); // 默认的情况下会每隔30秒向注册中心 (eureka.instance.lease-renewal-interval-in-seconds)发送一次心跳来进行服务续约 scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator实例信息复制任务 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 状态变更监听者 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { // Saw local status change event StatusChangeEvent [timestamp=1668595102513, current=UP, previous=STARTING] logger.info("Saw local status change event {}", statusChangeEvent); instanceInfoReplicator.onDemandUpdate(); } }; // 初始化状态变更监听者 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } // 定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
主要逻辑:
- 根据配置决定是否发送心跳,默认会发送。在LeaseInfo租约信息中维护发送心跳时间,默认间隔为30秒,可以在yml配置文件(eureka.instance.lease-renewal-interval-in-seconds)中更改默认值。
- 初始化heartbeatTask,真正的心跳任务为HeartbeatThread类型(下面3分析)。
- 调度执行心跳任务,默认的情况下会每隔30秒向注册中心 (eureka.instance.lease-renewal-interval-in-seconds)发送一次心跳来进行服务续约
- 本方法下面的逻辑上一节已经分析
2、TimedSupervisorTask组件
可见TimedSupervisorTask是Runnable类型的任务,那么它的任务逻辑在run()方法。
构造初始化
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.name = name; this.scheduler = scheduler; // heartbeatExecutor或cacheRefreshExecutor this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); // HeartbeatThread或CacheRefreshThread等类型任务 this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. successCounter = Monitors.newCounter("success"); timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); }
初始化自己的一些字段,在阿里Java编程规范中是强烈建议给线程起别名的,这样便于监控排查问题等。name线程名字,在心跳任务中为heartbeat;scheduler字段传递进来是为了周期性执行任务;executor用于提交任务,下面分析;task任务,为HeartbeatThread或CacheRefreshThread类型任务;delay,用于存储以及计算延迟时间;最大延迟时间不能超过maxDelay。
TimedSupervisorTask#run()任务逻辑
@Override public void run() { // Future模式 Future<?> future = null; try { // 提交任务待执行 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); // 阻塞直到完成或超时 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // 更新以便计算延迟时间 delay.set(timeoutMillis); // 更新 threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { // 任务主管超时了 logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); // 任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间 long newDelay = Math.min(maxDelay, currentDelay * 2); // CAS更新延迟时间,考虑到多线程,所以用了CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { // 一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略 if (executor.isShutdown() || scheduler.isShutdown()) { // 线程池关闭,拒绝任务 logger.warn("task supervisor shutting down, reject the task", e); } else { // 线程池拒绝任务 logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { // 任务主管关闭,不能接受任务 logger.warn("task supervisor shutting down, can't accept the task"); } else { // 任务主管抛出了一个异常 logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { // 上面的异常catch了没有外跑,下面继续运行 if (future != null) { // 中断 future.cancel(true); } // 调度器没有关闭,延迟继续周期性执行任务。这样的周期性任务时间设置灵活 if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
主要逻辑:
- 提交任务待执行
- 阻塞直到完成或超时
- 更新以便计算延迟时间
异常处理:
- 超时异常,任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间。CAS更新延迟时间,考虑到多线程,所以用了CAS。
- 拒绝执行异常,一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略。
- 其他异常
上面的异常catch了没有外跑,下面继续运行:
1)丢弃当前任务;
2)调度器没有关闭,延迟继续周期性执行任务。这样的周期性任务时间设置灵活
3、心跳任务
HeartbeatThread私有内部类
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } }
HeartbeatThread实现了Runnable接口,也就是上面说的task,它的逻辑封装了出去:
发送心跳
boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { // 发送心跳 httpResponse = eurekaTransport.registrationClient .sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); // 打印日志,如:心跳状态 logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); // 响应状态没有找到重新注册 if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { // 无法发送心跳! logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
主要逻辑:
- EurekaTransport是DiscoevryClient的内部类,封装了几个与注册中心通信的XXXclient。获取通信类发送心跳请求,传入appName、唯一ID以及instanceInfo。
- 响应状态没有找到重新注册,毕竟当前客户端是运行中正常状态
- 返回是否发送心跳成功
4、发送心跳到注册中心
构建请求数据发送心跳
@Override public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { // 拼接请求URL String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { // 构建请求资源 WebResource webResource = jerseyClient.resource(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); // 请求注册中心 response = requestBuilder.put(ClientResponse.class); EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response)); if (response.hasEntity() && !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class)); } // 返回构建结果 return eurekaResponseBuilder.build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP PUT {}{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { // 关闭资源 response.close(); } } }
主要逻辑:
- 拼接请求URL
- 构建请求资源数据
- 请求注册中心
- 返回构建结果
- 关闭资源
三、服务端处理客户端续约
InstanceRegistry父子关系图上一节也分析了,在使用super关键字时注意一下。
1、InstanceRegistry#renew()逻辑
@Override public boolean renew(final String appName, final String serverId, boolean isReplication) { log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication); List<Application> applications = getSortedApplications(); for (Application input : applications) { if (input.getName().equals(appName)) { InstanceInfo instance = null; for (InstanceInfo info : input.getInstances()) { if (info.getId().equals(serverId)) { instance = info; break; } } // 发布服务实例处理心跳事件 publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication)); break; } } // 调用父类的处理心跳方法 return super.renew(appName, serverId, isReplication); }
这里逻辑主要是委托父类处理心跳,具体逻辑见下面分析:
2、PeerAwareInstanceRegistryImpl#renew()逻辑
public boolean renew(final String appName, final String id, final boolean isReplication) { if (super.renew(appName, id, isReplication)) { // 服务续约成功,将所有的Eureka操作复制到对等的Eureka节点,除了复制到此节点的流量。 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); return true; } return false; }
PeerAwareInstanceRegistryImpl职责:处理将所有操作复制到 AbstractInstanceRegistry的peer Eureka节点,以保持所有操作同步。这里如果服务续约成功,将所有的Eureka操作复制到对等的Eureka节点,除了复制到此节点的流量。
3、AbstractInstanceRegistry#renew()逻辑
public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); // 根据appName从本地注册表服务实例信息 Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { // 没有找到租约 RENEW_NOT_FOUND.increment(isReplication); // 注册: 租约不存在,注册资源: logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { // 获取服务实例信息 InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { // 实例状态{}与实例{}的重写实例状态{}不同。因此将状态设置为覆盖状态 logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), overriddenInstanceStatus.name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); // 续约 leaseToRenew.renew(); return true; } }
主要逻辑:
- 根据appName从本地注册表服务实例信息
- 没有找到租约,返回false
- 获取到的服务实例状态为UNKNOWN,返回false;续约,更新续约字段,下面分析
Lease#renew()逻辑
public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; }
lastUpdateTimestamp是Lease租约的字段,维护租约时间,在服务剔除下线会根据该字段判断是否过期需要对服务剔除下线处理。下一篇我们就来探讨一下,敬请期待!!!
到此这篇关于SpringCloud微服务续约实现源码分析详解的文章就介绍到这了,更多相关SpringCloud微服务续约内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!