Spark SQL数据加载和保存实例讲解

一、前置知识详解
Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以创建DataFrame,
Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。

二、Spark SQL读写数据代码实战

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

public class SparkSQLLoadSaveOps {
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
  JavaSparkContext sc = new JavaSparkContext(conf);
  SQLContext = new SQLContext(sc);
  /**
   * read()是DataFrameReader类型,load可以将数据读取出来
   */
  DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");

  /**
   * 直接对DataFrame进行操作
   * Json: 是一种自解释的格式,读取Json的时候怎么判断其是什么格式?
   * 通过扫描整个Json。扫描之后才会知道元数据
   */
  //通过mode来指定输出文件的是append。创建新文件来追加文件
 peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
 }
}

读取过程源码分析如下:
1. read方法返回DataFrameReader,用于读取数据。

/**
 * :: Experimental ::
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *  sqlContext.read.parquet("/path/to/file.parquet")
 *  sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 *
 * @group genericdata
 * @since 1.4.0
 */
@Experimental
//创建DataFrameReader实例,获得了DataFrameReader引用
def read: DataFrameReader = new DataFrameReader(this)

2.  然后再调用DataFrameReader类中的format,指出读取文件的格式。

/**
 * Specifies the input data source format.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source
 this
}

3.  通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}

至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
下面就是写操作!!!

1. 调用DataFrame中select函数进行对列筛选

/**
 * Selects a set of columns. This is a variant of `select` that can only select
 * existing columns using column names (i.e. cannot construct expressions).
 *
 * {{{
 *  // The following two are equivalent:
 *  df.select("colA", "colB")
 *  df.select($"colA", $"colB")
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)

2.  然后通过write将结果写入到外部存储系统中。

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

3.   在保持文件的时候mode指定追加文件的方式

/**
 * Specifies the behavior when data or table already exists. Options include:
// Overwrite是覆盖
 *  - `SaveMode.Overwrite`: overwrite the existing data.
//创建新的文件,然后追加
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}

4.   最后,save()方法触发action,将文件输出到指定文件中。

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

三、Spark SQL读写整个流程图如下

四、对于流程中部分函数源码详解

DataFrameReader.Load()

1. Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取。

/**
 * Returns the dataset stored at path as a DataFrame,
 * using the default data source configured by spark.sql.sources.default.
 *
 * @group genericdata
 * @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
 */
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
//此时的read就是DataFrameReader
 read.load(path)
}

2.  追踪load源码进去,源码如下:
在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。

/**
 * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
 * a local or distributed file system).
 *
 * @since 1.4.0
 */
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
 option("path", path).load()
}

3.  追踪load源码如下:

/**
 * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
 * key-value stores).
 *
 * @since 1.4.0
 */
def load(): DataFrame = {
//对传入的Source进行解析
 val resolved = ResolvedDataSource(
  sqlContext,
  userSpecifiedSchema = userSpecifiedSchema,
  partitionColumns = Array.empty[String],
  provider = source,
  options = extraOptions.toMap)
 DataFrame(sqlContext, LogicalRelation(resolved.relation))
}

DataFrameReader.format()

1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.

/**
 * Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameReader = {
 this.source = source //FileType
 this
}

DataFrame.write()

1. 创建DataFrameWriter实例

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
1

2.  追踪DataFrameWriter源码如下:
以DataFrame的方式向外部存储系统中写入数据。

/**
 * :: Experimental ::
 * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
 * key-value stores, etc). Use [[DataFrame.write]] to access this.
 *
 * @since 1.4.0
 */
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {

DataFrameWriter.mode()

1. Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。

/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `SaveMode.Overwrite`: overwrite the existing data.
 *  - `SaveMode.Append`: append the data.
 *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
//默认操作
 *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
 this.mode = saveMode
 this
}

2.  通过模式匹配接收外部参数

