java客户端Etcd官方仓库jetcd中KeepAlive接口实现

目录
  • 前言
  • 背景问题
  • KeepAlive实现
  • 文末小结

前言

Etcd的Java客户端有很多开源实现,Jetcd是Etcd官方仓库的Java客户端,整体api接口设计实现和官方go客户端类似,简洁易用。其中,租期续约的接口提供了两个分别是keepAliveOnce和keepAlive。功能如其名,keepAliveOnce是单次续约的接口,如果要保持租约,需要手动触发这个接口,所以这个接口基本不用。而keepAlive是自动续约保活的接口。大多数场景下,使用keepAlive即可,但是针对不同的场景,我们还需要考虑几个问题,如租约ttl的设置,以及keepAlive异常时的处理。

Jetcd项目地址:https://github.com/etcd-io/jetcd

背景问题

我们有一个基于mysql的binlog订阅数据变更的应用,线上有非常重要的应用基于这个服务,因为存在单点故障,后面使用了jetcd的lock + keepAlive的机制实现了主备服务秒级切换的功能,具体参见高可用架构etcd选主故障主备秒级切换实现,系统上线运行后发现,binlog的服务经常切换发生主备切换,而实际情况是,binlog的服务非常稳定,在没有上线主备切换服务前,从来没有发生过线上binlog服务宕掉的情况。最后查明问题出在了租约TTL的设置上面。这里先抛出问题和定位,下面先看下Jetcd的keepAlive具体实现,然后在分析为什么导致这个问题。

KeepAlive实现

先看下keepAlive的用法

private long acquireActiveLease() throws InterruptedException, ExecutionException {
        long leaseId = leaseClient.grant(leaseTTL).get().getID();
        logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL);
        this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver() {
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
                logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL());
            }
            @Override
            public void onError(Throwable t) {
                logger.debug("LeaderSelector lease renewal Exception!", t.fillInStackTrace());
                cancelTask();
            }
            @Override
            public void onCompleted() {
                logger.info("LeaderSelector lease renewal completed! start canceling task.");
                cancelTask();
            }
        });
        return leaseId;
    }

租约实现都在LeaseImpl类里,通过EtcdClient拿到LeaseImpl实例后,首先通过grant方法设置ttl拿到租约的id,然后将租约作为入参调用keepAlive方法,第二个入参是一个观察者对象,内置了三个接口,分别是onNext:确定下一次租约续约时间后触发,onError:续约异常时触发,onCompleted:租约过期后触发。

keepAlive方法代码:

public synchronized CloseableClient keepAlive(long leaseId, StreamObserverobserver) {
    if (this.closed) {
      throw newClosedLeaseClientException();
    }
    KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId));
    keepAlive.addObserver(observer);

    if (!this.hasKeepAliveServiceStarted) {
      this.hasKeepAliveServiceStarted = true;
      this.start();
    }
    return new CloseableClient() {
      @Override
      public void close() {
        keepAlive.removeObserver(observer);
      }
    };
  }

LeaseImpl内部维护了一个以LeaseId为key,KeepAlive对象为value的map,KeepAlive的类中维护了一个StreamObserver集合,到期时间deadLine,下次续约时间nextKeepAlive和续约leaseId。第一次调用keepAlive方法时会触发start,启动续约的线程(sendKeepAliveExecutor())和检查是否过期的线程(deadLineExecutor())。

private void sendKeepAliveExecutor() {
    this.keepAliveResponseObserver = Observers.observer(
      response -> processKeepAliveResponse(response),
      error -> processOnError()
    );
    this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver);
    this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate(
        () -> {
            // send keep alive req to the leases whose next keep alive is before now.
            this.keepAlives.entrySet().stream()
                .filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis())
                .map(Entry::getKey)
                .map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build())
                .forEach(keepAliveRequestObserver::onNext);
        },
        0,
        500,
        TimeUnit.MILLISECONDS
    );
  }

sendKeepAliveExecutor方法是整个keepAlive功能实现的核心,这个方法在LeaseImpl实例里只会被触发一次,开启了一个时间间隔为500毫秒的的定时任务调度。每次从keepAlives中筛选出nextkeepAlive时间小于当前时间的KeepAlive对象,触发续约。

nextkeepAlive初始化值就是创建KeepAlive实例时的当前时间,然后在续约的响应流观察者实例中,执行了processKeepAliveResponse方法,在这个里面维护了KeepAlive对象的nextkeepAlive。

