关于Openfire集群源码的分析

本文介绍了openfire的相关内容,这个东西现在用的人好像不多了。算了,我们看看具体内容。

openfire是什么?

Openfire 采用Java开发,开源的实时协作(RTC)服务器基于XMPP(Jabber)协议。Openfire安装和使用都非常简单,并利用Web进行管理。单台服务器可支持上万并发用户。由于是采用开放的XMPP协议,您可以使用各种支持XMPP协议的IM客户端软件登陆服务。如果你想轻易地构建高效率的即时通信服务器,那就选择它吧!

openfire能做什么?

我们要了解Openfire,首先要了解XMPP协议,因为Openfire是用Java语言编写的,基于XMPP协议、开源的实时协作的服务器。Openfire具有跨平台的能力,Openfire与客户端采用的是C/S架构,一个服务器要负责为连接在其上的客户端提供服务。Openfire客户端有spark,pidgin, Miranda IM,iChat等,用户如果自己开发客户端,可以采用遵循GPL的开源Client端API--Smack。Openfire服务器端支持插件开发,如果开发者需要添加新的服务,可以开发出自己的插件后,安装至服务器,就可以提供服务,如查找联系人服务就是以插件的形式提供的。

openfire如果用户量增加后为了解决吞吐量问题,需要引入集群,在openfire中提供了集群的支持,另外也实现了两个集群插件:hazelcast和clustering。为了了解情况集群的工作原理,我就沿着openfire的源代码进行了分析,也是一次学习的过程。
首先理解集群的一些简单概念

集群的目的是让多个实例像一个实例一样运行,这样就可以通过增长实例来增长计算能力。也就是所谓的分布式计算问题,这其中最为关注的一个特性就是——CAP理论,也就是所谓的一致性、可用性、分区容错性。集群中最核心解决的问题就是CAP。

CAP综合理解就是我上面写的,多个实例像一个实例一样运行。

所以所谓集群就是把一些数据共享或者同步到不同的实例上,这样系统使用同样的算法,取的结果当然应该是相同啦。所以一些数据库的主从复制,缓存数据集群都是类似这种解决方法。只是代码实现质量和处理规模的问题。

有了这个基础我们再来看看openfire是怎么解决这个问题的。

openfire的集群设计

1、哪些需要进行集群间的同步

对于openfire而言,有这几方面的数据需要进行保证集群间的同步:数据库存的数据、缓存数据、session。貌似就这些吧?

数据库

因为对于openfire来说基本上是透明的,所以这块就交给数据库本身来实现。

缓存数据

缓存是存在内存里的,所以这部分是要同步的

session

session在openfire并不需要所有实例同步,但是需要做用户路由缓存,否则发消息时找不到对应的会话。由此用户路由还是要同步的。

2、缓存的设计

缓存接口

openfire里对缓存的数据容器提供了一个包装接口,这个接口提供了缓存数据的基本方法,用于统一数据操作。

publicinterface Cache<K,V> extends java.util.Map<K,V>

如果不开启集群时缓存的默认缓存容器类是:public class DefaultCache<K, V> ,实际上DefaultCache就是用一个Hashmap来存数据的。

缓存工厂类

为了保证缓存是可以扩展的,提供了一个工厂类:

publicclass CacheFactory

CacheFactory类中会管理所有的缓存容器,如下代码:

/**
   * Returns the named cache, creating it as necessary.
   *
   * @param name     the name of the cache to create.
   * @return the named cache, creating it as necessary.
   */
  @SuppressWarnings("unchecked")
  publicstaticsynchronized <T extends Cache> T createCache(String name) {
    T cache = (T) caches.get(name);
    if (cache != null) {
      return cache;
    }
    cache = (T) cacheFactoryStrategy.createCache(name);
    log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);
    return wrapCache(cache, name);
  }

上面代码中会通过缓存工厂策略对象来创建一个缓存容器,最后warpCache方法会将此容器放入到caches中。

