SparkSQl简介及运行原理

目录
  • 一:什么是SparkSQL?
    • (一)SparkSQL简介
    • (二)SparkSQL运行原理
    • (三)SparkSQL特点
  • 二:DataFrame
    • (一)什么是DataFrame?
  • 补充:Spark中的RDD、DataFrame和DataSet讲解
    • (一)Spark中的模块
    • (二)RDD和DataFrame的区别
  • 三:SparkSession
    • (一)SparkSession简介
    • (二)SparkSession实质
    • (三)SparkSession特点
  • 四:通过RDD创建DataFrame
    • (一)通过样本类创建(反射)
    • (二)通过SparkSession创建DataFrame
    • (三)通过 json 文件创建DataFrames
  • 五:临时视图
    • (一)什么是视图
    • (二)类型
    • (三)创建视图
    • (四)视图查询
    • (五)会话周期
  • 六:DataFrame的read和save和savemode
    • (一)数据读取
    • (二)数据保存
    • (三)数据保存模式
  • 七:数据集DataSet
    • (一)创建和使用DataSet---使用序列
    • (二)创建和使用DataSet---通过case class作为编码器,将DataFrame转换成DataSet
    • (三)创建和使用DataSet---读取HDFS数据文件

一:什么是SparkSQL?

(一)SparkSQL简介

Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。
Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。

(二)SparkSQL运行原理

将Spark SQL转化为RDD,然后提交到集群执行。

(三)SparkSQL特点

(1)容易整合,Spark SQL已经集成在Spark中

(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问

(3)兼容 Hive

(4)标准的数据连接:JDBC、ODBC

二:DataFrame

(一)什么是DataFrame?

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

DataFrame是组织成命名列的数据集。

它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。

关系型数据库中的表由表结构和数据组成,而DataFrame也类似,由schema(结构)和数据组成,其数据集是RDD。

DataFrame可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

补充:Spark中的RDD、DataFrame和DataSet讲解

(一)Spark中的模块

上图展示了Spark的模块及各模块之间的关系:

底层是Spark-core核心模块,Spark每个模块都有一个核心抽象,Spark-core的核心抽象是RDD,

Spark SQL等都基于RDD封装了自己的抽象,在Spark SQL中是DataFrame/DataSet。

相对来说RDD是更偏底层的抽象,DataFrame/DataSet是在其上做了一层封装,做了优化,使用起来更加方便。

从功能上来说,DataFrame/DataSet能做的事情RDD都能做,RDD能做的事情DataFrame/DataSet不一定能做。

(二)RDD和DataFrame的区别

DataFrame与RDD的主要区别在于:

DataFrame

DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。

RDD

RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame和RDD联系:

DataFrame底层是以RDD为基础的分布式数据集,和RDD的主要区别的是:RDD中没有schema信息,而DataFrame中数据每一行都包含schema

DataFrame = RDD[Row] + shcema

三:SparkSession

(一)SparkSession简介

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。

对于每个其他的API,我们需要使用不同的context。

例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。

但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点。

SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

(二)SparkSession实质

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

(三)SparkSession特点

   ----为用户提供一个统一的切入点使用Spark各项功能

----允许用户通过它调用DataFrame和Dataset相关 API来编写程序

----减少了用户需要了解的一些概念,可以很容易的与Spark进行交互

----与Spark交互之时不需要显示的创建SparkConf, SparkContext以及 SQlContext,这些对象已经封闭在SparkSession中

四:通过RDD创建DataFrame

(一)通过样本类创建(反射)

case class People(val name:String,val age:Int)  //可以声明数据类型

object WordCount {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //设置运行模式为本地运行,不然默认是集群模式
    //conf.setMaster("local")  //默认是集群模式
    //设置任务名
    conf.setAppName("WordCount").setMaster("local")
    conf.set("spark.default.parallelism","5")
    //设置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    val Sqlsc = new SQLContext(sc)  //根据SparkContext生成SQLContext

    val array = Array("mark,14","kitty,23","dasi,45")
    val peopleRDD = sc.parallelize(array).map(line=>{    //生成RDD
      People(line.split(",")(0),line.split(",")(1).trim().toInt)
    })

    import Sqlsc.implicits._  //引入全部方法
    //将RDD转换成DataFrame
    val df = peopleRDD.toDF()
    //将DataFrame转换成一个临时的视图
    df.createOrReplaceTempView("people")
    //使用SQL语句进行查询
    Sqlsc.sql("select * from people").show()
  }
}

