SparkSQL使用快速入门

目录
  • 一、SparkSQL的进化之路
  • 二、认识SparkSQL
    • 2.1 什么是SparkSQL?
    • 2.2 SparkSQL的作用
    • 2.3 运行原理
    • 2.4 特点
    • 2.5 SparkSession
    • 2.6 DataFrames
  • 三、RDD转换成为DataFrame
    • 3.1通过case class创建DataFrames(反射)
    • 3.2通过structType创建DataFrames(编程接口)
    • 3.3通过 json 文件创建DataFrames
  • 四、DataFrame的read和save和savemode
    • 4.1 数据的读取
    • 4.2 数据的保存
    • 4.3 数据的保存模式
  • 五、数据源
    • 5.1 数据源只json
    • 5.2 数据源之parquet
    • 5.3 数据源之Hive
  • 六、SparkSQL 的元数据
    • 1.1元数据的状态
    • 2.2Spark-SQL脚本

一、SparkSQL的进化之路

1.0以前: Shark

1.1.x开始:SparkSQL(只是测试性的) SQL

1.3.x: SparkSQL(正式版本)+Dataframe

1.5.x: SparkSQL 钨丝计划

1.6.x: SparkSQL+DataFrame+DataSet(测试版本)

2.x:

  • SparkSQL+DataFrame+DataSet(正式版本)
  • SparkSQL:还有其他的优化
  • StructuredStreaming(DataSet)

Spark on Hive和Hive on Spark

  • Spark on Hive:Hive只作为储存角色,Spark负责sql解析优化,执行。
  • Hive on Spark:Hive即作为存储又负责sql的解析优化,Spark负责执行。

二、认识SparkSQL

2.1 什么是SparkSQL?

spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。

2.2 SparkSQL的作用

提供一个编程抽象(DataFrame) 并且作为分布式 SQL查询引擎

DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

2.3 运行原理

将Spark SQL转化为RDD,然后提交到集群执行

2.4 特点

(1)容易整合

(2)统一的数据访问方式

(3)兼容 Hive

(4)标准的数据连接

2.5 SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
  在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
  
  SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

特点:

   ---- 为用户提供一个统一的切入点使用Spark 各项功能

---- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

---- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

---- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

2.6 DataFrames

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

三、RDD转换成为DataFrame

使用spark1.x版本的方式

测试数据目录:spark/examples/src/main/resources(spark的安装目录里面)

people.txt

3.1通过case class创建DataFrames(反射)

//定义case class,相当于表结构
case class People(var name:String,var age:Int)
object TestDataFrame1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val context = new SQLContext(sc)
    // 将本地的数据读入 RDD, 并将 RDD 与 case class 关联
    val peopleRDD = sc.textFile("E:\\666\\people.txt")
      .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
    import context.implicits._
    // 将RDD 转换成 DataFrames
    val df = peopleRDD.toDF
    //将DataFrames创建成一个临时的视图
    df.createOrReplaceTempView("people")
    //使用SQL语句进行查询
    context.sql("select * from people").show()
  }
}

运行结果

3.2通过structType创建DataFrames(编程接口)

object TestDataFrame2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val fileRDD = sc.textFile("E:\\666\\people.txt")
    // 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
    val rowRDD: RDD[Row] = fileRDD.map(line => {
      val fields = line.split(",")
      Row(fields(0), fields(1).trim.toInt)
    })
    // 创建 StructType 来定义结构
    val structType: StructType = StructType(
      //字段名,字段类型,是否可以为空
      StructField("name", StringType, true) ::
      StructField("age", IntegerType, true) :: Nil
    )
    /**
      * rows: java.util.List[Row],
      * schema: StructType
      * */
    val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}

运行结果

3.3通过 json 文件创建DataFrames

object TestDataFrame3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df: DataFrame = sqlContext.read.json("E:\\666\\people.json")
    df.createOrReplaceTempView("people")
    sqlContext.sql("select * from people").show()
  }
}

四、DataFrame的read和save和savemode

4.1 数据的读取

object TestRead {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //方式一
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")
    //方式二
    val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
    val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")
    //方式三,默认是parquet格式
    val df5 = sqlContext.load("E:\\666\\users.parquet")
  }
}

4.2 数据的保存

object TestSave {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    //方式一
    df1.write.json("E:\\111")
    df1.write.parquet("E:\\222")
    //方式二
    df1.write.format("json").save("E:\\333")
    df1.write.format("parquet").save("E:\\444")
    //方式三
    df1.write.save("E:\\555")

  }
}

4.3 数据的保存模式

使用mode

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

五、数据源

5.1 数据源只json

参考4.1

5.2 数据源之parquet

参考4.1

5.3 数据源之Mysql

object TestMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TestMysql").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
    val table = "dbs"
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)
    val df = sqlContext.read.jdbc(url,table,properties)
    df.createOrReplaceTempView("dbs")
    sqlContext.sql("select * from dbs").show()

  }
}

运行结果

5.3 数据源之Hive

(1)准备工作

在pom.xml文件中添加依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下

<configuration>
        <property>
                <name>javax.jdo.option.ConnectionURL</name>
                <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>
                <description>JDBC connect string for a JDBC metastore</description>
                <!-- 如果 mysql 和 hive 在同一个服务器节点,那么请更改 hadoop02 为 localhost -->
        </property>
        <property>
                <name>javax.jdo.option.ConnectionDriverName</name>
                <value>com.mysql.jdbc.Driver</value>
                <description>Driver class name for a JDBC metastore</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionUserName</name>
                <value>root</value>
                <description>username to use against metastore database</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionPassword</name>
                <value>root</value>
        <description>password to use against metastore database</description>
        </property>
    <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/hive/warehouse</value>
                <description>hive default warehouse, if nessecory, change it</description>
        </property>
</configuration>

(2)测试代码

object TestHive {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("select * from myhive.student").show()
  }
}

运行结果

六、SparkSQL 的元数据

1.1元数据的状态

SparkSQL 的元数据的状态有两种:

1、in_memory,用完了元数据也就丢了

2、hive , 通过hive去保存的,也就是说,hive的元数据存在哪儿,它的元数据也就存在哪儿。

换句话说,SparkSQL的数据仓库在建立在Hive之上实现的。我们要用SparkSQL去构建数据仓库的时候,必须依赖于Hive。

2.2Spark-SQL脚本

如果用户直接运行bin/spark-sql命令。会导致我们的元数据有两种状态:

1、in-memory状态:如果SPARK-HOME/conf目录下没有放置hive-site.xml文件,元数据的状态就是in-memory

2、hive状态:如果我们在SPARK-HOME/conf目录下放置了,hive-site.xml文件,那么默认情况下,spark-sql的元数据的状态就是hive.