缓存工厂类的策略

在CacheFactory中默认是使用一个DefaultLocalCacheStrategy来完成缓存创建的。另外还提供了在集群条件下的缓存策略接入。也就是通过实例化不同的策略来切换缓存管理方案。比如后面要提到的hazelcast就是通过这个来替换了本地缓存策略的。从接口的设计上来看,openfire的缓存策略也就是为了集群与非集群的实现。

3、集群的设计

在openfire中的集群主要包括:集群管理、数据同步管理、集群计算任务。

集群管理者

在openfire中主要是一个类来实现:ClusterManager,在ClusterManager中实现了集群实例的加入、退出管理,因为没有使用主从结构,所以ClusterManager实现了一个无中心管理,不知道我理解的对不对。因为只要当前实实例启用了集群,ClusterManager就会主动的加载集群管理并与其他的集群进行同步。

startup

startup是启动集群的方法,代码:

publicstaticsynchronizedvoid startup() {
    if (isClusteringEnabled() && !isClusteringStarted()) {
      initEventDispatcher();
      CacheFactory.startClustering();
    }
  }
 

首先要判断是否开启了集群并且当前集群实例未运行时才去启动。

先是初始化了事件分发器,用于处理集群的同步事情。

然后就是调用CacheFactory的startClustering来运行集群。在startClustering方法中主要是这几个事情:

会使用集群的缓存工厂策略来启动,同时使自己加入到集群中。

开启一个线程用于同步缓存的状态

在前面startup中的initEventDispatcher方法,在这里会注册一个分发线程监听到集群事件,收到事件后会执行joinedCluster或者leftCluster的操作,joinedCluster就是加入到集群中的意思。

在joinedCluster时会将本地的缓存容器都转换为集群缓存。由此便完成了集群的初始化并加入到集群中了。

shutdown

shutdown相对简单点就是退出集群,并且将缓存工厂恢复为本地缓存。

同步管理

上面主要是讲了如何管理集群,接着比较重要的就是如何在集群间同步数据呢?这部分主要是看具体的分布式计算系统的实现了,从openfire来说就是将数据放到集群缓存中,然后通过集群组件来完成的,比如使用hazelcast。

因为使用缓存来解决,所以在CacheFactory中才会有这些么多关于集群的处理代码,特别是对于缓存策略的切换,以及集群任务处理都在CacheFactory作为接口方法向外公开。这样也把集群的实现透明了。

集群计算任务

在这之前一直没有提到集群中的计算问题,因为既然有了集群是不是可以利用集群的优势进行一些并行计算呢?这部分我倒没有太过确定,只是看到相关的代码所以简单列一下。

在CacheFactory类中有几个方法:doClusterTask、doSynchronousClusterTask,这两个都是overload方法,参数有所不同而已。这几个方法就是用于执行一些计算任务的。就看一下doClusterTask:

 public static void doClusterTask(final ClusterTask<?> task) {
    cacheFactoryStrategy.doClusterTask(task);
  }

这里有个限定就是必须是ClusterTask派生的类才行,看看它的定义:

public interface ClusterTask<V> extends Runnable, Externalizable {
  V getResult();
}

主要是为了异步执行和序列化,异步是因为不能阻塞,而序列化当然就是为了能在集群中传送。

再看CacheFactory的doClusterTask方法可以发现,它只不过是代理了缓存策略工厂的doClusterTask,具体的实现还是要看集群实现的。

看一看hazelcast的实现简单理解openfire集群

在openfire中有集群的插件实现,这里就以hazelcast为例子简单的做一下分析与学习。

缓存策略工厂类(ClusteredCacheFactory)

ClusteredCacheFactory实现了CacheFactoryStrategy,代码如下:

publicclass ClusteredCacheFactory implements CacheFactoryStrategy {

首先是startCluster方法用于启动集群,主要完成几件事情:

设置缓存序列化工具类,ClusterExternalizableUtil。这个是用于集群间数据复制时的序列化工具

设置远程session定位器,RemoteSessionLocator,因为session不同步,所以它主要是用于多实例间的session读取

设置远程包路由器ClusterPacketRouter,这样就可以在集群中发送消息了

加载Hazelcast的实例设置NodeID,以及设置ClusterListener

在前面说起集群启动时提到了缓存切换,那具体实现时是如何做的呢?

因为集群启动后就要是CacheFactory.joinedCluster方法来加入集群的。看一下加入的代码:

/**
   * Notification message indicating that this JVM has joined a cluster.
   */
  @SuppressWarnings("unchecked")
  publicstaticsynchronizedvoid joinedCluster() {
    cacheFactoryStrategy = clusteredCacheFactoryStrategy;
    // Loop through local caches and switch them to clustered cache (copy content)for (Cache cache : getAllCaches()) {
      // skip local-only cachesif (localOnly.contains(cache.getName())) continue;
      CacheWrapper cacheWrapper = ((CacheWrapper) cache);
      Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
      clusteredCache.putAll(cache);
      cacheWrapper.setWrappedCache(clusteredCache);
    }
    clusteringStarting = false;
    clusteringStarted = true;
    log.info("Clustering started; cache migration complete");
  }

这里可以看到会读取所有的缓存容器并一个个的使用Wrapper包装一下,然后用同样的缓存名称去createCache一个新的Cache,这步使用的是切换后的集群缓存策略工厂,也就是说会使用ClusteredCacheFactory去创建新的缓存容器。最后再将cache写入到新的clusteredCache 里,这样就完成了缓存的切换。

当然这里还是要看一下ClusteredCacheFactory的createCache实现:

public Cache createCache(String name) {
    // Check if cluster is being started upwhile (state == State.starting) {
      // Wait until cluster is fully started (or failed)try {
        Thread.sleep(250);
      }
      catch (InterruptedException e) {
        // Ignore
      }
    }
    if (state == State.stopped) {
      thrownew IllegalStateException("Cannot create clustered cache when not in a cluster");
    }
    returnnew ClusteredCache(name, hazelcast.getMap(name));
  }

这里使用的是ClusteredCache,而且最重要的是传入的第二个map参数换成了hazelcast的了,这样之后再访问这个缓存容器时已经不再是原先的本地Cache了,已经是hazelcast的map对象。hazelcast会自动对map的数据进行同步管理,这也就完成了缓存同步的功能。

集群计算

那就看hazelcast的实现吧,在ClusteredCacheFactory中doClusterTask举个例子吧

publicvoid doClusterTask(final ClusterTask task) {
    if (cluster == null) { return; }
    Set<Member> members = new HashSet<Member>();
    Member current = cluster.getLocalMember();
    for(Member member : cluster.getMembers()) {
      if (!member.getUuid().equals(current.getUuid())) {
        members.add(member);
      }
    }
    if (members.size() > 0) {
      // Asynchronously execute the task on the other cluster members
      logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
      hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(
        new CallableTask<Object>(task), members);
    } else {
        logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
    }
  }

过程就是,先获取到集群中的实例成员,当然要排除自己。然后hazelcast提供了ExecutorService来执行这个task,方法就是submiteToMembers。这样就提交了一个运算任务。只不过具体是如何分配计算并汇集结果倒真不太清楚。

总结

花了一天时间看了一下openfire的集群,顺手就写了一篇文章,确实也到了一些东西。和一些网友沟通中好像目前大家更愿意使用redies来完成缓存共享,以及通过代理来实现集群,而不愿意使用openfire的集群方案。这部分我没有遇到如何大的并发量需求确实不知道区别在哪里。以后有机会还是动手试试写一个redies的插件。

(0)

