ZooKeeper开发实际应用案例实战

目录
  • 项目背景介绍
  • 面临问题
  • 如何解决
  • 代码讲解
    • 数据服务器
    • 检索服务器
  • 总结
  • 附:完整代码
    • 数据服务端代码
    • 检索服务端代码

项目背景介绍

首先给大家介绍一下本文描述项目的情况。这是一个检索网站,它让你能在几千万份复杂文档数据中检索出你所需要的文档数据。为了加快检索速度,项目的数据分布在100台机器的内存里,我们称之为数据服务器。除了数据,这100台机器上均部署着检索程序。这些server之外,还有数台给前端提供接口的搜索server,这些机器属一个集群,我们称之为检索服务器。当搜索请求过来时,他们负责把搜索请求转发到那100台机器,待所有机器返回结果后进行合并,最终返回给前端页面。结构如下图:

面临问题

网站上线之初,由于数据只有几百万,所以数据服务器只有10多台。是一个规模比较小的分布式系统,当时没有做分布式系统的协调,也能正常工作,偶尔出问题,马上解决。但是到了近期,机器增长到100台,网站几乎每天都会出现问题,导致整个分布式系统挂掉。问题原因如下:

数据服务器之前没有做分布式协调。对于检索服务器来说,并不知道哪些数据服务器还存活,所以检索服务器每次检索,都会等待100台机器返回结果。但假如100台数据服务中某一台死掉了,检索服务器也会长时间等待他的返回。这导致了检索服务器积累了大量的请求,最终被压垮。当所有的检索服务器都被压垮时,那么网站也就彻底不可用了。

问题的本质为检索服务器维护的数据服务器列表是静态不变的,不能感知数据服务器的上下线。

在10台数据服务器的时候,某一台机器出问题的概率很小。但当增长到100台服务器时,出问题的概率变成了10倍。所以才会导致网站几乎每天都要死掉一次。

由于一台机器的问题,导致100台机器的分布式系统不可用,这是极其不合理,也是无法忍受的。

之前此项目的数据和检索不由我负责。了解到此问题的时候,我觉得这个问题得立刻解决,否则不但用户体验差,而且开发和运维也要每天疲于系统维护,浪费了大量资源,但由于还有很多新的需求在开发,原来的团队也没时间去处理。今年我有机会来解决这个问题,当时正好刚刚研究完zookeeper,立刻想到这正是采用zookeeper的典型场景。

如何解决

我直接说方案,程序分为数据服务器和检索服务器两部分。

数据服务器:

1、每台数据服务器启动时候以临时节点的形式把自己注册到zookeeper的某节点下,如/data_servers。这样当某数据服务器死掉时,session断开链接,该节点被删除。

检索服务器:

1、启动时,加载/data_servers下所有子节点数据,获取了目前所有能提供服务的数据服务器列表,并且加载到内存中。

2、启动时,同时监听/data_servers节点,当新的数据server上线或者某个server下线时,获得通知,然后重新加载/data_servers下所有子节点数据,刷新内存中数据服务器列表。

通过以上方案,做到数据服务器上下线时,检索服务器能够动态感知。检索服务器在检索前,从内存中取得的数据服务器列表将是最新的、可用的。即使在刷新时间差内取到了掉线的数据服务器也没关系,最多影响本次查询,而不会拖垮整个集群。见下图:

代码讲解

捋清思路后,其实代码就比较简单了。数据服务器只需要启动的时候写zookeeper临时节点就好了,同时写入自己服务器的相关信息,比如ip、port之类。检索无服务器端会稍微复杂点,不过此处场景和zookeeper官方给的例子十分符合,所以我直接参考官方例子进行修改,实现起来也很简单。关于官方例子我写过两篇博文,可以参考学习:

zookeeper官方例子翻译:ZooKeeper官方文档之Java客户端开发案例翻译

zookeeper官方例子解读:ZooKeeper官方文档之Java案例解读

数据服务器

数据服务器程序十分简单,只会做一件事情:启动的时候,把自己以临时节点的形式注册到zookeeper。一旦服务器挂掉,zookeeper自动删除临时znode。

