nacos客户端一致性hash负载需求实现

目录
  • 1.首先创建NacosClient,监听对应的服务:
  • 2.创建Nodelistener,主要处理构建hash环等逻辑:
  • 3.负载均衡器:
  • 思考:

最近接到一个需求,由于文件服务器上传文件后,不同节点之间共享文件需要延迟,上游上传文件后立刻去下载,如果负载到其他节点上可能会找不到文件,所以使用文件服务器接入nacos根据相同的trace_id路由到一个节点上,这样保证上传后立刻下载的请求都能路由到同一个节点上,研究了两天nacos原生的client发现并没有提供相关功能,于是便产生了一个想法,手撸一个客户端负载。

要想同一个trace_id需要路由到相同的节点上,首先想到的方法就是利用hash算法,目前常用于分布式系统中负载的哈希算法分为两种:

1.普通hash取模

2.一致性hash

普通hash虽然开发起来快,看起来也满足需求,但是当集群扩容或者缩容的时候,就会造成trace_id的hash结果与之前不同,可能不会路由到一个节点上。

而一致性hash在扩容缩容时只会影响哈希环中顺时针方向的相邻的节点, 对其他节点无影响。但是缺点时数据的分布和节点的位置有关,因为这些节点不是均匀的分布在哈希环上的,不过根据选择适当的hash算法也可以避免这个缺点,让数据相对均匀的在hash环上分布。

网上关于一致性hash的讨论已经有很多了,在这里就放一张图便于大家理解。

其他的不说,先上代码

1.首先创建NacosClient,监听对应的服务:

public class NacosClient {
    //nacos监听器,处理初始化hash环等逻辑
    private Nodelistener nodelistener;
    //初始化nacosClient,并且监听服务
    public void init() {
        NamingService naming = null;
        try {
            System.out.println(System.getProperty("serveAddr"));
            naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
            //注册监听器,当集群节点变化的时候调用nodelistener处理节点信息
            naming.subscribe("test", event -> {
                if (event instanceof NamingEvent) {
                    nodelistener.handlerChange((NamingEvent)event);
                }
            });
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}

2.创建Nodelistener,主要处理构建hash环等逻辑:

public class Nodelistener {
    private List<Instance> servers;
    //利用treeMap构建hash环
    private volatile SortedMap<Long, Instance> sortedMap = new TreeMap<Long, Instance>();
    //虚拟节点
    private int virtualNodeCount = 100;
    public synchronized void handlerChange(NamingEvent event) {
            List<Instance> servers = new ArrayList<Instance>();
            event.getInstances().stream().filter(instance -> {
                return instance.isEnabled() && instance.isHealthy();
            }).forEach(instance -> servers.add(instance));
            this.onChange(servers);
    }
    //每次集群节点变化时,重新构建hash环
    public void onChange(List<Instance> servers) {
        //只有一个节点的时候这里暂不考虑,读者可以自行处理
        if(servers.size() != 1) {
            SortedMap<Long, Instance> newSortedMap = new TreeMap<Long, Instance>();
            for (int i = 0; i < servers.size(); i ++) {
                for (int j = 0; j < this.virtualNodeCount; j++) {
                    //计算虚拟节点的hash,这里用到的是MurMurHash,网上还有很多其他hash实现,
                    //有兴趣可以自行查阅,具体实现细节就不列出了
                    Long hash = HashUtil.getHash(servers.get(i).getIp() + ":" +
                                                                     servers.get(i).getPort() + j);
                    //把虚拟节点加入hash环
                    sortedMap.put(hash, servers.get(i));
                }
            }
            sortedMap = newSortedMap;
        }
        this.servers = servers;
    }
    //根据传入的key获取hash环上顺时针到hash环尾部部分所有节点
    public Instance getInstance(String str) {
        Long hash = HashUtil.getHash(str);
        SortedMap<Long, Instance> map = sortedMap.tailMap(hash);
        //这里证明刚好获取的是尾部,所以返回所有的节点,其实是获取第一个节点
        if (map.isEmpty()) {
            map = sortedMap;
        }
        return map.get(map.firstKey());
    }
}

其中,虚拟节点是一致性hash经常用到的,主要是用于解决hash倾斜问题,即节点数比较少时,数据落在hash环上会造成不均衡,下图即没有虚拟节点的情况:

有虚拟节点的情况,这样hash环就均匀分割,相应数据落入的区间也会平衡:

3.负载均衡器:

public class LoadBalance {
    private Nodelistener nodelistener;
    //只需要简单的从hash环中获取第一个节点
    public Instance doSelect(String key) {
        return nodelistener.getInstance(key);
    }
}

测试下结果:

    public void test2() {
        Map<String, Integer> map = new HashMap<String, Integer>();
        Random random = new Random();
        for (int i = 0; i < 10000; i ++) {
            String key = String.valueOf(random.nextLong());
            Instance instance = loadBalance.doSelect(key);
            if(!map.containsKey(instance.getIp())) {
                map.put(instance.getIp(), 0);
            }else {
                map.put(instance.getIp(), map.get(instance.getIp()) + 1);
            }
            System.out.println("test2 count :" + i);
            System.out.printf("select IP is :" + instance.getIp());
        }
        System.out.println(map.toString());
    }

此处为了方便就直接用随机数模拟trace_id,结果如下:

select IP is :127.0.0.0{127.0.0.4=2031, 127.0.0.3=2144, 127.0.0.2=1925, 127.0.0.1=1931, 127.0.0.0=1964}

可以看到10000次请求被均匀的分布到了4个节点上。

思考:

1. 本次我们使用到了treeMap构建hash环,那么treeMap构建的hash具体的查找效率如何呢?

treeMap是由红黑树构成的,其 containsKey(),get(),put(), remove() 方法时间复杂度均为O(logn),均是对数阶,已经算相当不错了。

2.在Nodelistener 中我们两个方法都使用了synchronized 这样会有什么影响?

首先因为treeMap是线程不安全的,所以我们都使用了方法级别的synchronized,所以两个方法不会同时执行,这样使用treeMap时,不会造成线程不安全问题,其次可以保证我们在获取hash环中节点的时候,treeMap不会因为节点变化而变化。但是这样处理的话就会产生一个问题,我们正在计算trace_id的路由节点时,机器不巧缩容了,treeMap还没进行更新,刚好路由到的节点时下线的机器,那么就会访问失败,笔者这里解决这个问题的思路是重试,如果失败获取下一个节点,此时文件服务器不同节点之间文件已经同步完毕,所以不同节点访问是没问题的。

    public void test(String key) throws InterruptedException {
        //这里可以设置重试次数
        for (;;) {
            Instance instance = loadBalance.doSelect(key);
            String addr = instance.getIp() + ":" + instance.getPort();
            //测试请求
            if(post(addr)) {
                //成功逻辑
                .....
                break;
            }else {
                //等待两秒,即可以使文件服务器不同节点之间同步文件,还可以等待更新本地hash环
                Thread.sleep(2000);
                //失败则选取下一个节点
                instance = loadBalance.doSelect(key);
                //此处可以增加重试次数逻辑和如果重试到hash环上最后一个节点则重新获取hash环第一个节点逻辑,
                // 在此就不做论述,读者可以自由发挥
                continue;
            }
        }
    }

那么我们考虑当我们计算trace_id路由时,正好扩容的情况,此时treeMap还没有进行更新,情况如下图,我们路由到的节点如果不是图中标记的受影响区域则不会有影响,如果是图中受影响的区域计算得出的路由是扩容前的也就是 127.0.0.2-1(真实节点是127.0.0.2),那么下次相同的trace_id则会路由到新节点,此时会出现同一个trace_id路由到的节点不一样的问题,笔者在此处也使用的重试机制。(其实这个地方可以使用缓存Key和节点的关系,扩容后关系改变之后再改变图中受影响的hash环,但是因为trace_id比较特殊,并不适合缓存所有,所以使用了重试机制)

3.每次探知到服务器节点变化的时候都需要重新构建hash环,这样操作会比较耗时,可以修改成每次节点变化只需要改变对应虚拟节点信息,更新本地hash环时间,可以将onChange方法改造下。

    public void onChange(List<Instance> newServers) {
        //单节点时这里暂不考虑
        if(servers.size() != 1 ) {
            //TODO ..
        }
        Map<String, Instance> oldAddrs =
               this.servers.stream()
                           .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance));
        Map<String, Instance> newAddrs =
               newServers.stream()
                           .collect(Collectors.toMap(Instance::toInetAddr, instance -> instance));
        //remove
        oldAddrs.forEach((key, value) -> {
            if (!newAddrs.containsKey(key)) {
                for(int j = 0; j < virtualNodeCount; j++) {
                    Long hash = HashUtil.getHash( value.toInetAddr() + "&&VM"+ j);
                    sortedMap.remove(hash);
                }
            }
        });
        //add
        newAddrs.forEach((key, value) -> {
            if (!oldAddrs.containsKey(key)) {
                for(int j = 0; j < virtualNodeCount; j++) {
                    Long hash = HashUtil.getHash(value.toInetAddr() + "&&VM" + j);
                    sortedMap.put(hash, value);
                }
            }
        });
        this.servers = newServers;
    }