(二)通过SparkSession创建DataFrame

object WordCount {
  def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //设置运行模式为本地运行,不然默认是集群模式
    //conf.setMaster("local")  //默认是集群模式
    //设置任务名
    conf.setAppName("WordCount").setMaster("local")
    conf.set("spark.default.parallelism","5")
    //设置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    val Sqlsc = new SQLContext(sc)  //根据SparkContext生成SQLContext

    val array = Array("mark,14","kitty,23","dasi,45")
    //1.需要将RDD数据映射成Row,需要引入import org.apache.spark.sql.Row
    val peopleRDD = sc.parallelize(array).map(line=>{    //生成RDD
      val fields = line.split(",")
      Row(fields(0),fields(1).trim().toInt)
    })

    //2.创建StructType定义结构
    val st:StructType = StructType(
      //字段名,字段类型,是否可以为空
      List(  //传参是列表类型,或者使用StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil来构建列表
          StructField("name",StringType,true),
          StructField("age",IntegerType,true)
          )
    )

    //3.使用SparkSession建立DataFrame
    val df = Sqlsc.createDataFrame(peopleRDD,st)
    //将DataFrame转换成一个临时的视图
    df.createOrReplaceTempView("people")
    //使用SQL语句进行查询
    Sqlsc.sql("select * from people").show()
  }
}

(三)通过 json 文件创建DataFrames

[{"name":"dafa","age":12},{"name":"safaw","age":17},{"name":"ge","age":34}]
def main(args:Array[String]):Unit={
    val conf = new SparkConf()
    //设置运行模式为本地运行,不然默认是集群模式
    //conf.setMaster("local")  //默认是集群模式
    //设置任务名
    conf.setAppName("WordCount").setMaster("local")
    //设置SparkContext,是SparkCore的程序入口
    val sc = new SparkContext(conf)
    val Sqlsc = new SQLContext(sc)  //根据SparkContext生成SQLContext

    //通过json数据直接创建DataFrame
    val df = Sqlsc.read.json("E:\\1.json")

    //将DataFrame转换成一个临时的视图
    df.createOrReplaceTempView("people1")
    //使用SQL语句进行查询
    Sqlsc.sql("select * from people1").show()
  }

五:临时视图

(一)什么是视图

视图是一个虚表,跟Mysql里的概念是一样的,视图基于实际的表而存在,其实质是一系列的查询语句

(二)类型

局部视图(Temoporary View):只在当前会话中有效,如果创建它的会话终止,则视图也会消失。

全局视图(Global Temporary View): 在全局范围内有效,不同的Session中都可以访问,生命周期是Spark的Application运行周期,全局视图会绑定到系统保留的数据库global_temp中,因此使用它的时候必须加上相应前缀。

(三)创建视图

创建局部视图:df.createOrReplaceTempView("emp")
创建全局视图:df.createOrReplaceGlobalTempView("empG")

(四)视图查询

spark.sql("select * from emp").show
spark.sql("select * from global_temp.empG").show  //查询全局视图,需要添加前缀

(五)会话周期

spark.newSession.sql("select * from emp").show -----> 报错,Table or View Not Found
spark.newSession.sql("select * from global_temp.empG").show ---->可以正常查询

六:DataFrame的read和save和savemode

(一)数据读取

