基于Morphia实现MongoDB按小时、按天聚合操作方法

MongoDB按照天数或小时聚合

需求

最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
涉及到的技术栈分别为:Spring Boot,MongoDB,Morphia.

数据模型

@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// 设备状态索引
@Indexes({
    // 设置数据超时时间(TTL,MongoDB根据TTL在后台进行数据删除操作)
    @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class RawDevStatus {
  @Id
  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
  private ObjectId objectId;
  private String userId;
  private Instant time;
  @Embedded("points")
  List<Point> protocolPoints;
  @Data
  @AllArgsConstructor
  public static class Point {
    /**
     * 协议类型
     */
    private Protocol protocol;
    /**
     * 设备总数
     */
    private Integer total;
    /**
     * 设备在线数目
     */
    private Integer onlineNum;
    /**
     * 处于启用状态设备数目
     */
    private Integer enableNum;
  }
}

上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.

@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes({
    @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class AggregationDevStatus {
  @Id
  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
  private ObjectId objectId;
  /**
   * 用户ID
   */
  private String userId;
  /**
   * 设备总数
   */
  private Double total;
  /**
   * 设备在线数目
   */
  private Double onlineNum;
  /**
   * 处于启用状态设备数目
   */
  private Double enableNum;
  /**
   * 聚合类型(按照小时还是按照天聚合)
   */
  @Property("aggDuration")
  private AggregationDuration aggregationDuration;
  private Instant time;
  /**
   * 动态设置文档过期时间
   */
  private Instant expireAt;
}

上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.

聚合操作符介绍

聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.

此次聚合主要涉及以下操作:

•$project:指定输出文档中的字段.
•$unwind:拆分数据中的数组;
•match:选择要处理的文档数据;
•group:根据key分组聚合结果.

原始聚合语句

db.getCollection('raw_dev_status').aggregate([
  {$match:
    {
      time:{$gte: ISODate("2019-06-27T00:00:00Z")},
    }
  },
  {$unwind: "$points"},
  {$project:
    {
      userId:1,points:1,
      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
    }
  },
  {$project:
    {
      userId:1,points:1,
      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
    }
  },
  {$group:
    {
      _id:{user_id:'$userId', cal_time:'$groupTime'},
      devTotal:{'$avg':'$points.total'},
      onlineTotal:{'$avg':'$points.onlineNum'},
      enableTotal:{'$avg':'$points.enableNum'}
    }
  },
])

上述代码是按小时聚合数据,以下来逐步介绍处理思路:

(1) $match

根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.

(2) $unwind

raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;

(3) $project

  {$project:
    {
      userId:1,points:1,
      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
    }
  }

选择需要输出的数据,分别为:userId,points以及tmp.

需要注意,为了按照时间聚合,对$time属性进行操作,提取%Y:%m:%dT%H时信息至$tmp作为下一步的聚合依据.

如果需要按天聚合,则format数据可修改为:%Y:%m:%dT00:00:00Z即可满足要求.

(4) $project

  {$project:
    {
      userId:1,points:1,
      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
    }
  }

因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即groupTime.

(5) $group

对聚合结果进行分类操作,并生成最终输出结果.

 {$group:
    {
      # 根据_id进行分组操作,依据是`user_id`以及`$groupTime`
      _id:{user_id:'$userId', cal_time:'$groupTime'},
      # 求设备总数平均值
      devTotal:{'$avg':'$points.total'},
      # 求设备在线数平均值
      onlineTotal:{'$avg':'$points.onlineNum'},
      # ...
      enableTotal:{'$avg':'$points.enableNum'}
    }
  }

代码编写

此处ODM选择Morphia,亦可以使用MongoTemplate,原理类似.

 /**
   * 创建聚合条件
   *
   * @param pastTime   过去时间段
   * @param dateToString 格式化字符串(%Y:%m:%dT%H:00:00Z或%Y:%m:%dT00:00:00Z)
   * @return 聚合条件
   */
  private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
    Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
    return datastore.createAggregation(RawDevStatus.class)
        .match(query.field("time").greaterThanOrEq(pastTime))
        .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
        .match(query.field("points.protocol").equal("ALL"))
        .project(Projection.projection("userId"),
            Projection.projection("points"),
            Projection.projection("convertTime",
                Projection.expression("$dateToString",
                    new BasicDBObject("format", dateToString)
                        .append("date", "$time"))
            )
        )
        .project(Projection.projection("userId"),
            Projection.projection("points"),
            Projection.projection("convertTime",
                Projection.expression("$dateFromString",
                    new BasicDBObject("format", stringToDate)
                        .append("dateString", "$convertTime"))
            )
        )
        .group(
            Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
            Group.grouping("total", Group.average("points.total")),
            Group.grouping("onlineNum", Group.average("points.onlineNum")),
            Group.grouping("enableNum", Group.average("points.enableNum"))
        );
  }
  /**
   * 获取聚合结果
   *
   * @param pipeline 聚合条件
   * @return 聚合结果
   */
  private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
    List<AggregationMidDevStatus> statuses = new ArrayList<>();
    Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
        AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
    while (resultIterator.hasNext()) {
      statuses.add(resultIterator.next());
    }
    return statuses;
  }
  //......................................................................................
  // 获取聚合结果(省略若干代码)
  AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
  List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
  if (CollectionUtils.isEmpty(midStatuses)) {
    log.warn("Can not get dev status aggregation result.");
    return;
  }

总结

以上所述是小编给大家介绍的基于Morphia实现MongoDB按小时、按天聚合操作方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

(0)

相关推荐

  • MongoDB教程之聚合(count、distinct和group)

    1. count: 复制代码 代码如下: --在空集合中,count返回的数量为0.     > db.test.count()     0     --测试插入一个文档后count的返回值.     > db.test.insert({"test":1})     > db.test.count()     1     > db.test.insert({"test":2})     > db.test.count()     2  

  • Mongodb中MapReduce实现数据聚合方法详解

    Mongodb是针对大数据量环境下诞生的用于保存大数据量的非关系型数据库,针对大量的数据,如何进行统计操作至关重要,那么如何从Mongodb中统计一些数据呢? 在Mongodb中,给我们提供了三种用于数据聚合的方式: (1)简单的用户聚合函数: (2)使用aggregate进行统计: (3)使用mapReduce进行统计: 今天我们首先来讲讲mapReduce是如何统计,在后续的文章中,将另起文章进行相关说明. MapReduce是啥呢?以我的理解,其实就是对集合中的各个满足条件的文档进行预处理

  • MongoDB聚合功能浅析

    MongoDB数据库功能强大!除了基本的查询功能之外,还提供了强大的聚合功能.这里简单介绍一下count.distinct和group. 1.count: --在空集合中,count返回的数量为0. > db.test.count() 0 --测试插入一个文档后count的返回值. > db.test.insert({"test":1}) > db.test.count() 1 > db.test.insert({"test":2}) >

  • JAVA mongodb 聚合几种查询方式详解

    一.BasicDBObject 整个聚合查询是统计用户的各种状态下的用户数量为场景: 1.筛选条件: date为查询日期: BasicDBObject Query = new BasicDBObject(); Query.put("time",new BasicDBObject("$gte", date + " 00:00:00") .append("$lte", date + " 23:59:59"));

  • Mongodb聚合函数count、distinct、group如何实现数据聚合操作

    上篇文章给大家介绍了Mongodb中MapReduce实现数据聚合方法详解,我们提到过Mongodb中进行数据聚合操作的一种方式--MapReduce,但是在大多数日常使用过程中,我们并不需要使用MapReduce来进行操作.在这边文章中,我们就简单说说用自带的聚合函数进行数据聚合操作的实现. MongoDB除了基本的查询功能之外,还提供了强大的聚合功能.Mongodb中自带的基本聚合函数有三种:count.distinct和group.下面我们分别来讲述一下这三个基本聚合函数. (1)coun

  • 基于Morphia实现MongoDB按小时、按天聚合操作方法

    MongoDB按照天数或小时聚合 需求 最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图. 实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询. 涉及到的技术栈分别为:Spring Boot,MongoDB,Morphia. 数据模型 @Data @Builder @Entity(value = "rawDevStatus", noClassnameStored = true) //

  • 基于Docker的MongoDB实现授权访问的方法

    基于Docker部署一个数据库实例通常比直接在服务器上安装数据库还要简单,Gevin在开发环境中经常使用基于docker的数据库服务,docker也渐渐成为Gevin在Linux上安装MongoDB的首选方式,由于MongoDB默认是不用通过认证就能直接连接的,出于安全考虑,在公网上部署MongoDB时,务必设置authentication机制,以避免类似 "黑客赎金" 问题的发生. 那么,基于Docker拉起的MongoDB,如何实现通过用户名密码访问指定数据库呢?方法很简单,但前提

  • 基于MySQL到MongoDB简易对照表的详解

    查询:MySQL:SELECT * FROM userMongo:db.user.find()MySQL:SELECT * FROM user WHERE name = 'starlee'Mongo:db.user.find({'name' : 'starlee'})插入:MySQL:INSERT INOT user (`name`, `age`) values ('starlee',25)Mongo:db.user.insert({'name' : 'starlee', 'age' : 25}

  • MongoDB的基础查询和索引操作方法总结

    查询操作 1.查询所有记录 db.userInfo.find(); 相当于: select* from userInfo; 2.查询去掉后的当前聚集集合中的某列的重复数据 db.userInfo.distinct("name"); 会过滤掉name中的相同数据 相当于: select disttince name from userInfo; 3.查询age = 22的记录 db.userInfo.find({"age": 22}); 相当于: select * f

  • Docker安装MongoDB并使用Navicat连接的操作方法

    目录 MongoDB简介: 查看可用的MongoDB版本: 拉取最新版本的MongoDB镜像: 验证MongoDB镜像是否成功拉取到本地: 参数说明: 解决无法正常执行mongo命令问题 添加MongoDB连接用户和密码: 1.进入创建的MongoDB容器 2.创建MongoDB用户 服务器配置27017的开放端口: Navicat连接mongoDB 1.连接MongoDB点击这个位置 2.连接参数介绍: MongoDB简介: MongoDB是一个基于分布式文件存储的数据库.由C++语言编写.旨

  • mongodb中按天进行聚合查询的实例教程

    前言 最近在写项目的时候遇到一个问题,使用mongodb记录了用例的执行结果,但是在时间的记录上使用的是date格式,现在有一个需求,以天为单位,统计一下每天成功的用例和失败的用例,说到统计,肯定是要用到聚合查询,但是如果以date格式的时间为group依据,那么等同于没有分组,因为在记录用例的时间几乎不可能同时,今天查阅了一下相关文档,可以使用mongodb的$dateToString命令来完成这个需求 问题来源 假如我们以如下的数据 /* 1 */ { "_id" : Object

  • 基于Spring boot @Value 注解注入属性值的操作方法

    本文主要介绍Spring @Value 注解注入属性值的使用方法的分析,文章通过示例代码非常详细地介绍,对于每个人的学习或工作都有一定的参考学习价值 在使用spring框架的项目中,@Value是经常使用的注解之一.其功能是将与配置文件中的键对应的值分配给其带注解的属性.在日常使用中,我们常用的功能相对简单.本文使您系统地了解@Value的用法. @Value注入形式 根据注入的内容来源,@ Value属性注入功能可以分为两种:通过配置文件进行属性注入和通过非配置文件进行属性注入. 非配置文件注

  • 作为PHP程序员应该了解MongoDB的五件事

    2010年应该被人们记住,因为SQL将在这一年死去.这一年关系数据库行将就木,这一年开发者发现他们再不需要长时间辛苦的构造列或者表格来存放数据.2010年将是文档型数据库的起始年.尽管这样的势头已经持续多年,现在才是一个更多,更广泛的文档型数据库出现的年代.从基于云计算的Amazon到Google,大量开源工具,以及随之诞生的CouchDB和MongoDB.那么什么是MongoDB?下面有五件事是PHP开发者应该了解的:1. MongoDB是一个单独的服务器;2. MongoDB是基于文档,而不

  • 利用Mongoose让JSON数据直接插入或更新到MongoDB

    前言 Nodejs基于Javascript,MongoDB脚步同样也是基于Javascript.而且他们的数据存储格式都是JSON,这就是为什么要把他们放在一起的原因了.如果程序前后端能直接处理JSON,我想数据处理过程又可以极大的减化了,代码量又将低少1/5.多么的兴奋啊!让我们来动手验证一下想法吧. 本文重点介绍web前端通过JQuery发起POST提交JSON数据,通过Mongoose直接插入或更新到MongoDB. 工程目录沿用nodejs-demo,增加/mongoose路径及对应文件

随机推荐