以上就是nacos客户端一致性hash负载需求实现的详细内容,更多关于nacos客户端一致性hash负载的资料请关注我们其它相关文章!

(0)

相关推荐

  • nacos配置注册中心时指定命名空间不起作用的问题

    目录 nacos配置注册中心时指定命名空间不起作用 因为命名空间不适用于nacos 0.8 nacos命名空间的使用 我们为什么要用命名空间呢 创建命名空间 最终读取 nacos配置注册中心时指定命名空间不起作用 因为命名空间不适用于nacos 0.8 Nacos客户端服务注册不划分命名空间,默认注册位于公共命名空间 下. 他被阿里设计的起初原因是:namespace主要是进行业务隔离的,多个业务或者部门可以用一个Nacos,但是数据都相互屏蔽. 因此并不是做服务和开发环境隔离的,目前没看到官方

  • Nacos配置中心集群原理及源码分析

    目录 Nacos集群工作原理 配置变更同步入口 AsyncNotifyService AsyncTask 目标节点接收请求 NacosDelayTaskExecuteEngine ProcessRunnable processTasks DumpProcessor.process Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢? 下面这个图,表示Nacos集群的部署图. Nacos集群工作原理 Nacos作为配置中心的集群结构中,是一种无中心化节点的设计

  • Nacos源码阅读方法

    为什么我会经常阅读源码呢,因为阅读源码能让你更加接近大佬,哈哈,这是我瞎扯的. 这篇文章将会带大家阅读Nacos源码 以及 教大家阅读源码的技巧,我们正式开始吧! 先给大家献上一张我梳理的高清源码图,方便大家对nacos的源码有一个整体上的认识. 有了这张图,我们就很容易去看nacos源码了. 如何找切入点 首先我们得要找一个切入点进入到nacos源码中,那么就从nacos依赖入手 <dependency> <groupId>com.alibaba.cloud</groupI

  • Nginx实现Nacos反向代理的项目实践

    目录 1.win10安装Nginx 1.1 windows系统启动和停止的命令 2.win10安装nacos 2.1 搭建三台nacos步骤 1.win10安装Nginx nginx下载地址 nginx: download 下载后解压,进入bin目录,根据你的系统执行相应的命令 1.1 windows系统启动和停止的命令 启动 start nginx.exe 终止 nginx.exe -s stop //停止nginx nginx.exe -s reload //重新加载nginx nginx.

  • Nacos客户端配置中心缓存动态更新实现源码

    目录 客户端配置缓存更新 长轮训任务启动入口 ClientWorker checkConfigInfo LongPollingRunnable.run checkLocalConfig checkListenerMd5 检查服务端配置 checkUpdateDataIds checkUpdateConfigStr 客户端缓存配置长轮训机制总结 服务端配置更新的推送 doPollingConfig addLongPollingClient ClientLongPolling allSubs Lon

  • Docker安装Nacos服务的实现

    目录 拉取镜像 查看镜像 启动容器中的nacos 查看启动日志 修改Nacos的配置 拉取镜像 docker pull nacos/nacos-server 查看镜像 docker images 启动容器中的nacos docker run --env MODE=standalone --name mynacos -d -p 8848:8848 docker.io/nacos/nacos-server –name:设置容器的名称. -d:表示在后台运行容器. -p:指定端口映射.左边的8848是

  • nacos客户端一致性hash负载需求实现

    目录 1.首先创建NacosClient,监听对应的服务: 2.创建Nodelistener,主要处理构建hash环等逻辑: 3.负载均衡器: 思考: 最近接到一个需求,由于文件服务器上传文件后,不同节点之间共享文件需要延迟,上游上传文件后立刻去下载,如果负载到其他节点上可能会找不到文件,所以使用文件服务器接入nacos根据相同的trace_id路由到一个节点上,这样保证上传后立刻下载的请求都能路由到同一个节点上,研究了两天nacos原生的client发现并没有提供相关功能,于是便产生了一个想法

  • 基于一致性hash算法 C++语言的实现详解

    一致性hash算法实现有两个关键问题需要解决,一个是用于结点存储和查找的数据结构的选择,另一个是结点hash算法的选择. 首先来谈一下一致性hash算法中用于存储结点的数据结构.通过了解一致性hash的原理,我们知道结点可以想象为是存储在一个环形的数据结构上(如下图),结点A.B.C.D按hash值在环形分布上是有序的,也就是说结点可以按hash值存储在一个有序的队列里.如下图所示,当一个hash值为-2^20的请求点P查找路由结点时,一致性hash算法会按hash值的顺时针方向路由到第一个结点

  • PHP实现的一致性Hash算法详解【分布式算法】

    本文实例讲述了PHP实现的一致性Hash算法.分享给大家供大家参考,具体如下: 一致性哈希算法是分布式系统中常用的算法,为什么要用这个算法? 比如:一个分布式存储系统,要将数据存储到具体的节点(服务器)上, 在服务器数量不发生改变的情况下,如果采用普通的hash再对服务器总数量取模的方法(如key%服务器总数量),如果期间有服务器宕机了或者需要增加服务器,问题就出来了. 同一个key经过hash之后,再与服务器总数量取模的结果跟之前的结果会不一样,这就导致了之前保存数据的丢失.因此,引入了一致性

  • memcache一致性hash的php实现方法

    本文实例讲述了memcache一致性hash的php实现方法.分享给大家供大家参考.具体如下: 最近在看一些分布式方面的文章,所以就用php实现一致性hash来练练手,以前一般用的是最原始的hash取模做 分布式,当生产过程中添加或删除一台memcache都会造成数据的全部失效,一致性hash就是为了解决这个问题,把失效数据降到最低,相关资料可以 google一下! php实现效率有一定的缺失,如果要高效率,还是写扩展比较好 经测试,5个memcache,每个memcache生成100个虚拟节点

  • PHP实现的一致性HASH算法示例

    本文实例讲述了PHP实现的一致性HASH算法.分享给大家供大家参考,具体如下: <?php // +---------------------------------------------------------------------- // | Perfect Is Shit // +---------------------------------------------------------------------- // | PHP实现:一致性HASH算法 // +--------

  • PHP一致性hash分布式算法封装类定义与用法示例

    本文实例讲述了PHP一致性hash分布式算法封装类定义与用法.分享给大家供大家参考,具体如下: 一.无虚拟节点实现 <?php /** * 一致性hash分布式算法 * @param $key * @return int * 实现步骤 * 1.先将0~ 是32位最大带符号整数(0x7FFFFFFF) 想象成一个闭环 * 2.将服务器列表通过hash算法分布在 圆环之中 * 3.将key值也分布在圆环之中 * 4.通过_isSorted判断服务器是否需要进行倒序排序 排序后遍历服务器 找到最近的服

  • PHP实现的服务器一致性hash分布算法示例

    本文实例讲述了PHP实现的服务器一致性hash分布算法.分享给大家供大家参考,具体如下: <?php /** * 对服务器进行一致性hash分布算法 */ class HashRing { private $servers = array(); private $nodeList = array(); private $nodeHashList = array(); private $nodeTotalNum = 0; private $virtualNodeNum = 32; private

  • java实现一致性hash算法实例代码

    一致性hash算法java版本简单实现 package com.java4all.grouth.consistent; import java.util.LinkedList; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 一致性hash算法java简易实现 * @

  • 详解spring cloud中使用Ribbon实现客户端的软负载均衡

    开篇 本例是在springboot整合H2内存数据库,实现单元测试与数据库无关性和使用RestTemplate消费spring boot的Restful服务两个示例的基础上改造而来 在使用RestTemplate来消费spring boot的Restful服务示例中,我们提到,调用spring boot服务的时候,需要将服务的URL写死或者是写在配置文件中,但这两种方式,无论哪一种,一旦ip地址发生了变化,都需要改动程序,并重新部署服务,使用Ribbon的时候,可以有效的避免这个问题. 前言:

随机推荐