val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //方式一
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")
    //方式二
    val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")
    val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")
    //方式三,默认是parquet格式
    val df5 = sqlContext.load("E:\\666\\users.parquet")
   //方式四,使用MySQL进行数据源读取
    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
    val table = "dbs"
    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","root")
    //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)
    val df = sqlContext.read.jdbc(url,table,properties)
    df.createOrReplaceTempView("dbs")
    sqlContext.sql("select * from dbs").show()

使用Hive作为数据源:需要在pom.xml文件中添加依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下

<configuration>
        <property>
                <name>javax.jdo.option.ConnectionURL</name>
                <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>
                <description>JDBC connect string for a JDBC metastore</description>
                <!-- 如果 mysql 和 hive 在同一个服务器节点,那么请更改 hadoop02 为 localhost -->
        </property>
        <property>
                <name>javax.jdo.option.ConnectionDriverName</name>
                <value>com.mysql.jdbc.Driver</value>
                <description>Driver class name for a JDBC metastore</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionUserName</name>
                <value>root</value>
                <description>username to use against metastore database</description>
        </property>
        <property>
                <name>javax.jdo.option.ConnectionPassword</name>
                <value>root</value>
        <description>password to use against metastore database</description>
        </property>
    <property>
                <name>hive.metastore.warehouse.dir</name>
                <value>/hive/warehouse</value>
                <description>hive default warehouse, if nessecory, change it</description>
        </property>
</configuration>

hive-site.xml配置文件
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.sql("select * from myhive.student").show() 

(二)数据保存

val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df1 = sqlContext.read.json("E:\\666\\people.json")
    //方式一
    df1.write.json("E:\\111")
    df1.write.parquet("E:\\222")
    //方式二
    df1.write.format("json").save("E:\\333")
    df1.write.format("parquet").save("E:\\444")
    //方式三
    df1.write.save("E:\\555")

(三)数据保存模式

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

七:数据集DataSet

Dataset也是一个分布式数据容器,简单来说是类似二维表,Dataset里头存有schema数据结构信息和原生数据,Dataset的底层封装的是RDD,当RDD的泛型是Row类型的时候,我们也可以称它为DataFrame。即Dataset<Row> = DataFrame。DataFrame是特殊的Dataset。

Spark整合了Dataset和DataFrame,前者是有明确类型的数据集,后者是无明确类型的数据集。根据官方的文档:

Dataset是一种强类型集合,与领域对象相关,可以使用函数或者关系进行分布式的操作。
每个Dataset也有一个无类型的视图,叫做DataFrame,也就是关于Row的Dataset。
简单来说,Dataset一般都是Dataset[T]形式,这里的T是指数据的类型,如上图中的Person,而DataFrame就是一个Dataset[Row]。

Datasets是懒加载的,即只有actions被调用的时候才会触发计算。在内部,Dataset代表一个逻辑计划,用来描述产生数据需要的计算。当一个action被调用的时候,Spark的query优化器会优化这个逻辑计划并以分布式的方式在物理上进行实际的计算操作。

(一)创建和使用DataSet---使用序列

(1,"Tom")  (2,"Mary")

测试数据

(1)定义case class
             case class MyData(a:Int,b:String)
(2)使用序列创建DataSet
             val DS = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

(二)创建和使用DataSet---通过case class作为编码器,将DataFrame转换成DataSet

(1)定义case class
             case class Person(name:String,age:BigInt)
(2)读入JSON的数据
             val df = spark.read.json("/root/temp/people.json")
(3)将DataFrame转换成DataSet
             val PersonDS =df.as[Person]

(三)创建和使用DataSet---读取HDFS数据文件

(1)读取HDFS的文件,直接创建DataSet
             val lineDS = spark.read.text("hdfs://bigdata111:9000/input/data.txt").as[String]
(2)分词操作,查询长度大于3的单词
             val words = lineDS.flatMap(_.split(" ")).filter(_.length > 3)
             words.show
             words.collect