/**
 * Specifies the behavior when data or table already exists. Options include:
 *  - `overwrite`: overwrite the existing data.
 *  - `append`: append the data.
 *  - `ignore`: ignore the operation (i.e. no-op).
 *  - `error`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter = {
 this.mode = saveMode.toLowerCase match {
  case "overwrite" => SaveMode.Overwrite
  case "append" => SaveMode.Append
  case "ignore" => SaveMode.Ignore
  case "error" | "default" => SaveMode.ErrorIfExists
  case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
   "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
 }
 this
}

DataFrameWriter.save()

1. save将结果保存传入的路径。

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
 this.extraOptions += ("path" -> path)
 save()
}

2.  追踪save方法。

/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
 ResolvedDataSource(
  df.sqlContext,
  source,
  partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
  mode,
  extraOptions.toMap,
  df)
}

3.  其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
 defaultValue = Some("org.apache.spark.sql.parquet"),
 doc = "The default data source to use in input/output.")

DataFrame.scala中部分函数详解:

1. toDF函数是将RDD转换成DataFrame

/**
 * Returns the object itself.
 * @group basic
 * @since 1.3.0
 */
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this

2.  show()方法:将结果显示出来

/**
 * Displays the [[DataFrame]] in a tabular form. For example:
 * {{{
 *  year month AVG('Adj Close) MAX('Adj Close)
 *  1980 12  0.503218    0.595103
 *  1981 01  0.523289    0.570307
 *  1982 02  0.436504    0.475256
 *  1983 03  0.410516    0.442194
 *  1984 04  0.450090    0.483521
 * }}}
 * @param numRows Number of rows to show
 * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
 *       be truncated and all cells will be aligned right
 *
 * @group action
 * @since 1.5.0
 */
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println

追踪showString源码如下:showString中触发action收集数据。

