Spark操作之aggregate、aggregateByKey详解

1. aggregate函数

将每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

seqOp操作会聚合各分区中的元素,然后combOp操作把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue.   seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

例子程序:

scala> val rdd = List(1,2,3,4,5,6,7,8,9)
rdd: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> rdd.par.aggregate((0,0))(
(acc,number) => (acc._1 + number, acc._2 + 1),
(par1,par2) => (par1._1 + par2._1, par1._2 + par2._2)
)
res0: (Int, Int) = (45,9)

scala> res0._1 / res0._2
res1: Int = 5

过程大概这样:

首先,初始值是(0,0),这个值在后面2步会用到。
然后,(acc,number) => (acc._1 + number, acc._2 + 1),number即是函数定义中的T,这里即是List中的元素。所以acc._1 + number,acc._2 + 1的过程如下。

1.  0+1,  0+1
2.  1+2,  1+1
3.  3+3,  2+1
4.  6+4,  3+1
5.  10+5,  4+1
6.  15+6,  5+1
7.  21+7,  6+1
8.  28+8,  7+1
9.  36+9,  8+1

结果即是(45,9)。这里演示的是单线程计算过程,实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),经过计算各分区的的结果(10,4),(26,4),(9,1),这样,执行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)就是(10+26+9,4+4+1)即(45,9),再求平均值就简单了。

2. aggregateByKey函数:

对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。

例子程序:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object AggregateByKeyOp {
 def main(args:Array[String]){
   val sparkConf: SparkConf = new SparkConf().setAppName("AggregateByKey").setMaster("local")
  val sc: SparkContext = new SparkContext(sparkConf)

   val data=List((1,3),(1,2),(1,4),(2,3))
   val rdd=sc.parallelize(data, 2)

   //合并不同partition中的值,a,b得数据类型为zeroValue的数据类型
   def combOp(a:String,b:String):String={
    println("combOp: "+a+"\t"+b)
    a+b
   }
   //合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
   def seqOp(a:String,b:Int):String={
    println("SeqOp:"+a+"\t"+b)
    a+b
   }
   rdd.foreach(println)
   //zeroValue:中立值,定义返回value的类型,并参与运算
   //seqOp:用来在同一个partition中合并值
   //combOp:用来在不同partiton中合并值
   val aggregateByKeyRDD=rdd.aggregateByKey("100")(seqOp, combOp)
   sc.stop()
 }
}

运行结果:

将数据拆分成两个分区

//分区一数据
(1,3)
(1,2)
//分区二数据
(1,4)
(2,3)

//分区一相同key的数据进行合并
seq: 100     3   //(1,3)开始和中立值进行合并  合并结果为 1003
seq: 1003     2   //(1,2)再次合并 结果为 10032

//分区二相同key的数据进行合并
seq: 100     4  //(1,4) 开始和中立值进行合并 1004
seq: 100     3  //(2,3) 开始和中立值进行合并 1003

将两个分区的结果进行合并
//key为2的,只在一个分区存在,不需要合并 (2,1003)
(2,1003)