到此这篇关于SparkSQl简介及运行原理的文章就介绍到这了,更多相关SparkSQl使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SparkSQL使用快速入门

    目录 一.SparkSQL的进化之路 二.认识SparkSQL 2.1 什么是SparkSQL? 2.2 SparkSQL的作用 2.3 运行原理 2.4 特点 2.5 SparkSession 2.6 DataFrames 三.RDD转换成为DataFrame 3.1通过case class创建DataFrames(反射) 3.2通过structType创建DataFrames(编程接口) 3.3通过 json 文件创建DataFrames 四.DataFrame的read和save和save

  • 浅谈DataFrame和SparkSql取值误区

    1.DataFrame返回的不是对象. 2.DataFrame查出来的数据返回的是一个dataframe数据集. 3.DataFrame只有遇见Action的算子才能执行 4.SparkSql查出来的数据返回的是一个dataframe数据集. 原始数据 scala> val parquetDF = sqlContext.read.parquet("hdfs://hadoop14:9000/yuhui/parquet/part-r-00004.gz.parquet") df: or

  • 创建SparkSession和sparkSQL的详细过程

    目录 一.概述 二.创建SparkSession 三. SQLContext 四. HiveContext 一.概述 spark 有三大引擎,spark core.sparkSQL.sparkStreaming, spark core 的关键抽象是 SparkContext.RDD: SparkSQL 的关键抽象是 SparkSession.DataFrame: sparkStreaming 的关键抽象是 StreamingContext.DStream SparkSession 是 spark

  • SparkSQL读取hive数据本地idea运行的方法详解

    环境准备: hadoop版本:2.6.5 spark版本:2.3.0 hive版本:1.2.2 master主机:192.168.100.201 slave1主机:192.168.100.201 pom.xml依赖如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="

  • DataFrame:通过SparkSql将scala类转为DataFrame的方法

    如下所示: import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, Da

  • IDEA 开发配置SparkSQL及简单使用案例代码

    1.添加依赖 在idea项目的pom.xml中添加依赖. <!--spark sql依赖,注意版本号--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency> 2.案例代码 package com.

  • SparkSQl简介及运行原理

    目录 一:什么是SparkSQL? (一)SparkSQL简介 (二)SparkSQL运行原理 (三)SparkSQL特点 二:DataFrame (一)什么是DataFrame? 补充:Spark中的RDD.DataFrame和DataSet讲解 (一)Spark中的模块 (二)RDD和DataFrame的区别 三:SparkSession (一)SparkSession简介 (二)SparkSession实质 (三)SparkSession特点 四:通过RDD创建DataFrame (一)通

  • JSP程序运行原理、文档结构及简单输入输出实例分析

    本文实例讲述了JSP程序运行原理.文档结构及简单输入输出.分享给大家供大家参考.具体如下: 目标: 掌握Web应用的文档结构: 掌握JSP的运行原理: 掌握JSP的简单输入和输出. 主要内容: 通过一个简单实例介绍Web应用的文档结构和运行原理: 通过一个简单的注册功能介绍基本的输入输出. 实现内容:客户端验证. 1. 文档结构 每个应用都有一个根目录,例如ch2:理论上可以放在任何地方,但是需要配置,简单的做法,直接放在了webapps这个目录下,在这个目录的应用会被自动加载. 在根目录下会有

  • C++ 学习之旅 Windows程序内部运行原理

    学习C++与.net不同的是,一定要搞清楚Windows程序内部运行原理,因为他所涉及大多数是操作系统的调用,而.net毕竟是在.netFrameWork上唱戏. 那Windows应用程序,操作系统,计算机硬件之间的相互关系究竟什么了,下面的图就给予很好的解释. 向下箭头①是 应用程序运行判断处理的结果,输出到输出的设备. 向上箭头②是输入设备,输入到操作系统中. 向下箭头③代表API,我们要解释以下API是什么.API是应用程序接口, 表示应用程序可以通知操作系统执行某个具体的动作,如操作系统

  • android 键盘事件和屏幕事件的运行原理及交互实现

    当在自定义View或者做游戏的时候,我们常常会用到键盘触发事件和屏幕触发事件!在自定义的View里的键盘触发事件(比如:onKeyDown(int keyCode, KeyEvent event))和屏幕触发事件(onTouchEvent(MotionEvent event))和activity里的键盘触发事件(比如:onKeyDown(int keyCode, KeyEvent event))和屏幕触发事件(onTouchEvent(MotionEvent event))是怎么样交互的呢?是怎

  • Python程序运行原理图文解析

    本文研究的主要是Python程序运行原理,具体介绍如下. 编译型语言(C语言为例) 动态型语言 一个程序是如何运行起来的?比如下面的代码 #othermodule.py def add(a, b): return a + b #mainrun.py import othermodule a = ['xiaoke', 1, 'python'] a = 'xiaoke string' def func(): a = -5 b = 257 print(a + b) print(a) if __name

  • Nodejs libuv运行原理详解

    前言 这应该是Nodejs的运行原理的第7篇分享,这篇过后,短时间内不会再分享Nodejs的运行原理,会停更一段时间,PS:不是不更,而是会开挖新的坑,最近有在研究RPG Maker MV,区块链,云计算,可能会更新一些相关文章,或者相关教学. 回到正题,异步编程的难点在于请求与响应不是按顺序发生的.以http server 为例,异步编程赋予了server 高并发的品质,而且他可以以很小的资源代价,不断地接受和处理请求.但是快速处理请求不表示快速地返回请求=>高并发不等同于快速反馈. 在Nod

  • 简单了解SpringCloud运行原理

    这篇文章主要介绍了简单了解SpringCloud运行原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 SpringCloud是基于SpringBoot这一高度自动化的应用开发框架,将各类业界比较知名的.得到过实践反馈的开元服务治理相关的技术框架进行优化整合的框架,是一种开发方式的优化和组合,,是一组框架的统称,基于SpringBoot的starter定制,实现开箱即用的目标,通过简单的声明式注解,就能实现服务的调用.负载均衡.限流.熔断等机制

  • Java线程状态运行原理解析

    这篇文章主要介绍了Java线程状态运行原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 代码实例如下 package com.fgy.demo05; /** * 等待唤醒案例:线程之间通信 * 注意: * 同步使用的锁对象必须唯一 * 只有锁对象才能调用wait和notify()/notifyAll()方法 */ public class Demo1WaitAndNotify { public static void main(Strin

  • SQL查询的底层运行原理深入分析

    前言 SQL 语言无处不在.SQL 已经不仅仅是技术人员的专属技能了,似乎人人都会写SQL,就如同人人都是产品经理一样.如果你是做后台开发的,那么CRUD就是家常便饭.如果你是做数仓开发的,那么写SQL可能占据了你的大部分工作时间.我们在理解 SELECT 语法的时候,还需要了解 SELECT 执行时的底层原理.只有这样,才能让我们对 SQL 有更深刻的认识.本文分享将逐步分解SQL的执行过程,希望对你有所帮助. 数据准备 本文旨在说明SQL查询的执行过程,不会涉及太复杂的SQL操作,主要涉及两

  • Jmeter结构体系及运行原理顺序解析

    一.Jmeter 运行原理: Jmeter 时以线程的方式来运行的(由于Jmeter 是 java 开发的所以是运行在 JVM 虚拟机上的,java 也是支持多线程的) 二.Jmeter 结构体系 1.线程组 性能测试需要模拟大量用户负载的情况,线程组就是用来完成这个任务的,在线程组中我们可以设置运行的线程数(用户数),运行时长,循环次数等 2.逻辑控制器 控制循环次数等 3.配置元件 性能测试过程中为了模拟大量用户操作我们需要做参数化,那么 Jmeter 参数化就可以通过配置元件来完成,另外

随机推荐