Spark学习笔记之Spark SQL的具体使用

1. Spark SQL是什么?

  • 处理结构化数据的一个spark的模块
  • 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用

2. Spark SQL的特点

  • 多语言的接口支持(java python scala)
  • 统一的数据访问
  • 完全兼容hive
  • 支持标准的连接

3. 为什么学习SparkSQL?

我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

4. DataFrame(数据框)

  • 与RDD类似,DataFrame也是一个分布式数据容器
  • 然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema
  • DataFrame其实就是带有schema信息的RDD

5. SparkSQL1.x的API编程

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

5.1 使用sqlContext创建DataFrame(测试用)

object Ops3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"),Person("admin2", 16, "man"),Person("admin3", 18, "man")))
    val df1: DataFrame = sqlContext.createDataFrame(rdd1)
    df1.show(1)
  }
}
case class Person(name: String, age: Int, sex: String);

5.2 使用sqlContxet中提供的隐式转换函数(测试用)

import org.apache.spark
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()
5.3 使用SqlContext创建DataFrame(常用)
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowRDD: RDD[Row] = linesRDD.map(line => {
 val lineSplit: Array[String] = line.split(",")
 Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()

6. 使用新版本的2.x的API

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()

7. 操作SparkSQL的方式

7.1 使用SQL语句的方式对DataFrame进行操作

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//Spark2.x新的API相当于Spark1.x的SQLContext
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

df.createOrReplaceTempView("p1")//这是Sprk2.x新的API 相当于Spark1.x的registTempTable()
val df2 = sparkSession.sql("select * from p1")
df2.show()

7.2 使用DSL语句的方式对DataFrame进行操作

DSL(domain specific language ) 特定领域语言

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.show()

8. SparkSQL的输出

8.1 写出到JSON文件

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.write.json("hdfs://uplooking02:8020/sparktest1")

8.2 写出到关系型数据库(mysql)

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
val url = "jdbc:mysql://localhost:3306/test"
//表会自动创建
val tbName = "person1";
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
//SaveMode 默认为ErrorIfExists
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spark SQL操作JSON字段的小技巧

    前言 介绍Spark SQL的JSON支持,这是我们在Databricks中开发的一个功能,可以在Spark中更容易查询和创建JSON数据.随着网络和移动应用程序的普及,JSON已经成为Web服务API以及长期存储的常用的交换格式.使用现有的工具,用户通常会使用复杂的管道来在分析系统中读取和写入JSON数据集.在Apache Spark 1.1中发布Spark SQL的JSON支持,在Apache Spark 1.2中增强,极大地简化了使用JSON数据的端到端体验. 很多时候,比如用struct

  • Spark SQL数据加载和保存实例讲解

    一.前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二.Spark SQL读写数据代码实战 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD;

  • DataFrame:通过SparkSql将scala类转为DataFrame的方法

    如下所示: import java.text.DecimalFormat import com.alibaba.fastjson.JSON import com.donews.data.AppConfig import com.typesafe.config.ConfigFactory import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Row, SaveMode, Da

  • pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换实例

    代码如下,步骤流程在代码注释中可见: # -*- coding: utf-8 -*- import pandas as pd from pyspark.sql import SparkSession from pyspark.sql import SQLContext from pyspark import SparkContext #初始化数据 #初始化pandas DataFrame df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1

  • 浅谈DataFrame和SparkSql取值误区

    1.DataFrame返回的不是对象. 2.DataFrame查出来的数据返回的是一个dataframe数据集. 3.DataFrame只有遇见Action的算子才能执行 4.SparkSql查出来的数据返回的是一个dataframe数据集. 原始数据 scala> val parquetDF = sqlContext.read.parquet("hdfs://hadoop14:9000/yuhui/parquet/part-r-00004.gz.parquet") df: or

  • spark rdd转dataframe 写入mysql的实例讲解

    dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行

  • Spark学习笔记之Spark SQL的具体使用

    1. Spark SQL是什么? 处理结构化数据的一个spark的模块 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用 2. Spark SQL的特点 多语言的接口支持(java python scala) 统一的数据访问 完全兼容hive 支持标准的连接 3. 为什么学习SparkSQL? 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执

  • Spark学习笔记之Spark中的RDD的具体使用

    1. Spark中的RDD Resilient Distributed Datasets(弹性分布式数据集) Spark中的最基本的抽象 有了RDD的存在我们就可以像操作本地集合一样操作分布式的数据 包含所有元素的分区的集合 RDD包含了很多的分区 2. RDD中的弹性 RDD中的数据是可大可小的 RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘 RDD有自动容错功能,当其中一个RDD中的分区的数据丢失,或者当前节点故障时,rdd会根据依赖关系重新

  • Spark学习笔记 (二)Spark2.3 HA集群的分布式安装图文详解

    本文实例讲述了Spark2.3 HA集群的分布式安装.分享给大家供大家参考,具体如下: 一.下载Spark安装包 1.从官网下载 http://spark.apache.org/downloads.html 2.从微软的镜像站下载 http://mirrors.hust.edu.cn/apache/ 3.从清华的镜像站下载 https://mirrors.tuna.tsinghua.edu.cn/apache/ 二.安装基础 1.Java8安装成功 2.zookeeper安装成功 3.hadoo

  • Spark学习笔记(一)Spark初识【特性、组成、应用】

    本文实例讲述了Spark基本特性.组成.应用.分享给大家供大家参考,具体如下: 一.官网介绍 1.什么是Spark 官网地址:http://spark.apache.org/ Apache Spark™是用于大规模数据处理的统一分析引擎. 从右侧最后一条新闻看,Spark也用于AI人工智能 spark是一个实现快速通用的集群计算平台.它是由加州大学伯克利分校AMP实验室 开发的通用内存并行计算框架,用来构建大型的.低延迟的数据分析应用程序.它扩展了广泛使用的MapReduce计算模型.高效的支撑

  • Spark学习笔记Spark Streaming的使用

    1. Spark Streaming Spark Streaming是一个基于Spark Core之上的实时计算框架,可以从很多数据源消费数据并对数据进行处理 Spark Streaing中有一个最基本的抽象叫DStream(代理),本质上就是一系列连续的RDD,DStream其实就是对RDD的封装 DStream可以认为是一个RDD的工厂,该DStream里面生产都是相同业务逻辑的RDD,只不过是RDD里面要读取数据的不相同 在一个批次的处理时间间隔里, DStream只产生一个RDD DSt

  • Mybatis学习笔记之动态SQL揭秘

    前言 MyBatis 的强大特性之一便是它的动态 SQL.所以今天小编在这里为大家介绍一下Mybatis的一个强大功能-动态SQL 动态SQL是Mybatis的一个强大的特性,在使用JDBC操作数据时,如果查询条件特别多,将条件串联成SQL字符串是一件非常痛苦的事情,通常的解决方法使写很多的if-else条件语句去判断和拼接,并确保不能忘了空格或在字段的最后省略逗号.Mybatis使用一种强大的动态SQL语言来改善这种情况 动态SQL基于OGNL的表达式,可使我们能方便地在SQL语句中实现某些逻

  • SQL Server学习笔记之事务、锁定、阻塞、死锁用法详解

    本文实例讲述了SQL Server学习笔记之事务.锁定.阻塞.死锁用法.分享给大家供大家参考,具体如下: 1.事务 隐式事务 /*================================================================== 当以create,drop, fetch,open, revoke,grand, alter table,select,insert,delete,update,truncate table 语句首先执行的时候,SQL Server会话

  • DB2 UDB V8.1管理学习笔记(一)

    正在看的db2教程是:DB2 UDB V8.1管理学习笔记(一). DB2 基本概念 在DB2中由上至下的几个概念: 实例(Instance), 数据库(Database), 表空间(TableSpace), 容器(Container) 在一个操作系统中,DB2数据服务可以同时运行多个实例(有别于Oracle在一个系统内只能起一个实例). 数据库定义在实例中,一个实例可以包含多个数据库.在同一个实例中的不同数据库是完全独立的,分别拥有自己独立的系统编目表. 表空间有2种管理方式: DMS(Dat

  • DB2 UDB V8.1管理学习笔记(三)

    正在看的db2教程是:DB2 UDB V8.1管理学习笔记(三).强制断开已有连接,停止实例并删除.  $ db2idrop -f instance_name 用于在UNIX下迁移实例. $ db2imigr instance_name 更新实例,用于实例获得一些新的产品选项或修订包的访问权. $ db2iupdt instance_name 获取当前所处的实例. $ db2 get instance 当更新实例级别或数据库级别的参数后,有些可以立即生效,有些需要重新启动实例才可生效.immed

  • Java中jqGrid 学习笔记整理——进阶篇(二)

    相关阅读: Java中jqGrid 学习笔记整理--进阶篇(一) 本篇开始正式与后台(java语言)进行数据交互,使用的平台为 JDK:java 1.8.0_71 myEclisp 2015 Stable 2.0 Apache Tomcat-8.0.30 Mysql 5.7 Navicat for mysql 11.2.5(mysql数据库管理工具) 一.数据库部分 1.创建数据库 使用Navicat for mysql创建数据库(使用其他工具或直接使用命令行暂不介绍) 2. 2.创建表 双击打

随机推荐