//key为1的, 在两个分区存在,并且数据类型一致,合并
comb: 10032     1004
(1,100321004)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 基于js对象,操作属性、方法详解

    一,概述 在Java语言中,我们可以定义自己的类,并根据这些类创建对象来使用,在Javascript中,我们也可以定义自己的类,例如定义User类.Hashtable类等等. 目前在Javascript中,已经存在一些标准的类,例如Date.Array.RegExp.String.Math.Number等等,这为我们编程提供了许多方便.但对于复杂的客户端程序而言,这些还远远不够. 与Java不同,Java2提供给我们的标准类很多,基本上满足了我们的编程需求,但是Javascript提供的标准类很

  • java 中mongodb的各种操作查询的实例详解

    java 中mongodb的各种操作查询的实例详解 一. 常用查询: 1. 查询一条数据:(多用于保存时判断db中是否已有当前数据,这里 is  精确匹配,模糊匹配 使用regex...) public PageUrl getByUrl(String url) { return findOne(new Query(Criteria.where("url").is(url)),PageUrl.class); } 2. 查询多条数据:linkUrl.id 属于分级查询 public Lis

  • jQuery操作属性和样式详解

    • 区分 DOM 属性和元素属性 <img src="images/image.1.jpg" id="hibiscus" alt="Hibiscus" class="classA" /> 通常开发人员习惯将id,src,alt等叫做这个元素的"属性".我们将其称为"元素属性".但是在解析成 DOM 对象时,实际浏览器最后会将标签元素解析成"DOM 对象",

  • JavaScript——DOM操作——Window.document对象详解

    一.找到元素: docunment.getElementById("id"):根据id找,最多找一个:     var a =docunment.getElementById("id");将找到的元素放在变量中:     docunment.getElementsByName("name"):根据name找,找出来的是数组:     docunment.getElementsByTagName("name"):根据标签名找,找

  • Java连接操作Oracle数据库代码详解

    废话不多说了,直接给大家贴关键代码了,具体代码如下所示: package com.sp.test; import java.sql.*; import java.util.*; public class Text_lianxi extends Thread { public void run() { try { yunxing(); Thread.sleep(10000); } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.pr

  • Linux操作系统启动流程图文详解

    理解Linux操作系统启动流程,能有助于后期在企业中更好的维护Linux服务器,能快速定位系统问题,进而解决问题. 上图为Linux操作系统启动流程 1.加载BIOS 计算机电源加电质检,首先加载基本输入输出系统(Basic Input Output System,BIOS),BIOS中包含硬件CPU.内存.硬盘等相关信息,包含设备启动顺序信息.硬盘信息.内存信息.时钟信息.即插即用(Plug-and-Play,PNP)特性等.加载完BIOS信息,计算机将根据顺序进行启动. 2.读取MBR 读取

  • Python文件操作函数用法实例详解

    这篇文章主要介绍了Python文件操作函数用法实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 字符编码 二进制和字符之间的转换过程 --> 字符编码 ascii,gbk,shit,fuck 每个国家都有自己的编码方式 美国电脑内存中的编码方式为ascii ; 中国电脑内存中的编码方式为gbk , 美国电脑无法识别中国电脑写的程序 , 中国电脑无法识别美国电脑写的程序 现在硬盘中躺着 ascii/gbk/shit/fuck 编码的文件,

  • 易语言操作数据库“取错误信息”命令详解

    如果执行某数据库命令失败,在其后执行本命令可以取回错误信息文本.如果该数据库命令执行成功,执行本命令将返回空文本. 语法: 文本型 取错误信息 () 例程: 说明: 首先把要操作的数据库打开,然后执行"写()"命令,程序将改写"改写字段编辑框"中输入的字段名,改写内容为"改写内容编辑框"的内容.如果改写成功,会弹出信息框显示"写入数据成功":如果改写失败,会弹出信息框提示失败,将本次操作的错误码和错误信息取出,并显示在信息框中

  • 易语言操作数据库“替换打开”命令详解

    打开指定的数据库文件.成功返回真,并自动关闭当前数据库后将当前数据库设置为此数据库,失败返回假. 语法: 逻辑型 替换打开 (数据库文件名,[在程序中使用的别名],[是否只读],[共享方式],[保留参数1],[数据库密码],[索引文件表],- ) 参数名 描 述 数据库文件名 必需的:文本型. 在程序中使用的别名 可选的:文本型.别名为在后面的程序中引用本数据库时可使用的另一个名称.欲引用一个已经被打开的数据库可以使用该数据库本身的名称(数据库名称为数据库文件名的无路径和后缀部分.譬如 c:\m

  • 易语言数据库操作之“取字段名”命令详解

    返回当前数据库中指定字段的名称.如果指定字段不存在,将返回空文本. 语法: 文本型 取字段名 (字段名称或位置) 参数名 描 述 字段名称或位置 必需的:通用型.参数值可以为一个字段名称文本或者一个字段位置数值,字段位置数值从 1 开始. 例程: 说明: 首先使用"取字段数()"命令取出数据库中的字段数,并规定循环的次数为该字段数,然后在记次循环中使用"取字段名()"命令将每个字段的字段名依次取出,并显示在列表框中. 到此这篇关于易语言数据库操作之"取字段

随机推荐