/**
 * Compose the string representing rows for output
 * @param _numRows Number of rows to show
 * @param truncate Whether truncate long strings and align cells right
 */
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
 val numRows = _numRows.max(0)
 val sb = new StringBuilder
 val takeResult = take(numRows + 1)
 val hasMoreData = takeResult.length > numRows
 val data = takeResult.take(numRows)
 val numCols = schema.fieldNames.length

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python中用Spark模块的使用教程

    在日常的编程中,我经常需要标识存在于文本文档中的部件和结构,这些文档包括:日志文件.配置文件.定界的数据以及格式更自由的(但还是半结构化的)报表格式.所有这些文档都拥有它们自己的"小语言",用于规定什么能够出现在文档内.我编写这些非正式解析任务的程序的方法总是有点象大杂烩,其中包括定制状态机.正则表达式以及上下文驱动的字符串测试.这些程序中的模式大概总是这样:"读一些文本,弄清是否可以用它来做些什么,然后可能再多读一些文本,一直尝试下去." 解析器将文档中部件和结构

  • 浅谈Spark RDD API中的Map和Reduce

    RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果.本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中. 如何创建RDD? RDD可以从普通数组创建出来,

  • 微软推DreamSpark计划为学生提供免费软件下载地址

    微软推DreamSpark计划为学生提供免费软件 微软公司董事长比尔·盖茨宣布将为全球数百万大学和中学生提供免费的开发和设计工具,以发掘学生的创造潜力,帮助他们踏上学术和职业成功之路.  据国外媒体报道,微软推出的DreamSpark学生计划提供了众多开发和设计软件供学生免费下载,该计划现已向比利时.中国.芬兰.法国.德国.西班牙.瑞典.瑞士.英国和美国的3500万大学生推出.未来6个月内,微软预计将把DreamSpark计划拓展到涵盖澳大利亚.捷克共和国.爱沙尼亚.日本.立陶宛.拉脱维亚.斯洛

  • Spark SQL数据加载和保存实例讲解

    一.前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二.Spark SQL读写数据代码实战 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD;

  • Java高级之虚拟机加载机制的实例讲解

    Jvm要加载的是二进制流,可以是.class文件形式,也可以是其他形式,按照它加载的标准来设计就不会有太大问题. 以下主要就机制和标准两个问题分析一番: 首先来Java类文件的加载机制 ,跟变量的加载机制类似,它先把Class文件加载入内存,再对数据进行验证.解析和初始化,最终形成虚拟机可以直接使用的Java类型.由于Java是采用JIT机制,所以加载时会比较慢,但优点也明显,具有高度灵活性,支持动态加载和动态连接. 接下来就讲讲类的加载过程: 一个类加载的基本过程是按照下面的顺序 来,但也有不

  • vue进行图片的预加载watch用法实例讲解

    watch应用场景 我想信图片预加载大家肯定都有接触过,当图片量大的时候,为了保证页面图片都加载出来的时候,我们才把主页面给显示出来,再进行一些ajax请求,或者逻辑操作 那此时你用computed对这种监听一个数据然后进行一系列逻辑操作和ajax请求,那watch再适合不过了,如果用computed的话那你连实现都实现不了,只有用watch监听 <template> <div v-show=show> <img src="https://img.alicdn.co

  • iframe异步加载实现点击左边菜单加载右边内容实例讲解

    关于iframe异步加载,我们常用的大都是左边菜单栏右边是内容页面,要求我们不能左边菜单不能刷新的情况下,异步加载右边的内容页面. 话不多说,做了一个实例大致是这样的: 1.首先在你的项目中建立三个文件如: 2.在Default页面引入jquery文件并在body中加入也下代码: 复制代码 代码如下: <div style="width: 20%; float: left"> <div id="butten" style="cursor:

  • python用pandas数据加载、存储与文件格式的实例

    数据加载.存储与文件格式 pandas提供了一些用于将表格型数据读取为DataFrame对象的函数.其中read_csv和read_talbe用得最多 pandas中的解析函数: 函数 说明 read_csv 从文件.URL.文件型对象中加载带分隔符的数据,默认分隔符为逗号 read_table 从文件.URL.文件型对象中加载带分隔符的数据.默认分隔符为制表符("\t") read_fwf 读取定宽列格式数据(也就是说,没有分隔符) read_clipboard 读取剪贴板中的数据,

  • Oracle 高速批量数据加载工具sql*loader使用说明

    SQL*Loader(SQLLDR)是Oracle的高速批量数据加载工具.这是一个非常有用的工具,可用于多种平面文件格式向Oralce数据库中加载数据.SQLLDR可以在极短的时间内加载数量庞大的数据.它有两种操作模式. 传统路径:(conventional path):SQLLDR会利用SQL插入为我们加载数据. 直接路径(direct path):采用这种模式,SQLLDR不使用SQL:而是直接格式化数据库块. 利用直接路径加载,你能从一个平面文件读数据,并将其直接写至格式化的数据库块,而绕

  • C++ 将文件数据一次性加载进内存实例代码

    C++ 将文件数据一次性加载进内存实例代码 问题: 早先写了一个目标检测SDK,里面有从bin文件加载模型和从内存加载模型两个接口.后来遇到了级联检测,即有多个bin模型文件,当想要把多个bin文件合并成一个的时候,发现对应的加载接口也得变. 解决: 为了不改变接口,采用了下面的解决思路: (1) 将多个bin文件进行拼接,同时记录每个文件的大小. 合并后的文件为: 模型文件个数+模型A大小+-+模型X大小+模型A参数- (2) 采用下面的方法将这个合并的文件一次性加载进内存 /********

  • Tensorflow 多线程与多进程数据加载实例

    在项目中遇到需要处理超级大量的数据集,无法载入内存的问题就不用说了,单线程分批读取和处理(虽然这个处理也只是特别简单的首尾相连的操作)也会使瓶颈出现在CPU性能上,所以研究了一下多线程和多进程的数据读取和预处理,都是通过调用dataset api实现 1. 多线程数据读取 第一种方法是可以直接从csv里读取数据,但返回值是tensor,需要在sess里run一下才能返回真实值,无法实现真正的并行处理,但如果直接用csv文件或其他什么文件存了特征值,可以直接读取后进行训练,可使用这种方法. imp

  • uni-app实现数据上拉加载更多功能实例

    目录 实现上拉加载更多 优化: 通过节流阀防止发起额外的请求 判断数据是否加载完毕 总结 实现上拉加载更多 打开项目根目录中的 pages.json 配置文件,为 subPackages 分包中的商品 goods_list 页面配置上拉触底的距离: "subPackages": [ { "root": "subpkg", "pages": [ { "path": "goods_detail/goo

  • Python实现实时增量数据加载工具的解决方案

    目录 创建增量ID记录表 数据库连接类 增量数据服务客户端 结果测试 本次主要分享结合单例模式实际应用案例:实现实时增量数据加载工具的解决方案.最关键的是实现一个可进行添加.修改.删除等操作的增量ID记录表. 单例模式:提供全局访问点,确保类有且只有一个特定类型的对象.通常用于以下场景:日志记录或数据库操作等,避免对用一资源请求冲突. 创建增量ID记录表 import sqlite3 import datetime import pymssql import pandas as pd impor

随机推荐