我们创建ServiceRegister.java实现Runnable,数据服务启动的时候,单独线程运行此代码,实现注册到zookeeper逻辑。维系和zookeeper的链接。

检索服务器

检索服务器,代码设计完全采用官方案例,所以详细的代码解读请参考上面提到的两篇文章,这里只做下简述。

代码有两个类DataMonitor和LoadSaidsExecutor。LoadSaidsExecutor是启动入口,他来启动DataMonitor监控zookeeper节点变化。DataMonitor负责监控,初次启动和发现变化时,调用LoadSaidsExecutor的方法来加载最新的数据服务器列表信息。

DataMonitor和LoadSaidsExecutor的工作流程如下:

Excutor把自己注册为DataMonitor的监听

DataMonitor实现watcher接口,并监听znode

znode变化时,触发DataMonitor的监听

回调回调中通过ZooKeeper.exist() 再次监听znode

上一步exist的回调方法中,调用监听自己的Executor,执行业务逻辑6

Executor启新的线程加载数据服务器信息到内存中

注意:图为以前文章配图。图里应该把6,7步改为文字描述的第6步。

检索服务启动的时候,单独线程运行LoadSaIdsExecutor。LoadSaIdsExecutor会阻塞线程,转为事件驱动。

总结

我们通过一个例子,展示了zookeeper在实际系统中的应用,通过zookeeper解决了分布式系统的问题。其实以上代码还有很大的优化空间。我能想到如下两点:

1、数据服务器会假死或者变慢,但和zk链接还在,并不会从zk中删除,但已经拖慢了集群的速度。解决此问题,我们可以在数据服务器中加入定时任务,通过定时跑真实业务查询,监控服务器状态,一旦达到设定的红线阈值,强制下线,而不是等到server彻底死掉。

2、检索服务器每个server都监控zookeeper同一个节点,在节点变化时会出现羊群效应。当然,检索服务器如果数量不多还好。其实检索服务器应该通过zookeeper做一个leader选举,只由leader去监控zookeeper节点变化,更新redis中的数据服务器列表缓存即可。

附:完整代码

数据服务端代码

ServiceRegister.java