相关推荐

  • Laravel框架实现redis集群的方法分析

    本文实例讲述了Laravel框架实现redis集群的方法.分享给大家供大家参考,具体如下: 在app/config/database.php中配置如下: 'redis' => array( 'cluster' => true, 'default' => array( 'host' => '172.21.107.247', 'port' => 6379, ), 'redis1' => array( 'host' => '172.21.107.248', 'port'

  • mongodb3.4集群搭建实战之高可用的分片+副本集

    前言 最近因为工作的原因,在学习使用mongodb数据库,mongodb是最常用的nodql数据库,在数据库排名中已经上升到了前六.这篇文章介绍如何搭建高可用的mongodb(分片+副本)集群,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: 在搭建集群之前,需要首先了解几个概念:路由,分片.副本集.配置服务器等. 相关概念 先来看一张图: 从图中可以看到有四个组件:mongos.config server.shard.replica set. mongos,数据库集群请求的入口,

  • Linux集群/分布式环境下session处理的五种策略详解

    前言 我们一般在搭建完集群环境后,不得不考虑的一个问题就是用户访问产生的session如何处理.如果不做任何处理的话,用户将出现频繁登录的现象,比如集群中存在A.B两台服务器,用户在第一次访问网站时,Nginx通过其负载均衡机制将用户请求转发到A服务器,这时A服务器就会给用户创建一个Session.当用户第二次发送请求时,Nginx将其负载均衡到B服务器,而这时候B服务器并不存在Session,所以就会将用户踢到登录页面.这将大大降低用户体验度,导致用户的流失,这种情况是项目绝不应该出现的. 我

  • 使用docker快速搭建Spark集群的方法教程

    前言 Spark 是 Berkeley 开发的分布式计算的框架,相对于 Hadoop 来说,Spark 可以缓存中间结果到内存而提高某些需要迭代的计算场景的效率,目前收到广泛关注.下面来一起看看使用docker快速搭建Spark集群的方法教程. 适用人群 正在使用spark的开发者 正在学习docker或者spark的开发者 准备工作 安装docker (可选)下载java和spark with hadoop Spark集群 Spark运行时架构图 如上图: Spark集群由以下两个部分组成 集

  • Redis集群搭建全记录

    Redis集群是一个提供在多个Redis节点间共享数据的程序集. Redis集群中不支持处理多个keys的命令. Redis集群通过分区来提供一定程度的可用性.在某个节点宕机或者不可用的时候可以继续处理命令. Redis集群数据分片 在Redis集群中,使用数据分片(sharding)而不是一致性hash(consistency hashing)来实现,一个Redis集群包含16384个哈希槽(hash slot),数据库中的每个键都存在这些哈希槽中的某一个,通过CRC16校验后对16384取模

  • 关于Openfire集群源码的分析

    本文介绍了openfire的相关内容,这个东西现在用的人好像不多了.算了,我们看看具体内容. openfire是什么? Openfire 采用Java开发,开源的实时协作(RTC)服务器基于XMPP(Jabber)协议.Openfire安装和使用都非常简单,并利用Web进行管理.单台服务器可支持上万并发用户.由于是采用开放的XMPP协议,您可以使用各种支持XMPP协议的IM客户端软件登陆服务.如果你想轻易地构建高效率的即时通信服务器,那就选择它吧! openfire能做什么? 我们要了解Open

  • jQuery.prototype.init选择器构造函数源码思路分析

    一.源码思路分析总结 概要: jQuery的核心思想可以简单概括为"查询和操作dom",今天主要是分析一下jQuery.prototype.init选择器构造函数,处理选择器函数中的参数: 这个函数的参数就是jQuery()===$()执行函数中的参数,可以先看我之前写的浅析jQuery基础框架一文,了解基础框架后,再看此文. 思路分析: 以下是几种jQuery的使用情况(用于查询dom),每种情况都返回一个选择器实例(习惯称jQuery对象(一个nodeList对象),该对象包含查询

  • java 中Buffer源码的分析

    java 中Buffer源码的分析 Buffer Buffer的类图如下: 除了Boolean,其他基本数据类型都有对应的Buffer,但是只有ByteBuffer才能和Channel交互.只有ByteBuffer才能产生Direct的buffer,其他数据类型的Buffer只能产生Heap类型的Buffer.ByteBuffer可以产生其他数据类型的视图Buffer,如果ByteBuffer本身是Direct的,则产生的各视图Buffer也是Direct的. Direct和Heap类型Buff

  • Java集合源码全面分析

    Java集合工具包位于Java.util包下,包含了很多常用的数据结构,如数组.链表.栈.队列.集合.哈希表等.学习Java集合框架下大致可以分为如下五个部分:List列表.Set集合.Map映射.迭代器(Iterator.Enumeration).工具类(Arrays.Collections). 从上图中可以看出,集合类主要分为两大类:Collection和Map. Collection是List.Set等集合高度抽象出来的接口,它包含了这些集合的基本操作,它主要又分为两大部分:List和Se

  • Android ViewPager源码详细分析

    1.问题 由于Android Framework源码很庞大,所以读源码必须带着问题来读!没有问题,创造问题再来读!否则很容易迷失在无数的方法与属性之中,最后无功而返. 那么,关于ViewPager有什么问题呢? 1). setOffsreenPageLimit()方法是如何实现页面缓存的? 2). 在布局文件中,ViewPager布局内部能否添加其他View? 3). 为什么ViewPager初始化时,显示了一个页面却不会触发onPageSelected回调? 问题肯定不止这三个,但是有这三个问

  • Java源码角度分析HashMap用法

    -HashMap- 优点:超级快速的查询速度,时间复杂度可以达到O(1)的数据结构非HashMap莫属.动态的可变长存储数据(相对于数组而言). 缺点:需要额外计算一次hash值,如果处理不当会占用额外的空间. -HashMap如何使用- 平时我们使用hashmap如下 Map<Integer,String> maps=new HashMap<Integer,String>(); maps.put(1, "a"); maps.put(2, "b&quo

  • koa中间件核心(koa-compose)源码解读分析

    最近经常使用koa进行服务端开发,迷恋上了koa的洋葱模型,觉得这玩意太好用了.而且koa是以精简为主,没有很多集成东西,所有的东西都需按需加载,这个更是太合我胃口了哈哈哈哈. 相对与express的中间件,express的中间件使用的是串联,就像冰糖葫芦一样一个接着一个,而koa使用的V型结构(洋葱模型),这将给我们的中间件提供更加灵活的处理方式. 基于对洋葱模型的热衷,所以对koa的洋葱模型进行一探究竟,不管是koa1还是koa2的中间件都是基于koa-compose进行编写的,这种V型结构

  • 从源码角度分析Android的消息机制

    前言 说到Android的消息机制,那么主要的就是指的Handler的运行机制.其中包括MessageQueue以及Looper的工作过程. 在开始正文之前,先抛出两个问题: 为什么更新UI的操作要在主线程中进行? Android中为什么主线程不会因为Looper.loop()里的死循环卡死? UI线程的判断是在ViewRootImpl中的checkThread方法中完成的. 对于第一个问题,这里给一个简单的回答: 如果可以在子线程中修改UI,多线程的并发访问可能会导致UI控件的不可预期性,采用

  • java线程池核心API源码详细分析

    目录 概述 源码分析 Executor ExecutorService ScheduledExecutorService ThreadPoolExecutor ScheduledThreadPoolExecutor 总结 概述 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有

  • Java源码深度分析String与StringBuffer及StringBuilder详解

    目录 StringBuffer和StringBuild的区别 创建StringBuffer() 添加功能 删除功能 替换功能 反转功能 最后总结一下 String的字符串是不可变的,StringBuffer和StringBuilder是可变的 String:是字符常量,适用于少量的字符串操作的情况. StringBuilder:适用于单线程下在字符缓冲区进行大量操作的情况 . StringBuffer:适用多线程下在字符缓冲区进行大量操作的情况. StringBuffer和StringBuild

随机推荐