实时计算知多少?

实时计算是什么?
请看下面的图:

我们以热卖产品的统计为例,看下传统的计算手段:

1将用户行为、log等信息清洗后保存在数据库中.
2将订单信息保存在数据库中.
3利用触发器或者协程等方式建立本地索引,或者远程的独立索引.
4join订单信息、订单明细、用户信息、商品信息等等表,聚合统计20分钟内热卖产品,并返回top-10.
5web或app展示.

这是一个假想的场景,但假设你具有处理类似场景的经验,应该会体会到这样一些问题和难处:

1、水平扩展问题(scale-out)
显然,如果是一个具有一定规模的电子商务网站,数据量都是很大的。而交易信息因为涉及事务,所以很难直接舍弃关系型数据库的事务能力,迁移到具有更好的scale-out能力的NoSQL数据库中。

那么,一般都会做sharding。历史数据还好说,我们可以按日期来归档,并可以通过批处理式的离线计算,将结果缓存起来。
但是,这里的要求是20分钟内,这很难。

2、性能问题
这个问题,和scale-out是一致的,假设我们做了sharding,因为表分散在各个节点中,所以我们需要多次入库,并在业务层做聚合计算。

问题是,20分钟的时间要求,我们需要入库多少次呢?
10分钟呢?
5分钟呢?
实时呢?
而且,业务层也同样面临着单点计算能力的局限,需要水平扩展,那么还需要考虑一致性的问题。
所以,到这里一切都显得很复杂。

3、业务扩展问题
假设我们不仅仅要处理热卖商品的统计,还要统计广告点击、或者迅速根据用户的访问行为判断用户特征以调整其所见的信息,更加符合用户的潜在需求等,那么业务层将会更加复杂。

也许你有更好的办法,但实际上,我们需要的是一种新的认知:

这个世界发生的事,是实时的。
所以我们需要一种实时计算的模型,而不是批处理模型。
我们需要的这种模型,必须能够处理很大的数据,所以要有很好的scale-out能力,最好是,我们都不需要考虑太多一致性、复制的问题。

那么,这种计算模型就是实时计算模型,也可以认为是流式计算模型。

现在假设我们有了这样的模型,我们就可以愉快地设计新的业务场景:

转发最多的微博是什么?
最热卖的商品有哪些?
大家都在搜索的热点是什么?
我们哪个广告,在哪个位置,被点击最多?

或者说,我们可以问:

这个世界,在发生什么?

最热的微博话题是什么?

我们以一个简单的滑动窗口计数的问题,来揭开所谓实时计算的神秘面纱。

假设,我们的业务要求是:

统计20分钟内最热的10个微博话题。

解决这个问题,我们需要考虑:

1、数据源
这里,假设我们的数据,来自微博长连接推送的话题。
2、问题建模
我们认为的话题是#号扩起来的话题,最热的话题是此话题出现的次数比其它话题都要多。
比如:@foreach_break : 你好,#世界#,我爱你,#微博#。
“世界”和“微博”就是话题。
3、计算引擎
我们采用storm。
4、定义时间
如何定义时间?
时间的定义是一件很难的事情,取决于所需的精度是多少。
根据实际,我们一般采用tick来表示时刻这一概念。

在storm的基础设施中,executor启动阶段,采用了定时器来触发“过了一段时间”这个事件。
如下所示:

(defn setup-ticks! [worker executor-data]
 (let [storm-conf (:storm-conf executor-data)
    tick-time-secs (storm-conf TOPOLOGY-TICK-TUPLE-FREQ-SECS)
    receive-queue (:receive-queue executor-data)
    context (:worker-context executor-data)]
  (when tick-time-secs
   (if (or (system-id? (:component-id executor-data))
       (and (= false (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
          (= :spout (:type executor-data))))
    (log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
    (schedule-recurring
     (:user-timer worker)
     tick-time-secs
     tick-time-secs
     (fn []
      (disruptor/publish
       receive-queue
       [[nil (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)]]
       )))))))

每隔一段时间,就会触发这样一个事件,当流的下游的bolt收到一个这样的事件时,就可以选择是增量计数还是将结果聚合并发送到流中。

bolt如何判断收到的tuple表示的是“tick”呢?
负责管理bolt的executor线程,从其订阅的消息队列消费消息时,会调用到bolt的execute方法,那么,可以在execute中这样判断:

public static boolean isTick(Tuple tuple) {
  return tuple != null
      && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent())
      && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}

结合上面的setup-tick!的clojure代码,我们可以知道SYSTEM_TICK_STREAM_ID在定时事件的回调中就以构造函数的参数传递给了tuple,那么SYSTEM_COMPONENT_ID是如何来的呢?
可以看到,下面的代码中,SYSTEM_TASK_ID同样传给了tuple:

;; 请注意SYSTEM_TASK_ID和SYSTEM_TICK_STREAM_ID
(TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID)
然后利用下面的代码,就可以得到SYSTEM_COMPONENT_ID:

  public String getComponentId(int taskId) {
    if(taskId==Constants.SYSTEM_TASK_ID) {
      return Constants.SYSTEM_COMPONENT_ID;
    } else {
      return _taskToComponent.get(taskId);
    }
  }

滑动窗口
有了上面的基础设施,我们还需要一些手段来完成“工程化”,将设想变为现实。

这里,我们看看Michael G. Noll的滑动窗口设计。

Topology

String spoutId = "wordGenerator";
  String counterId = "counter";
  String intermediateRankerId = "intermediateRanker";
  String totalRankerId = "finalRanker";
  // 这里,假设TestWordSpout就是我们发送话题tuple的源
  builder.setSpout(spoutId, new TestWordSpout(), 5);
  // RollingCountBolt的时间窗口为9秒钟,每3秒发送一次统计结果到下游
  builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
  // IntermediateRankingsBolt,将完成部分聚合,统计出top-n的话题
  builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
    "obj"));
    // TotalRankingsBolt, 将完成完整聚合,统计出top-n的话题
  builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);

上面的topology设计如下:

将聚合计算与时间结合起来
前文,我们叙述了tick事件,回调中会触发bolt的execute方法,那可以这么做:

RollingCountBolt:

 @Override
 public void execute(Tuple tuple) {
  if (TupleUtils.isTick(tuple)) {
   LOG.debug("Received tick tuple, triggering emit of current window counts");
   // tick来了,将时间窗口内的统计结果发送,并让窗口滚动
   emitCurrentWindowCounts();
  }
  else {
   // 常规tuple,对话题计数即可
   countObjAndAck(tuple);
  }
 }

// obj即为话题,增加一个计数 count++
 // 注意,这里的速度基本取决于流的速度,可能每秒百万,也可能每秒几十.
 // 内存不足? bolt可以scale-out.
 private void countObjAndAck(Tuple tuple) {
  Object obj = tuple.getValue(0);
  counter.incrementCount(obj);
  collector.ack(tuple);
 }

 // 将统计结果发送到下游
 private void emitCurrentWindowCounts() {
  Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
  int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
  lastModifiedTracker.markAsModified();
  if (actualWindowLengthInSeconds != windowLengthInSeconds) {
   LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
  }
  emit(counts, actualWindowLengthInSeconds);
 }

上面的代码可能有点抽象,看下这个图就明白了,tick一到,窗口就滚动:

 IntermediateRankingsBolt & TotalRankingsBolt:

 public final void execute(Tuple tuple, BasicOutputCollector collector) {
  if (TupleUtils.isTick(tuple)) {
   getLogger().debug("Received tick tuple, triggering emit of current rankings");
   // 将聚合并排序的结果发送到下游
   emitRankings(collector);
  }
  else {
   // 聚合并排序
   updateRankingsWithTuple(tuple);
  }
 }

其中,IntermediateRankingsBolt和TotalRankingsBolt的聚合排序方法略有不同:

IntermediateRankingsBolt的聚合排序方法:

// IntermediateRankingsBolt的聚合排序方法:
 @Override
 void updateRankingsWithTuple(Tuple tuple) {
  // 这一步,将话题、话题出现的次数提取出来
  Rankable rankable = RankableObjectWithFields.from(tuple);
  // 这一步,将话题出现的次数进行聚合,然后重排序所有话题
  super.getRankings().updateWith(rankable);
 }

TotalRankingsBolt的聚合排序方法:

// TotalRankingsBolt的聚合排序方法
 @Override
 void updateRankingsWithTuple(Tuple tuple) {
 // 提出来自IntermediateRankingsBolt的中间结果
  Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
 // 聚合并排序
  super.getRankings().updateWith(rankingsToBeMerged);
 // 去0,节约内存
  super.getRankings().pruneZeroCounts();
 }

而重排序方法比较简单粗暴,因为只求前N个,N不会很大:

 private void rerank() {
  Collections.sort(rankedItems);
  Collections.reverse(rankedItems);
 }

结语
下图可能就是我们想要的结果,我们完成了t0 - t1时刻之间的热点话题统计,其中的foreach_break仅仅是为了防盗版 : ].

以上就是本文的全部内容,希望大家喜欢。

(0)

相关推荐

  • js实时计算字数提醒的文本框

    自己想了一下应该是用JavaScript实现的,今天把它做出来了.原理很简单就是根据文本框触发的onkeyup事件来获得当前文本框字符的长度,然后文本框允许输入的最大字符数来减去当前的字符数,并用label控件显示出来就可以了. 效果图:演示代码: enter function keypress1() //text输入长度处理 { var text1=document.getElementById("mytext1").value; var len=15-text1.length; v

  • 实时计算知多少?

    实时计算是什么? 请看下面的图: 我们以热卖产品的统计为例,看下传统的计算手段: 1将用户行为.log等信息清洗后保存在数据库中. 2将订单信息保存在数据库中. 3利用触发器或者协程等方式建立本地索引,或者远程的独立索引. 4join订单信息.订单明细.用户信息.商品信息等等表,聚合统计20分钟内热卖产品,并返回top-10. 5web或app展示. 这是一个假想的场景,但假设你具有处理类似场景的经验,应该会体会到这样一些问题和难处: 1.水平扩展问题(scale-out) 显然,如果是一个具有

  • eBay 打造基于 Apache Druid 的大数据实时监控系统

    首先需要注意的是,本文即将提到的 Druid,并非阿里巴巴的 Druid 数据库连接池,而是另一个大数据场景下的解决方案:Apache Druid. Apache Druid 是一个用于大数据实时查询和分析的高容错.高性能开源分布式时序数据库系统,旨在快速处理大规模的数据,并能够实现快速查询和分析.尤其是当发生代码部署.机器故障以及其他产品系统遇到宕机等情况时,Druid 仍能够保持 100% 正常运行.创建 Druid 的最初意图主要是为了解决查询延迟问题,当时试图使用 Hadoop 来实现交

  • Python如何爬取实时变化的WebSocket数据的方法

    一.前言 作为一名爬虫工程师,在工作中常常会遇到爬取实时数据的需求,比如体育赛事实时数据.股市实时数据或币圈实时变化的数据.如下图: Web 领域中,用于实现数据'实时'更新的手段有轮询和 WebSocket 这两种.轮询指的是客户端按照一定时间间隔(如 1 秒)访问服务端接口,从而达到 '实时' 的效果,虽然看起来数据像是实时更新的,但实际上它有一定的时间间隔,并不是真正的实时更新.轮询通常采用 拉 模式,由客户端主动从服务端拉取数据. WebSocket 采用的是 推 模式,由服务端主动将数

  • 在C#中使用二叉树实时计算海量用户积分排名的实现详解

    从何说起 前些天和朋友讨论一个问题,他们的应用有几十万会员然后对应有积分,现在想做积分排名的需求,问有没有什么好方案.这个问题也算常见,很多地方都能看到,常规做法一般是数据定时跑批把计算结果到中间表然后直接查表就行,或者只显示个TOP N的排行榜,名次高的计算真实名次,名次比较低的直接显示在xxx名开外这种.但是出于探索问题的角度,我还是想找一下有没有实时计算的办法,并且效率能够接受. 在博客园搜到一篇不错的文章,基本罗列了常用的方案,每种算法详细介绍了具体思路,其中基于二叉树的算法是个非常不错

  • FreeRTOS实时操作系统的任务应用函数详解

    目录 1.获取任务系统状态 1.1函数描述 1.2参数描述 1.3返回值 1.4用法举例 2.获取当前任务句柄 2.1函数描述 2.2返回值 3.获取空闲任务句柄 3.1函数描述 3.2返回值 4.获取任务堆栈最大使用深度 4.1函数描述 4.2参数描述 4.3返回值 4.4用法举例 5.获取任务状态 5.1函数描述 5.2参数描述 5.3返回值 6.获取任务描述内容 6.1函数描述 6.2参数描述 6.3返回值 7.获取系统节拍次数 7.1函数描述 7.2返回值 8.获取调度器状态 8.1函数

  • FreeRTOS实时操作系统之可视化追踪调试

    目录 前言 1.使能可视化追踪和运行时间统计功能 2.获取任务信息并格式化 3.添加到命令解释列表 前言 用RTOS编程,为每个任务分配多大的堆栈空间就成了一项技术活:分配多了浪费系统资源,分配少了又恐怕会发生堆栈溢出.由于中断和抢占式调度器的存在,我们要估算出一个任务需要多少堆栈是非常困难的,今天我们就介绍一种方法,来获取每个任务的剩余堆栈空间.本文以NXP LPC177x_8x系列微控制器为例. 我们将这个功能做成一个命令,添加到FreeRTOS使用任务通知实现命令行解释器一文介绍的命令解释

  • FreeRTOS实时操作系统信号量基础

    目录 前言 1.信号量简介 2.二进制信号量 3.计数信号量 4.互斥量 5.递归互斥量 前言 本文介绍信号量的基础知识,详细源码分析见<FreeRTOS进阶FreeRTOS信号量分析> 1.信号量简介 FreeRTOS的信号量包括二进制信号量.计数信号量.互斥信号量(以后简称互斥量)和递归互斥信号量(以后简称递归互斥量). 我们可以把互斥量和递归互斥量看成特殊的信号量.互斥量和信号量在用法上不同: 信号量用于同步,任务间或者任务和中断间同步:互斥量用于互锁,用于保护同时只能有一个任务访问的资

  • jquery select插件异步实时搜索实例代码

    一.先看看效果. 二.做此插件的原因. 1.数据量过大(几千.几万条),无法一次性全部加载. 2.现有插件各不相同,无法满足功能需求. 3.美观性,可控性不足. 三.如何使用. 1.html和js <select id="unit"></select> <script type="text/javascript" src="/demo/thirdparty/jquery/jquery-1.8.3.min.js">

  • 纯JavaScript实现实时反馈系统时间

    用javascript反馈系统时间 运用知识 JavaScript HTML DOM HTML DOM 中的setInterval() 方法会不停地调用函数,直到 clearInterval() 被调用或窗口被关闭.由 setInterval() 返回的 ID 值可用作 clearInterval() 方法的参数. 语法setInterval(code,milliseconds) code--代码(可以为函数) milliseconds--在此调用的时间(毫秒) 因此,我们想让反馈的系统时间动起

  • AngularJs实现聊天列表实时刷新功能

    昨天在做app的聊天列表时,遇到了一个问题,消息监听器监听到了一个新的消息,但是如果这时已经处于消息列表的页面那么消息列表并不会及时更新. 我的想法是在service层中的监听器方法里,当监听到了一个新的消息,那么就在根作用域中发出一个广播,告诉controller层需要去获取最新的消息列表了. service层中发出广播: controller层中接听广播: rootScope是所有scope的父级 它的广播(broadcast)和监听(on) 可以在无交集的controller间使用 sco

随机推荐