浅谈Spark RDD API中的Map和Reduce

RDD是什么?

RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。

如何创建RDD?

RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。

举例:从普通数组创建RDD,里面包含了1到9这9个数字,它们分别在3个分区中。

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12

举例:读取文件README.md来创建RDD,文件中的每一行就是RDD中的一个元素

scala> val b = sc.textFile("README.md")
b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12

虽然还有别的方式可以创建RDD,但在本文中我们主要使用上述两种方式来创建RDD以说明RDD的API。

map

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
它的函数定义为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next while (iter.hasNext) {
    val cur = iter.next;
    res .::= (pre, cur) pre = cur;
  }
  res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。

mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

mapValues

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

举例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

mapWith

mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;

第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10,然后加上2作为新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
x.mapWith(a => a * 10)((a, b) => (b + 2)).collect
res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

flatMap

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。

举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

flatMapWith

flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

举例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)

flatMapValues

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

举例

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x=>x.to(5))
scala> b.collect
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

reduce

reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

举例

scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55

上述例子对RDD中的元素求和。

reduceByKey

顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

举例:

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> a.reduceByKey((x,y) => x + y).collect
res7: Array[(Int, Int)] = Array((1,2), (3,10))

上述例子中,对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。

Reference

本文中的部分例子来自:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

总结

以上就是本文关于浅谈Spark RDD API中的Map和Reduce的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Spark三种属性配置方式详解、浅谈七种常见的Hadoop和Spark项目案例等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

(0)

