Spark处理数据排序问题如何避免OOM

错误思想

举个列子,当我们想要比较 一个 类型为 RDD[(Long, (String, Int))] 的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable 转换为 list,然后sortby,但是这样却有一个致命的缺点,就是Iterable 在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable 含有元素过多,那么极易引起OOM

 val cidAndSidCountGrouped: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()
    // 4. 排序, 取top10
    val result: RDD[(Long, List[(String, Int)])] = cidAndSidCountGrouped.map {
      case (cid, sidCountIt) =>
        // sidCountIt 排序, 取前10
        // Iterable转成容器式集合的时候, 如果数据量过大, 极有可能导致oom
        (cid, sidCountIt.toList.sortBy(-_._2).take(5))
    }

首先,我们要知道,RDD 的排序需要 shuffle, 是采用了内存+磁盘来完成的排序.这样能有效避免OOM的风险,但是RDD是全部排序,所以需要针对性的过滤Key值来进行排序

方法一 利用RDD排序特点

 //把long(即key值)提取出来
    val cids: List[Long] = categoryCountList.map(_.cid.toLong)
    val buffer: ListBuffer[(Long, List[(String, Int)])] = ListBuffer[(Long, List[(String, Int)])]()
    //根据每个key来过滤RDD
    for (cid <- cids) {
      /*
      List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)), (15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)), (15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)), (15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)), (15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8)))
      目标:
      (9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9), (329b966c-d61b-46ad-949a-7e37142d384a,8), (5e3545a0-1521-4ad6-91fe-e792c20c46da,8), (e306c00b-a6c5-44c2-9c77-15e919340324,7), (bed60a57-3f81-4616-9e8b-067445695a77,7)))
       */
      val arr: Array[(String, Int)] = cidAndSidCount.filter(cid == _._1)
        .sortBy(-_._2._2)
        .take(5)
        .map(_._2)
      buffer += ((cid, arr.toList))
    }
    buffer.foreach(println)

这样做也有缺点:即有多少个key,就有多少个Job,占用资源

方法二 利用TreeSet自动排序特性

 def statCategoryTop10Session_3(sc: SparkContext,
                  categoryCountList: List[CategroyCount],
                  userVisitActionRDD: RDD[UserVisitAction]) = {
    // 1. 过滤出来 top10品类的所有点击记录
    // 1.1 先map出来top10的品类id
    val cids = categoryCountList.map(_.cid.toLong)
    val topCategoryActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))

    // 2. 计算每个品类 下的每个session 的点击量 rdd ((cid, sid) ,1)
    val cidAndSidCount: RDD[(Long, (String, Int))] = topCategoryActionRDD
      .map(action => ((action.click_category_id, action.session_id), 1))
      // 使用自定义分区器 重点理解分区器的原理
      .reduceByKey(new CategoryPartitioner(cids), _ + _)
      .map {
        case ((cid, sid), count) => (cid, (sid, count))
      }

    // 3. 排序取top10
//因为已经按key分好了区,所以用Mappartitions ,在每个分区中新建一个TreeSet即可
    val result: RDD[(Long, List[SessionInfo])] = cidAndSidCount.mapPartitions((it: Iterator[(Long, (String, Int))]) => {
//new 一个TreeSet,并同时指定排序规则
   var treeSet: mutable.TreeSet[CategorySession] = new mutable.TreeSet[CategorySession]()(new Ordering[CategorySession] {
          override def compare(x: CategorySession, y: CategorySession): Int = {
            if (x.clickCount >= y.clickCount) -1 else 1
          }
        })
   var id = 0l
  iter.foreach({
    case (l, session) => {
      id = l
      treeSet.add(session)
    if (treeSet.size > 10) treeSet = treeSet.take(10)
          }
        })
        Iterator(id, treeSet)
      })

    result.collect.foreach(println)

    Thread.sleep(1000000)
  }
}

/*
根据传入的key值来决定分区号,让相同key进入相同的分区,能够避免多次shuffle
 */
class CategoryPartitioner(cids: List[Long]) extends Partitioner {
  // 用cid索引, 作为将来他的分区索引.
  private val cidWithIndex: Map[Long, Int] = cids.zipWithIndex.toMap

  // 返回集合的长度
  override def numPartitions: Int = cids.length

