Spark中的数据读取保存和累加器实例详解

目录
  • 数据读取与保存
    • Text文件
    • Sequence文件
    • Object对象文件
  • 累加器
    • 累加器概念
    • 系统累加器

数据读取与保存

Text文件

对于 Text文件的读取和保存 ,其语法和实现是最简单的,因此我只是简单叙述一下这部分相关知识点,大家可以结合demo具体分析记忆。

1)基本语法

(1)数据读取:textFile(String)

(2)数据保存:saveAsTextFile(String)

2)实现代码demo如下:

object Operate_Text {
    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 读取输入文件
        val inputRDD: RDD[String] = sc.textFile("input/demo.txt")
        //3.2 保存数据
        inputRDD.saveAsTextFile("textFile")
        //4.关闭连接
        sc.stop()
    }
}

Sequence文件

SequenceFile文件 是Hadoop中用来存储二进制形式的 key-value对 的一种平面文件(Flat File)。在SparkContext中,可以通过调用 sequenceFile[ keyClass,valueClass ] (path) 来调用。

1)基本语法

  • (1)数据读取:sequenceFile[ keyClass, valueClass ] (path)
  • (2)数据保存:saveAsSequenceFile(String)

2)实现代码demo如下:

object Operate_Sequence {
    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 创建rdd
        val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9)))
        //3.2 保存数据为SequenceFile
        dataRDD.saveAsSequenceFile("seqFile")
        //3.3 读取SequenceFile文件
        sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println)
        //4.关闭连接
        sc.stop()
    }
}

Object对象文件

对象文件是将对象序列化后保存的文件,采用Hadoop的序列化机制。可以通过 objectFile[ k , v ] (path) 函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用 saveAsObjectFile() 实现对对象文件的输出。因为要序列化所以要指定类型。

1)基本语法

  • (1)数据读取:objectFile[ k , v ] (path)
  • (2)数据保存:saveAsObjectFile(String)

2)实现代码demo如下:

object Operate_Object {
    def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 创建RDD
        val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2)
        //3.2 保存数据
        dataRDD.saveAsObjectFile("objFile")
        //3.3 读取数据
        sc.objectFile[Int]("objFile").collect().foreach(println)
        //4.关闭连接
        sc.stop()
    }
}

累加器

累加器概念

累加器,是一种变量---分布式共享只写变量。仅支持“add”,支持并发,但Executor和Executor之间不能读数据,可实现所有分片处理时更新共享变量的功能。

累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

系统累加器

1)累加器定义(SparkContext.accumulator(initialValue)方法)

val sum: LongAccumulator = sc.longAccumulator("sum")

2)累加器添加数据(累加器.add方法)

sum.add(count)

3)累加器获取数据(累加器.value)

sum.value

注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。

4)累加器要放在行动算子中

因为转换算子执行的次数取决于job的数量,如果一个 spark应用 有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,必须把它放在foreach()这样的行动算子中。

5) 代码实现:

object accumulator_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object accumulator_system {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    //需求:统计a出现的所有次数 ("a",10)
    //普通算子实现 reduceByKey 代码会走shuffle 效率低
    val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
    //累加器实现
    //1 声明累加器
    val accSum: LongAccumulator = sc.longAccumulator("sum")
    dataRDD.foreach{
      case (a,count) => {
        //2 使用累加器累加  累加器.add()
        accSum.add(count)
        // 4 不在executor端获取累加器的值,因为得到的值不准确,所以累加器叫分布式共享只写变量
        //println("sum = " + accSum.value)
      }
    }
    //3 获取累加器的值 累加器.value
    println(("a",accSum.value))
    sc.stop()
  }
}

以上就是Spark中的数据读取保存和累加器实例详解的详细内容,更多关于Spark数据读取保存累加器的资料请关注我们其它相关文章!

(0)