private synchronized void processKeepAliveResponse(io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) {
    if (this.closed) {
      return;
    }
    final long leaseID = leaseKeepAliveResponse.getID();
    final long ttl = leaseKeepAliveResponse.getTTL();
    final KeepAlive ka = this.keepAlives.get(leaseID);
    if (ka == null) {
      // return if the corresponding keep alive has closed.
      return;
    }
    if (ttl > 0) {
      long nextKeepAlive = System.currentTimeMillis() + ttl * 1000 / 3;
      ka.setNextKeepAlive(nextKeepAlive);
      ka.setDeadLine(System.currentTimeMillis() + ttl * 1000);
      ka.onNext(leaseKeepAliveResponse);
    } else {
      // lease expired; close all keep alive
      this.removeKeepAlive(leaseID);
      ka.onError(
          newEtcdException(
            ErrorCode.NOT_FOUND,
            "etcdserver: requested lease not found"
          )
      );
    }
  }

可以看到,在首次续约后的响应处理中,nextKeepAlive被设置为当前时间加上ttl的1/3时间后,也就是说如果我们设置一个key的过期时间为6s,那么在使用keepAlive时续期的时间间隔为,每2s执行续约一次。如果ttl小于零,说明key已经过期被删除了,就直接触发onError,传递了一个requested lease not found的异常对象。

文末小结

1、回到最上面binlog的主备频繁切换的问题,由于我们将ttl的时间设置的过小5s。只要client和etcd 服务失联5s以上,期间可能由于各种原因导致keepAlive没有正常续约上,就会触发主备切换。这个时候binlog服务本身是没有任何问题的,却要因为失去领导权,而选择自杀。后面将ttl调整到了20s后,主备切换就没有那么敏感了。

2、还有一个场景,在将etcd作为服务注册中心时,也会使用到keepAlive,即使设置了ttl为20s,还是有可能没有续约上,导致注册的服务过期被删了,这个时候,我们的服务进程还是健康的。这个场景下,需要在onError、onCompleted事件中重新获取租约以及添加新的KeepAlive。

以上就是java客户端Etcd官方仓库jetcd中KeepAlive接口实现的详细内容,更多关于jetcd中的KeepAlive实现的资料请关注我们其它相关文章!

(0)