public class ServiceRegister implements Runnable{
    private ZooKeeper zk;
    private static final String ZNODE = "/sas";
    private static final String SA_NODE_PREFIX = "sa_";
    private String hostName="localhost:2181";
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }
     public ServiceRegister() throws IOException {
        zk = new ZooKeeper(hostName, 10000,null);
    }

    @Override
    public void run() {
        try {
             createSaNode();
            synchronized (this) {
                wait();
            }

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //测试用
    public static void main(String[] args){
        try {
            new ServiceRegister().run();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //创建子节点
    private String createSaNode() throws KeeperException, InterruptedException {
        // 如果根节点不存在,则创建根节点
        Stat stat = zk.exists(ZNODE, false);
        if (stat == null) {
            zk.create(ZNODE, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        String hostName = System.getenv("HOSTNAME");
        // 创建EPHEMERAL_SEQUENTIAL类型节点
        String saPath = zk.create(ZNODE + "/" + SA_NODE_PREFIX,
                hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        return saPath;
    }
}

检索服务端代码

DataMonitor.java

public class DataMonitor implements Watcher, AsyncCallback.ChildrenCallback {
    ZooKeeper zk;
    String znode;
    Watcher chainedWatcher;
    boolean dead;
    DataMonitorListener listener;
     List<String> prevSaIds;
     public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;
        // 这是整个监控的真正开始,通过获取children节点开始。设置了本对象为监控对象,回调对象也是本对象。以后均是事件驱动。
        zk.getChildren(znode, true, this, null);
    }

    /**
     * 其他和monitor产生交互的类,需要实现此listener
     */
    public interface DataMonitorListener {
        /**
         * The existence status of the node has changed.
         */
        void changed(List<String> saIds);

        /**
         * The ZooKeeper session is no longer valid.
         *
         * @param rc
         *                the ZooKeeper reason code
         */
        void closing(int rc);
    }

   /*
    *监控/saids的回调函数。除了处理异常外。
    *如果发生变化,和构造函数中一样,通过getChildren,再次监控,并处理children节点变化后的业务
    */
    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // We are are being told that the state of the
            // connection has changed
            switch (event.getState()) {
                case SyncConnected:
                    // In this particular example we don't need to do anything
                    // here - watches are automatically re-registered with
                    // server and any watches triggered while the client was
                    // disconnected will be delivered (in order of course)
                    break;
                case Expired:
                    // It's all over
                    dead = true;
                    listener.closing(Code.SESSIONEXPIRED.intValue());
                    break;
            }
        } else {
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.getChildren(znode, true, this, null);
            }
        }
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    //拿到Children节点后的回调函数。
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) {
        boolean exists;
        switch (rc) {
            case Code.Ok:
                exists = true;
                break;
            case Code.NoNode:
                exists = false;
                break;
            case Code.SessionExpired:
            case Code.NoAuth:
                dead = true;
                listener.closing(rc);
                return;
            default:
                // Retry errors
                zk.getChildren(znode, true, this, null);
                return;
        }

        List<String> saIds = null;

        //如果存在,再次查询到最新children,此时仅查询,不要设置监控了
        if (exists) {
            try {
                saIds = zk.getChildren(znode,null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }

        //拿到最新saids后,通过listener(executor),加载Saids。
        if ((saIds == null && saIds != prevSaIds)
                || (saIds != null && !saIds.equals(prevSaIds))) {
            listener.changed(saIds);
            prevSaIds = saIds;
        }
    }
}

LoadSaIdsExecutor.java

public class LoadSaIdsExecutor
        implements Watcher, Runnable, DataMonitor.DataMonitorListener
{

    private DataMonitor dm;
    private ZooKeeper zk;
    private static final String znode = "/sas";
    private String hostName="localhost:2181";
    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

        /*
         *初始化zookeeper及DataMonitor
         * 自己作为zookeeper的监控者,监控和zookeeper连接的变化
         * 自己作为DataMonitor的listener。当dm监控到变化时会调用executor执行业务操作
         */
    public LoadSaIdsExecutor() throws KeeperException, IOException {
        zk = new ZooKeeper(hostName, 300000, this);
        dm = new DataMonitor(zk, znode, null, this);
    }

    /**
     * 入口方法,测试用。
     */
    public static void main(String[] args) {
        try {
            new LoadSaIdsExecutor().run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 作为单独线程运行
     */
    public void run() {
        try {
            synchronized (this) {
                while (!dm.dead) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }
    /*
     *作为zookeeper监控者的回调,直接传递事件给monitor的回调函数统一处理
     */
    @Override
    public void process(WatchedEvent event) {
        dm.process(event);
    }
    /*
     *当关闭时,让线程线继续走完
     */
    public void closing(int rc) {
        synchronized (this) {
            notifyAll();
        }
    }

    /*
     *监控到/saids变化后的处理类
     */
    static class SaIdsLoader extends Thread {
         List<String> saIds = null;
        //构造对象后直接启动线程
        public SaIdsLoader(List<String> saIds){
            this.saIds = saIds;
            start();
        }

        public void run() {
            System.out.println("------------加载开始------------");
            //业务处理的地方
            if(saIds!=null){
                saIds.forEach(id->{
                    System.out.println(id);
                });
            }
            System.out.println("------------加载结束------------");
        }
    }

    /*
     *作为listener对外暴露的方法,在节点/saids变化时被调用。
     */
    @Override
    public void changed(List<String> data) {
                new SaIdsLoader(data);
    }
}

以上就是ZooKeeper开发实际应用案例实战的详细内容,更多关于ZooKeeper开发应用案例的资料请关注我们其它相关文章!

(0)

相关推荐

  • 基于Zookeeper的使用详解

    更多内容请查看zookeeper官网 Zookper: 一种分布式应用的协作服务 Zookper是一种分布式的,开源的,应用于分布式应用的协作服务.它提供了一些简单的操作,使得分布式应用可以基于这些接口实现诸如同步.配置维护和分集群或者命名的服务.Zookper很容易编程接入,它使用了一个和文件树结构相似的数据模型.可以使用Java或者C来进行编程接入. 众所周知,分布式的系统协作服务很难有让人满意的产品.这些协作服务产品很容易陷入一些诸如竞争选择条件或者死锁的陷阱中.Zookper的目的就是将

  • ZooKeeper官方文档之Java案例解读

    目录 需求理解 举例类比 Executor和DataMonitor 内部类和接口 Executor: DataMonitor: 继承关系 Executor: DataMonitor: 引用关系 Executor: DataMonitor: 图解 文档原文连接:http://zookeeper.apache.org/doc/current/javaExample.html#sc_completeSourceCode 翻译连接:https://www.jb51.net/article/236127.

  • ZooKeeper Java API编程实例分析

    本实例我们用的是java3.4.6版本,实例方便大家学习完后有不明白的可以在留言区讨论. 开发应用程序的ZooKeeper Java绑定主要由两个Java包组成: org.apache.zookeeper org.apache.zookeeper.data org.apache.zookeeper包由ZooKeeper监视的接口定义和ZooKeeper的各种回调处理程序组成. 它定义了ZooKeeper客户端类库的主要类以及许多ZooKeeper事件类型和状态的静态定义. org.apache.

  • ZooKeeper官方文档之Java客户端开发案例翻译

    目录 一个简单的监听客户端 需求 程序设计 Executor类 DataMonitor类 完整代码清单 官网原文标题<ZooKeeper Java Example> 官网原文地址:http://zookeeper.apache.org/doc/current/javaExample.html#sc_completeSourceCode 针对本篇翻译文章,我还有一篇对应的笔记<ZooKeeper官方Java例子解读>,如果对官网文档理解有困难,可以结合我的笔记理解. 一个简单的监听客

  • Zookeeper接口kazoo实例解析

    本文主要研究的是Zookeeper接口kazoo的相关内容,具体介绍如下. zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo.因此如何使用kazoo开发基于python的分布式程序是必须掌握的. 1.安装kazoo yum install python-pip pip install kazoo 安装过程中会

  • ZooKeeper开发实际应用案例实战

    目录 项目背景介绍 面临问题 如何解决 代码讲解 数据服务器 检索服务器 总结 附:完整代码 数据服务端代码 检索服务端代码 项目背景介绍 首先给大家介绍一下本文描述项目的情况.这是一个检索网站,它让你能在几千万份复杂文档数据中检索出你所需要的文档数据.为了加快检索速度,项目的数据分布在100台机器的内存里,我们称之为数据服务器.除了数据,这100台机器上均部署着检索程序.这些server之外,还有数台给前端提供接口的搜索server,这些机器属一个集群,我们称之为检索服务器.当搜索请求过来时,

  • PHP+Redis开发的书签案例实战详解

    本文实例讲述了PHP+Redis开发的书签案例.分享给大家供大家参考,具体如下: redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set 有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与memcached一

  • Android开发之对话框案例详解(五种对话框)

    下面通过实例代码给大家分享5种android对话框,具体内容详情如下所示: 1 弹出普通对话框 --- 系统更新 2 自定义对话框-- 用户登录 3 时间选择对话框 -- 时间对话框 4 进度条对话框 -- 信息加载.. 5 popuWindow对话框 1 弹出普通对话框 --- 系统更新  //弹出普通对话框 public void showNormalDialog(View v) { AlertDialog.Builder builder = new Builder(this); //设置D

  • Java有趣好玩的图形界面开发八个案例实现

    目录 1.复选框和单选框按钮组 2.文本编辑组件和滚动窗格 3.多个选项卡设置 4.在框架窗口中加入面板 5.在窗口中加入标签 6.框架中加入指定大小的标签 7.在框架窗口中加入按钮 8.框架窗口的创建 总结 虽然GUI技术没有很大的市场,甚至很多初学者放弃学习GUI,但是学习GUI编程的过程对于提高编程兴趣,深入理解Java编程有很大的作用.效果图如下,加油吧!! 1.复选框和单选框按钮组 —在框架窗口中加入复选框和单选框按钮组 import javax.swing.*; public cla

  • Pygame游戏开发之太空射击实战图像精灵下篇

    目录 视频 图像精灵 变为图形精灵 在哪里可以找到艺术品 组织游戏组件 视频 观看视频 图像精灵 这是我们教程系列“使用 Pygame 进行游戏开发”的第 3 部分.它适用于对游戏开发和提高Python编码技能感兴趣的初学者/中级程序员.您应该从第 1 部分开始:入门 变为图形精灵 彩色矩形很好 - 它们是开始并确保游戏正常工作的好方法,但迟早你会想要为你的精灵使用很酷的宇宙飞船图像或角色.这就引出了第一个问题:您从哪里获得游戏图形. 在哪里可以找到艺术品 当您的游戏需要美术时,您有 3 种选择

  • Pygame游戏开发之太空射击实战精灵的使用上篇

    目录 视频 使用精灵 什么是精灵 创建一个精灵 精灵运动 视频 观看视频 使用精灵 这是我们“使用 Pygame 进行游戏开发”教程系列的第 2 部分.您应该从第 1 部分开始:入门 什么是精灵 sprite 是一个计算机图形术语,指屏幕上可以移动的任何对象.当您玩任何2D游戏时,您在屏幕上看到的所有对象都是精灵.精灵可以是动画的,它们可以由玩家控制,甚至可以相互交互. 我们将在游戏循环的更新和绘制部分中负责更新和绘制精灵.但您可能可以想象,如果您的游戏具有大量精灵,那么游戏循环的这些部分可能会

  • Pygame游戏开发之太空射击实战入门篇

    目录 视频 入门 游戏循环 1. 处理输入(或事件) 2. 更新游戏 3. 渲染(或绘制) 时钟 构建 Pygame 模板 渲染/绘制部分 输入/事件部分 控制屏幕刷新频率 结束语 本部分代码 视频 观看视频 入门 pygame 这是我们教程系列“使用 Pygame 进行游戏开发”的第 1 部分.它适用于对游戏开发和提高Python编码技能感兴趣的初学者/中级程序员. 什么是Pygame Pygame是一个“游戏库” - 一套帮助程序员制作游戏的工具.其中一些内容是: 图形和动画 声音(包括音乐

  • Pygame游戏开发之太空射击实战敌人精灵篇

    目录 视频 敌人精灵 敌人精灵 生成敌人 视频 观看视频 敌人精灵 这是我们“Shmup”项目的第2部分!在本课中,我们将添加一些敌人的精灵供玩家躲避.在本系列课程中,我们将使用Python和Pygame构建一个完整的游戏.它适用于已经了解Python基础知识并希望加深对Python的理解并学习编程游戏基础知识的初学者. 敌人精灵 在这一点上,我们不需要担心我们的敌人精灵是什么,我们只想让它们出现在屏幕上.你可能会认为你的游戏是关于宇宙飞船躲避流星或独角兽躲避飞行的比萨饼 - 就代码而言,这并不

  • Pygame游戏开发之太空射击实战添加图形篇

    目录 视频 选择图形 加载图像 绘制背景 精灵图像 结束语 视频 本教程的视频 选择图形 我们谈到了Opengameart.org,这是免费游戏艺术的重要来源,也是我们最喜欢的艺术家之一“肯尼”.Kenney为我们的游戏制作了完美的艺术包“太空射击包”,你可以在这里找到: http://opengameart.org/content/space-shooter-redux 它有很多非常漂亮的图像,包括宇宙飞船,激光,小行星等等. 当您下载包时,它会解压缩到一堆不同的文件夹中.我们想要的是PNG文

  • Pygame游戏开发之太空射击实战子弹与碰撞处理篇

    目录 视频 碰撞 边界框 敌人与玩家碰撞 射击 子弹精灵 按键事件 生成子弹 子弹碰撞 视频 本教程的视频 碰撞 碰撞是游戏开发的基本组成部分.碰撞检测就是要检测游戏中的一个对象是否正在接触另一个对象.碰撞处理决定了当碰撞发生时你想要发生什么. 在我们的游戏中,我们目前有许多敌人的精灵沿着屏幕飞向玩家,我们想知道其中一个精灵何时出现.对于我们游戏的这个阶段,我们只会说敌人击中玩家意味着游戏结束了. 边界框 请记住,Pygame 中的每个精灵都有一个rect定义其坐标和大小的属性.Pygame中的

随机推荐