到此这篇关于SparkSQL使用快速入门的文章就介绍到这了,更多相关SparkSQL使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • IDEA 开发配置SparkSQL及简单使用案例代码

    1.添加依赖 在idea项目的pom.xml中添加依赖. <!--spark sql依赖,注意版本号--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency> 2.案例代码 package com.

  • 浅谈DataFrame和SparkSql取值误区

    1.DataFrame返回的不是对象. 2.DataFrame查出来的数据返回的是一个dataframe数据集. 3.DataFrame只有遇见Action的算子才能执行 4.SparkSql查出来的数据返回的是一个dataframe数据集. 原始数据 scala> val parquetDF = sqlContext.read.parquet("hdfs://hadoop14:9000/yuhui/parquet/part-r-00004.gz.parquet") df: or

  • SparkSQL读取hive数据本地idea运行的方法详解

    环境准备: hadoop版本:2.6.5 spark版本:2.3.0 hive版本:1.2.2 master主机:192.168.100.201 slave1主机:192.168.100.201 pom.xml依赖如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="

  • DataFrame:通过SparkSql将scala类转为DataFrame的方法

    如下所示: import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, Da

  • 创建SparkSession和sparkSQL的详细过程

    目录 一.概述 二.创建SparkSession 三. SQLContext 四. HiveContext 一.概述 spark 有三大引擎,spark core.sparkSQL.sparkStreaming, spark core 的关键抽象是 SparkContext.RDD: SparkSQL 的关键抽象是 SparkSession.DataFrame: sparkStreaming 的关键抽象是 StreamingContext.DStream SparkSession 是 spark

  • SparkSQl简介及运行原理

    目录 一:什么是SparkSQL? (一)SparkSQL简介 (二)SparkSQL运行原理 (三)SparkSQL特点 二:DataFrame (一)什么是DataFrame? 补充:Spark中的RDD.DataFrame和DataSet讲解 (一)Spark中的模块 (二)RDD和DataFrame的区别 三:SparkSession (一)SparkSession简介 (二)SparkSession实质 (三)SparkSession特点 四:通过RDD创建DataFrame (一)通

  • SparkSQL使用快速入门

    目录 一.SparkSQL的进化之路 二.认识SparkSQL 2.1 什么是SparkSQL? 2.2 SparkSQL的作用 2.3 运行原理 2.4 特点 2.5 SparkSession 2.6 DataFrames 三.RDD转换成为DataFrame 3.1通过case class创建DataFrames(反射) 3.2通过structType创建DataFrames(编程接口) 3.3通过 json 文件创建DataFrames 四.DataFrame的read和save和save

  • SparkSQL使用IDEA快速入门DataFrame与DataSet的完美教程

    目录 1.使用IDEA开发Spark SQL 1.1创建DataFrame/DataSet 1.1.1指定列名添加Schema 1.1.2StructType指定Schema 1.1.3反射推断Schema 1.使用IDEA开发Spark SQL 1.1创建DataFrame/DataSet 1.指定列名添加Schema 2.通过StrucType指定Schema 3.编写样例类,利用反射机制推断Schema 1.1.1指定列名添加Schema //导包 import org.apache.sp

  • SparkSQL快速入门教程

    目录 (一)概述 (二)SparkSQL实战 (三)非JSON格式的Dataset创建 (四)通过JDBC创建DataFrame (五)总结 (一)概述 SparkSQL可以理解为在原生的RDD上做的一层封装,通过SparkSQL可以在scala和java中写SQL语句,并将结果作为Dataset/DataFrame返回.简单来讲,SparkSQL可以让我们像写SQL一样去处理内存中的数据. Dataset是一个数据的分布式集合,是Spark1.6之后新增的接口,它提供了RDD的优点和Spark

  • hibernate4快速入门实例详解

    Hibernate是什么 Hibernate是一个轻量级的ORMapping框架 ORMapping原理(Object RelationalMapping) ORMapping基本对应规则: 1:类跟表相对应 2:类的属性跟表的字段相对应 3:类的实例与表中具体的一条记录相对应 4:一个类可以对应多个表,一个表也可以对应对个类 5:DB中的表可以没有主键,但是Object中必须设置主键字段 6:DB中表与表之间的关系(如:外键)映射成为Object之间的关系 7:Object中属性的个数和名称可

  • 新手如何快速入门Python(菜鸟必看篇)

    学习任何一门语言都是从入门(1年左右),通过不间断练习达到熟练水准(3到5年),少数人最终能精通语言,成为执牛耳者,他们是金字塔的最顶层.虽然万事开头难,但好的开始是成功的一半,今天这篇文章就来谈谈如何开始入门Python.只要方向对了,就不怕路远. 设定目标 当你决定入门 Python 时,需要一个清晰且短期内可实现的目标,比如通过学习找一份初级程序员工作,目标明确后,你需要了解企业对初级程序员有哪些技能要求,下面是我从拉勾网找的一个初级 Python 工程师的任职要求: 1.熟悉 Pytho

  • MongoDB快速入门笔记(八)之MongoDB的java驱动操作代码讲解

    MongoDB的Java驱动是线程安全的,对于一般的应用,只要一个Mongo实例即可,Mongo有个内置的连接池(池大小默认为10个). 下面代码给大家介绍MongoDB的java驱动操作,具体代码如下所示: import java.util.ArrayList; import java.util.List; import java.util.regex.Pattern; import org.bson.Document; import com.mongodb.MongoClient; impo

  • DB2 9(Viper)快速入门

    正在看的db2教程是:DB2 9(Viper)快速入门. 为了帮助您快速掌握 DB2 自身的 XML 特性,请完成几个普通的任务,比如: 创建用于管理 XML 数据的数据库对象,包括一个测试数据库.一些示例表和视图. 使用 INSERT 和 IMPORT 语句将 XML 数据填充到数据库中. 验证您的 XML 数据.使用 DB2 开发和注册您的 XML 模式,并在导入数据时使用 XMLVALIDATE 选项.后续文章将包括其他主题,比如使用 SQL 查询.更新和删除 DB2 XML 数据,使用

  • MongoDB快速入门笔记(二)之MongoDB的概念及简单操作

    MongoDB是面向集合的文档式数据库,不像关系数据库那样,有表,列.行,mongoDB数据库则是由一系列的文档组成.下面给大家介绍MongoDB的概念及简单操作. 1.以下列举普通的关系型数据库和MongoDB数据库简单概念上的区别: 2.MongoDB的简单操作 (1)启动MongoDB数据库之后,使用命令mongo,显示如下,默认连接到test数据库. MongoDB shell version: 3.2.6 connecting to: test 使用命令show dbs,可以查看所有的

  • MongoDB快速入门笔记(一)之windows下安装MongoDB方法

    MongoDB 是一个基于分布式文件存储的数据库.由 C++ 语言编写.旨在为 WEB 应用提供可扩展的高性能数据存储解决方案. MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的. MongoDB下载地址:http://www.mongodb.org/downloads 1.安装MongoDB 从MongoDB官网上下载MongoDB,我下载的版本是64位的3.2.6.下载完以后直接安装,我的安装目录是D:\work\MongoDB.

  • MongoDB快速入门笔记(四)之MongoDB查询文档操作实例代码

    MongoDB简介 MongoDB 是一个基于分布式文件存储的数据库.由 C++ 语言编写.旨在为 WEB 应用提供可扩展的高性能数据存储解决方案. MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的. 下面给大家介绍MongoDB查询文档操作的实例 先把student删除,再重新插入数据 > db.student.drop() true > db.student.insert([{ "_id" : 1, "

随机推荐