相关推荐

  • vue缓存的keepalive页面刷新数据的方法

    用到这个的业务场景是这样的: a页面点击新建列表按钮进入到新建的页面b,填写b页面并点击b页面确认添加按钮,把这些数据带到a页面,填充到列表(数组),可以添加多条, 点击这条的时候进入到编辑页面,确认修改之后,回退到a页面,a页面需要更新这条数据 实现这个功能的时候,由于是路由页面之间的跳转,首先想到的方案有几个:1. 用sessionStorage本地存储:2. 用路由参数带过去:3. 用兄弟组件传值 由于是添加完之后如果按回退是需要退出整个页面,如果用路由跳转,会出现回退到编辑页面了,所以这

  • keepalived+nginx高可用实现方法示例

    1.keepalived介绍 keepalived最初是专为LVS负载均衡软件设计的,用来管理并监控LVS集群系统中各个服务节点的状态,后来又加入了实现高可用的VRRP功能.keepalived除了能够管理LVS软件外,还能支持其他服务的高可用解决方案. keepalived通过VRRP协议实现高可用功能的.VRRP(Virtual Router Redundancy Protocol)虚拟路由冗余协议.VRRP出现的目的就是为了解决静态路由单点故障问题,它能保证当个别节点宕机时,整个网络可以不

  • haproxy+keepalived实现高可用负载均衡(理论篇)

    HAProxy相比LVS的使用要简单很多,功能方面也很丰富.当 前,HAProxy支持两种主要的代理模式:"tcp"也即4层(大多用于邮件服务器.内部协议通信服务器等),和7层(HTTP).在4层模式 下,HAProxy仅在客户端和服务器之间转发双向流量.7层模式下,HAProxy会分析协议,并且能通过允许.拒绝.交换.增加.修改或者删除请求 (request)或者回应(response)里指定内容来控制协议,这种操作要基于特定规则. 我现在用HAProxy主要在于它有以下优点,这里我

  • java客户端Etcd官方仓库jetcd中KeepAlive接口实现

    目录 前言 背景问题 KeepAlive实现 文末小结 前言 Etcd的Java客户端有很多开源实现,Jetcd是Etcd官方仓库的Java客户端,整体api接口设计实现和官方go客户端类似,简洁易用.其中,租期续约的接口提供了两个分别是keepAliveOnce和keepAlive.功能如其名,keepAliveOnce是单次续约的接口,如果要保持租约,需要手动触发这个接口,所以这个接口基本不用.而keepAlive是自动续约保活的接口.大多数场景下,使用keepAlive即可,但是针对不同的

  • Java客户端利用Jedis操作redis缓存示例代码

    前言 Redis是一个开源的Key-Value数据缓存,和Memcached类似.Redis多种类型的value,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型). Jedis 是 Redis 官方首选的 Java 客户端开发包.下面就来给大家详细关于Java客户端利用Jedis操作redis缓存的相关内容,话不多说,直接来看示例代码吧. 示例代码: //连接redis ,redis的默认端口是6379 Jedis

  • 实例讲解分布式缓存软件Memcached的Java客户端使用

    Memcached介绍 下面就来介绍一下Memcached. 1.什么是Memcached Memcached是一个开源的高性能,分布式的内存对象缓存系统,通过键值队的形式来对数据进行存取,Memcached是简单而强大,它的简单设计促进快速部署,易于开发,解决了大数据缓存面临的许多问题. 官方网址是:http://memcached.org/,目前已经有很多知名的互联网应用使用到了Memcached,比如Wikipedia.Flickr.Youtube.Wordpress等等. 2.下载Win

  • Java 客户端操作 FastDFS 实现文件上传下载替换删除功能

    FastDFS 的作者余庆先生已经为我们开发好了 Java 对应的 SDK.这里需要解释一下:作者余庆并没有及时更新最新的 Java SDK 至 Maven 中央仓库,目前中央仓库最新版仍旧是 1.27 版.所以我们需要通过 Github:https://github.com/happyfish100/fastdfs-client-java 下载项目源码,再通过命令 mvn clean install 编译打包导入 Maven 仓库使用即可. 接下来我们通过 Java API 操作 FastDF

  • ZooKeeper官方文档之Java客户端开发案例翻译

    目录 一个简单的监听客户端 需求 程序设计 Executor类 DataMonitor类 完整代码清单 官网原文标题<ZooKeeper Java Example> 官网原文地址:http://zookeeper.apache.org/doc/current/javaExample.html#sc_completeSourceCode 针对本篇翻译文章,我还有一篇对应的笔记<ZooKeeper官方Java例子解读>,如果对官网文档理解有困难,可以结合我的笔记理解. 一个简单的监听客

  • java客户端登陆服务器用户名验证

    本文实例为大家分享了java客户端登陆服务器用户名验证的具体实现代码,供大家参考,具体内容如下 客户端通过键盘录入用户名,服务端对用户名进行验证.  如果用户名存在,服务端显示xxx已登录,客户端显示xxx,欢迎登陆. 如果用户名不存在,服务端显示xxx尝试登陆,客户端显示xxx,用户名不存在.  最多登陆三次,防止暴力登陆. import java.io.*; import java.net.*; /* *客户端 */ class client { public static void mai

  • Java解析word,获取文档中图片位置的方法

    前言(背景介绍): Apache POI是Apache基金会下一个开源的项目,用来处理office系列的文档,能够创建和解析word.excel.ppt格式的文档. 其中对word文档的处理有两个技术,分别是HWPF(.doc)和XWPF(.docx).如果你对这两个技术熟悉的话,就应该能明白使用java解析word文档的痛楚所在. 其中两个最大的问题在于: 第一是这两个类并没有统一的父类和接口(隔壁的XSSF和HSSF投过来鄙视的眼光),所以没法进行同一格式的接口式编程: 第二是官方API中并

  • Vue中keep-alive 实现后退不刷新并保持滚动位置

    什么是KeepAlive? 首先,我们要明确我们谈的是TCP的 KeepAlive 还是HTTP的 Keep-Alive.TCP的KeepAlive和HTTP的Keep-Alive是完全不同的概念,不能混为一谈.实际上HTTP的KeepAlive写法是Keep-Alive,跟TCP的KeepAlive写法上也有不同. TCP的keepalive是侧重在保持客户端和服务端的连接,一方会不定期发送心跳包给另一方,当一方端掉的时候,没有断掉的定时发送几次心跳包,如果间隔发送几次,对方都返回的是RST,

  • Java 冻结或解除冻结Excel中的行和列的方法

    当Excel表格中有大量数据时,为了方便浏览,我们可通过冻结窗口这一功能将某几行或某几列的数据冻结起来,这样在我们滚动窗口时,这几行或几列的数据就会被固定住,而不会随着其他单元格的移动而移动.总的来说,Excel冻结窗口可细分为三类:冻结行.冻结列及同时冻结行和列.本文将通过使用Java程序来演示如何冻结或解除冻结Excel中的行和列. 使用工具:Free Spire.XLS for Java(免费版) Jar文件获取及导入: 方法1:通过官网下载获取jar包.解压后将lib文件夹下的Spire

  • 使用cmd根据WSDL网址生成java客户端代码的实现

    windows下使用cmd命令提示符生成java webservice客户端代码,可以使用命令提示符直接生成客户端代码,直接导入到项目中,只需配置jdk即可,在jdk的bin文件夹下,按Shift并点击右键,选中"在此处打开命令窗口" 输入命令如下: wsimport -keep -p com.demo.client -d D:\\(存放的地址) http://XX/Account?wsdl(wsdl地址) 命令参数说明: -d:生成客户端执行类的class文件的存放目录(默认存放在C

随机推荐