  // 根据key返回分区的索引
  override def getPartition(key: Any): Int = {
    key match {
      // 根据品类id返回分区的索引!  0-9
      case (cid: Long, _) =>
        cidWithIndex(cid)
    }
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Pyspark获取并处理RDD数据代码实例

    弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是Apache Spark的核心. 在pyspark中获取和处理RDD数据集的方法如下: 1. 首先是导入库和环境配置(本测试在linux的pycharm上完成) import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession os.environ["PYSPARK_PYTHON&

  • 从0开始学习大数据之java spark编程入门与项目实践

    本文实例讲述了大数据java spark编程.分享给大家供大家参考,具体如下: 上节搭建好了eclipse spark编程环境 在测试运行scala 或java 编写spark程序 ,在eclipse平台都可以运行,但打包导出jar,提交 spark-submit运行,都不能执行,最后确定是版本问题,就是你在eclipse调试的spark版本需和spark-submit 提交spark的运行版本一致,还有就是scala版本一致,才能正常运行. 以下是java spark程序运行 1.新建mave

  • Python如何把Spark数据写入ElasticSearch

    这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中. 实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下. 如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持.所以首先你需要去这里下载依赖的ES官方开发的依赖包包. 下载完成后,放在本地目录,以下面命令方式启动pyspark: pyspark --jars elasticsearch-ha

  • Spark SQL常见4种数据源详解

    通用load/write方法 手动指定选项 Spark SQL的DataFrame接口支持多种数据源的操作.一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表.把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询. Spark SQL的默认数据源为Parquet格式.数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作. 修改配置项spark.sql.sources.default,可修改默认数据源格式. scala> val

  • 将string类型的数据类型转换为spark rdd时报错的解决方法

    在将string类型的数据类型转换为spark rdd时,一直报这个错,StructType can not accept object %r in type %s" % (obj, type(obj))) . . . s = str(tree) y = str(YESTERDAY) list0 = [s, y] outRes = self.sc.parallelize(list0) df_tree = outRes.toDF("model: string, dt: string&qu

  • Pyspark读取parquet数据过程解析

    parquet数据:列式存储结构,由Twitter和Cloudera合作开发,相比于行式存储,其特点是: 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量:压缩编码可以降低磁盘存储空间,使用更高效的压缩编码节约存储空间:只读取需要的列,支持向量运算,能够获取更好的扫描性能. 那么我们怎么在pyspark中读取和使用parquet数据呢?我以local模式,linux下的pycharm执行作说明. 首先,导入库文件和配置环境: import os from pyspark import

  • Spark SQL数据加载和保存实例讲解

    一.前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二.Spark SQL读写数据代码实战 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD;

  • Spark处理数据排序问题如何避免OOM

    错误思想 举个列子,当我们想要比较 一个 类型为 RDD[(Long, (String, Int))] 的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable 转换为 list,然后sortby,但是这样却有一个致命的缺点,就是Iterable 在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable 含有元素过多,那么极易引起OOM val cidAndSidCountGrouped: RDD[(Long, I

  • Echarts之悬浮框中的数据排序问题

    Echarts非常强大,配置也非常的多,有很多细节需要深入研究.详解一下关于悬浮框中的数据排序问题 悬浮框的数据排序默认是根据series中的数据位置排序的,在我们想自定义排序时,在echarts的配置中有一个tooltip 以下为数据降序的代码: tooltip = { trigger: 'axis', formatter: (params) => { // params为悬浮框上的全部数据 const newParams = []; let paramsData = _.sortBy(par

  • 一文学会Hadoop与Spark等大数据框架知识

    目录 一个实际的需求场景:日志分析 Hadoop Hadoop的生态坏境 Spark Spark整体架构 Spark核心概念 Spark的核心组件 海量数据的存储问题很早就已经出现了,一些行业或者部门因为历史的积累,数据量也达到了一定的级别.很早以前,当一台电脑无法存储这么庞大的数据时,采用的解决方案是使用NFS(网络文件系统)将数据分开存储.但是这种方法无法充分利用多台计算机同时进行分析数据. 一个实际的需求场景:日志分析 日志分析是对日志中的每一个用户的流量进行汇总求和.对于一个日志文件,如

  • Spark自定义累加器的使用实例详解

    累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变.累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数. 累加器简单使用 Spark内置的提供了Long和Double类型的累加器.下面是一个简单的使用示例,在这个例子中我们在过滤掉RDD中奇数的同时进行计数,最后计算剩下整数的和. val sparkConf = new SparkConf().setAppName("Test").setM

  • Spark SQL操作JSON字段的小技巧

    前言 介绍Spark SQL的JSON支持,这是我们在Databricks中开发的一个功能,可以在Spark中更容易查询和创建JSON数据.随着网络和移动应用程序的普及,JSON已经成为Web服务API以及长期存储的常用的交换格式.使用现有的工具,用户通常会使用复杂的管道来在分析系统中读取和写入JSON数据集.在Apache Spark 1.1中发布Spark SQL的JSON支持,在Apache Spark 1.2中增强,极大地简化了使用JSON数据的端到端体验. 很多时候,比如用struct

  • Spark简介以及与Hadoop对比分析

    目录 1. Spark 与 Hadoop 比较 1.1 Haoop 的缺点 1.2 相较于Hadoop MR的优点 2. Spark 生态系统 2.1 大数据处理的三种类型 1. 复杂的批量数据处理 2. 基于历史数据的交互式查询 3. 基于实时数据流的数据处理 2.2 BDAS架构 2.3 Spark 生态系统 3. 基本概念与架构设计 3.1 基本概念 3.2 运行架构 3.3 各种概念之间的相互关系 4. Spark运行基本流程 4.1 运行流程 4.2 运行架构特点 5. Spark的部

  • Golang 高效排序数据详情

    目录 1.介绍 2.切片排序 3.自定义集合排序 4总结 1.介绍 在 Golang 语言项目开发中,经常会遇到数据排序问题.Golang 语言标准库 sort 包,为我们提供了数据排序的功能,我们可以直接使用 sort.Sort() 函数进行数据排序,sort.Sort() 函数底层实现是以快排为主,并根据目标数据的具体情况选择不同的排序算法.本文我们介绍 sort 包排序数据的使用方法. 2.切片排序 在 Golang 语言标准库 sort 包中,sort.Sort() 函数用于数据排序,该

  • 详解android 通过uri获取bitmap图片并压缩

    详解android 通过uri获取bitmap图片并压缩 很多人在调用图库选择图片时会在onActivityResult中用Media.getBitmap来获取返回的图片,如下: Uri mImageCaptureUri = data.getData(); Bitmap photoBmp = null; if (mImageCaptureUri != null) { photoBmp = MediaStore.Images.Media.getBitmap(ac.getContentResolve

  • python读取hdfs并返回dataframe教程

    不多说,直接上代码 from hdfs import Client import pandas as pd HDFSHOST = "http://xxx:50070" FILENAME = "/tmp/preprocess/part-00000" #hdfs文件路径 COLUMNNAMES = [xx'] def readHDFS(): ''' 读取hdfs文件 Returns: df:dataframe hdfs数据 ''' client = Client(HDF

随机推荐