相关推荐

  • Spark GraphX 分布式图处理框架图算法详解

    目录 正文 Graphx图结构 1. 最短路径 示例数据 可视化数据 计算最短路径 2. 网页排名 数据可视化 pagerank算法测试 算法结果 3. 连通域(连通组件) 加载图测试连通域 生成图测试 图实例的形态展示 强连接域的计算 4. 三角计数 代码测试 测试结果 5. 标签传播算法(LPA) 基本思想 正文 Spark GraphX是一个分布式图处理框架,基于 Pregel 接口实现了常用的图算法. 包括 PageRank.SVDPlusPlus.TriangleCount. Conn

  • PySpark与GraphFrames的安装与使用环境搭建过程

    目录 PySpark环境搭建 配置hadoop 安装pyspark与Java graphframes安装 使用方法 启动spark并读取数据 启动hive支持 Spark的DataFrame与RDD DataFrame的基础api RDD的简介 RDD的API概览 各类RDD cache&checkpoint graphframes的用法 Motiffinding(模式发现) Subgraphs子图 GraphFrames支持的GraphX算法 PySpark3.X与pandas融合 自定义UD

  • pyspark自定义UDAF函数调用报错问题解决

    目录 问题场景: 问题描述 原因分析及解决方案: 问题场景: 在SparkSQL中,因为需要用到自定义的UDAF函数,所以用pyspark自定义了一个,但是遇到了一个问题,就是自定义的UDAF函数一直报 AttributeError: 'NoneType' object has no attribute '_jvm' 在此将解决过程记录下来 问题描述 在新建的py文件中,先自定义了一个UDAF函数,然后在 if __name__ == '__main__': 中调用,死活跑不起来,一遍又一遍的对

  • Pyspark 线性回归梯度下降交叉验证知识点详解

    我正在尝试在 pyspark 中的 SGD 模型上执行交叉验证,我正在使用pyspark.mllib.regression,ParamGridBuilder和CrossValidator都来自pyspark.ml.tuning库的LinearRegressionWithSGD. 在 Spark 网站上跟踪文件资料之后,我希望运行此方法可以正常工作 资料参考:https://spark.apache.org/docs/2.1.0/ml-tuning.html lr = LinearRegressi

  • SparkSQL快速入门教程

    目录 (一)概述 (二)SparkSQL实战 (三)非JSON格式的Dataset创建 (四)通过JDBC创建DataFrame (五)总结 (一)概述 SparkSQL可以理解为在原生的RDD上做的一层封装,通过SparkSQL可以在scala和java中写SQL语句,并将结果作为Dataset/DataFrame返回.简单来讲,SparkSQL可以让我们像写SQL一样去处理内存中的数据. Dataset是一个数据的分布式集合,是Spark1.6之后新增的接口,它提供了RDD的优点和Spark

  • Spark中的数据读取保存和累加器实例详解

    目录 数据读取与保存 Text文件 Sequence文件 Object对象文件 累加器 累加器概念 系统累加器 数据读取与保存 Text文件 对于 Text文件的读取和保存 ,其语法和实现是最简单的,因此我只是简单叙述一下这部分相关知识点,大家可以结合demo具体分析记忆. 1)基本语法 (1)数据读取:textFile(String) (2)数据保存:saveAsTextFile(String) 2)实现代码demo如下: object Operate_Text { def main(args

  • Python Pandas中合并数据的5个函数使用详解

    目录 join 索引一致 索引不一致 merge concat 纵向拼接 横向拼接 append combine 前几天在一个群里面,看到一位朋友,说到自己的阿里面试,被问了一些关于pandas的使用.其中一个问题是:pandas中合并数据的5中方法. 今天借着这个机会,就为大家盘点一下pandas中合并数据的5个函数.但是对于每个函数,我这里不打算详细说明,具体用法大家可以参考pandas官当文档. join主要用于基于索引的横向合并拼接: merge主要用于基于指定列的横向合并拼接: con

  • oracle数据匹配merge into的实例详解

    oracle数据匹配merge into的实例详解 前言: 很久之前,估计在2010年左右在使用Oralce,当时有个需求就是需要对两个表的数据进行匹配,这两个表的数据结构一致,一个是正式表,一个是临时表,这两表数据量还算是比较大几百M.业务需求是用临时表中的数据和正式表的匹配,所有字段都需要一一匹配,而且两表还没有主键,这是一个比较麻烦和糟糕的事情. 场景: 1.如果两表所有字段值都一致则不处理: 2.如果有部分字段不一致则更新: 3.如果正式表中数据在临时表中不存在,则需要删除: 满足上面场

  • java 中mongodb的各种操作查询的实例详解

    java 中mongodb的各种操作查询的实例详解 一. 常用查询: 1. 查询一条数据:(多用于保存时判断db中是否已有当前数据,这里 is  精确匹配,模糊匹配 使用regex...) public PageUrl getByUrl(String url) { return findOne(new Query(Criteria.where("url").is(url)),PageUrl.class); } 2. 查询多条数据:linkUrl.id 属于分级查询 public Lis

  • thinkphp中的多表关联查询的实例详解

    thinkphp中的多表关联查询的实例详解 在进行后端管理系统的编程的时候一般会使用框架来进行页面的快速搭建,我最近使用比较多的就是thinkphp框架,thinkphp框架的应用其实就是把前端和后端进行分割管理,前端用户登录查询系统放在thinkphp中的home文件夹中进行管理,后端管理系统放在thinkphp中的admin文件夹中进行管理.对了,在使用thinkphp框架的时候是是要用到mvc架构的,mvc架构就是model(数据模型).view(视图).controller(控制器)的结

  • jQuery中text() val()和html()的区别实例详解

    简单的说:html()和text()的区别主要在于是否包含标签.而val()针对的是表单元素. 但是有时还是不是那么太清晰. html(),val(),text()都分为有参和无参. 举例说明它们的不同之处: html()在没有参数的情况下,取得第一个匹配元素的内容.必须要注意的是,即使匹配多个,也只能取得匹配的第一个元素. 如: <body> <p>你选中这段文字后,看看它们的文本颜色和背景色,就能明白::selection的作用.</p> <h3>选中下

  • mybatis实现对数据的增删查改实例详解

    前期准备 新建java工程或java wweb工程,需要导入以下的包, 基本工作已经完成,接下来开始进入正题. 新建实体类 新建与数据库表对应的实体类 package com.edu.hpu.domain; /** * @author Administrator *user表所对应的实体类 */ public class User { //实体类的属性和表的字段名称一一对应 private int id; private String name; private int age; //对属性进行

  • Android读取properties配置文件的实例详解

    Android读取properties配置文件的实例详解 因为一些配置信息,多处用到的.且以后可能变更的,我想写个.prorperties配置文件给管理起来. 我把配置文件放在了assets文件夹下 appConfig.properties: serverUrl=http://192.168.1.155 import java.io.InputStream; import java.util.Properties; import android.content.Context; /** * 读取

  • 在vue项目中优雅的使用SVG的方法实例详解

    1.基础介绍 本文旨在介绍如何在项目中配置和方便的使用svg图标. 本文以vue项目为例,当然在react中的使用原理基本相似. svg图标可以直接通过img标签来使用,也可当做icon来使用. 本文是参考了鑫旭大佬的文章:SVG Sprite技术介绍. 2.配置 安装svg-sprite-loader.通过vue-cli脚手架创建的项目默认情况下会使用 url-loader 对svg进行处理,所以需要处理下: { test: /\.(png|jpe?g|gif|svg)(\?.*)?$/, l

  • Python中zip()函数的解释和可视化(实例详解)

    zip()的作用 先看一下语法: zip(iter1 [,iter2 [...]]) -> zip object Python的内置help()模块提供了一个简短但又有些令人困惑的解释: 返回一个元组迭代器,其中第i个元组包含每个参数序列或可迭代对象中的第i个元素.当最短的可迭代输入耗尽时,迭代器将停止.使用单个可迭代参数,它将返回1元组的迭代器.没有参数,它将返回一个空的迭代器. 与往常一样,当您精通更一般的计算机科学和Python概念时,此模块非常有用.但是,对于初学者来说,这段话只会引发更

随机推荐