相关推荐

  • Spark SQL数据加载和保存实例讲解

    一.前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二.Spark SQL读写数据代码实战 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD;

  • Python中用Spark模块的使用教程

    在日常的编程中,我经常需要标识存在于文本文档中的部件和结构,这些文档包括:日志文件.配置文件.定界的数据以及格式更自由的(但还是半结构化的)报表格式.所有这些文档都拥有它们自己的"小语言",用于规定什么能够出现在文档内.我编写这些非正式解析任务的程序的方法总是有点象大杂烩,其中包括定制状态机.正则表达式以及上下文驱动的字符串测试.这些程序中的模式大概总是这样:"读一些文本,弄清是否可以用它来做些什么,然后可能再多读一些文本,一直尝试下去." 解析器将文档中部件和结构

  • 微软推DreamSpark计划为学生提供免费软件下载地址

    微软推DreamSpark计划为学生提供免费软件 微软公司董事长比尔·盖茨宣布将为全球数百万大学和中学生提供免费的开发和设计工具,以发掘学生的创造潜力,帮助他们踏上学术和职业成功之路.  据国外媒体报道,微软推出的DreamSpark学生计划提供了众多开发和设计软件供学生免费下载,该计划现已向比利时.中国.芬兰.法国.德国.西班牙.瑞典.瑞士.英国和美国的3500万大学生推出.未来6个月内,微软预计将把DreamSpark计划拓展到涵盖澳大利亚.捷克共和国.爱沙尼亚.日本.立陶宛.拉脱维亚.斯洛

  • 浅谈Spark RDD API中的Map和Reduce

    RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果.本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中. 如何创建RDD? RDD可以从普通数组创建出来,

  • 浅谈s:select 标签中list存放map对象的使用

    1.XXXAction.java private List<Map<String, String>> maptest = null; public List<Map<String, String>> getMaptest() { return maptest; } public void setMaptest(List<Map<String, String>> maptest) { this.maptest = maptest; }

  • 浅谈ajax在jquery中的请求和servlet中的响应

    在jsp中,首先,你需要导入jquery的架包: 获取可返回站点的根路径: <% String path = request.getContextPath(); %> 在jquery中写ajax请求: <script type="text/javascript"> $(function(){ $(".B").click(function(){ $.ajax({ type: "GET", //对应servlet中的方法 ur

  • 浅谈ASP.NET Core 中jwt授权认证的流程原理

    1,快速实现授权验证 什么是 JWT ?为什么要用 JWT ?JWT 的组成? 这些百度可以直接找到,这里不再赘述. 实际上,只需要知道 JWT 认证模式是使用一段 Token 作为认证依据的手段. 我们看一下 Postman 设置 Token 的位置. 那么,如何使用 C# 的 HttpClient 访问一个 JWT 认证的 WebAPI 呢? 下面来创建一个 ASP.NET Core 项目,尝试添加 JWT 验证功能. 1.1 添加 JWT 服务配置 在 Startup.cs 的 Confi

  • 浅谈Vue3 Composition API如何替换Vue Mixins

    想在你的Vue组件之间共享代码?如果你熟悉Vue 2 则可能知道使用mixin,但是新的Composition API 提供了更好的解决方案. 在本文中,我们将研究mixins的缺点,并了解Composition API如何克服它们,并使Vue应用程序具有更大的可伸缩性. 回顾Mixins功能 让我们快速回顾一下mixins模式,因为对于下一部分我们将要讲到的内容,请务必将其放在首位. 通常,Vue组件是由一个JavaScript对象定义的,它具有表示我们所需功能的各种属性--诸如 data,m

  • 浅谈优化Django ORM中的性能问题

    Django是个好工具,使用的很广泛. 在应用比较小的时候,会觉得它很快,但是随着应用复杂和壮大,就显得没那么高效了.当你了解所用的Web框架一些内部机制之后,才能写成比较高效的代码. 怎么查问题 Web系统是个挺复杂的玩意,有时候有点无从下手哈.可以采用 自底向上 的顺序,从数据存储一直到数据展现,按照这个顺序一点一点查找性能问题. 数据库 (缺少索引/数据模型) 数据存储接口 (ORM/低效的查询) 展现/数据使用 (Views/报表等) Web应用的大部分问题都会跟 数据库 扯上关系.除非

  • 浅谈web服务器项目中request请求和response的相关响应处理

    我们经常使用别人的服务器进行构建网站,现在我们就自己来写一个自己的服务来使用. 准备工作:下载所需的题材及文档 注:完整项目下载 一.request请求获取  1.了解request请求 在写服务器之前,我们需要知道客户端发送给我们哪些信息?以及要求我们返回哪些信息?经过测试我们能够知道用户客户端发送的信息有以下几点: 客户端发送到服务器端的请求消息,我们称之为请求(request),其实就是一个按照http协议的规则拼接而成的字符串,Request请求消息包含三部分: 请求行 消息报头 请求正

  • 浅谈vue在html中出现{{}}的原因及解决方式

    原因: 浏览器渲染机制,解析html结构 -> 加载外部脚本和样式表文件 -> 解析并执行脚本代码 -> 构造html dom模型 -> 加载图片等外部文件 -> 页面加载完毕. 初始化vue的js写在页面底部,也就是最后才执行js脚本. 所以页面从头到尾开始渲染时,渲染到标签时,由于vue还未初始化,所以就会显示类似这样的代码 <h2>{{courseName}}</h2> 当网速很慢的时候就看得比较清楚,可能会让用户误以为bug之类的,快一点的话就

  • 浅谈JS和Nodejs中的事件驱动

    事件驱动和发布-订阅 事件驱动架构是建立在软件开发中一种通用模式上的,这种模式被称为发布-订阅或观察者模式. 在事件驱动架构中,至少有两个参与者:主题(subject)和观察者(observer). 主题就像调频收音机一样,向有兴趣收听该主题所说内容的观察者进行广播. 观察者可能只有一个,也可能有一百个,这都没有关系,只要主题有一些要广播的消息就够了. 请记住,事件驱动.发布-订阅和观察者模式在实践中不是一回事,但在理想情况下,它们使用相同的方法:一个实体广播一条消息,其他实体侦听该消息. 发布

  • 浅谈Web Storage API的使用

    目录 一.浏览器的本地存储技术 1.1.sessionStorage 1.2.localStorage 二.Web Storage相关接口 三.浏览器兼容性 四.隐身模式 五.使用Web Storage API 一.浏览器的本地存储技术 除了最早的使用cookie来进行本地存储之外,现代浏览器使用Web Storage API来方便的进行key/value的存储. Web Storage有两种存储方式: 1.1.sessionStorage 对于每一个访问源,都会维持一个独立的存储区域.只要浏览

随机推荐