如何使用IDEA开发Spark SQL程序(一文搞懂)

目录
  • 前言
  • Spark SQL是什么
  • 1、使用IDEA开发Spark SQL
    • 1.1、指定列名添加Schema
    • 1.2、通过StructType指定Schema
    • 1.3、反射推断Schema–掌握
    • 1.4、花式查询
    • 1.5、 相互转化
    • 1.6、Spark SQL完成WordCount(案例)
      • 1.6.1、SQL风格
      • 1.6.2、DQL风格

前言

大家好,我是DJ丶小哪吒,我又来跟你们分享知识了。对软件开发有着浓厚的兴趣。喜欢与人分享知识。做博客的目的就是为了能与 他 人知识共享。由于水平有限。博客中难免会有一些错误。如有 纰漏 之处,欢迎大家在留言区指正。小编也会及时改正。

DJ丶小哪吒又来与各位分享知识了。今天我们不飙车,今天就静静的坐下来,我们来聊一聊关于sparkSQL。准备好茶水,听老朽与你娓娓道来。

Spark SQL是什么

Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

1、使用IDEA开发Spark SQL

Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:

  • 第1种:指定列名添加Schema
  • 第2种:通过StructType指定Schema
  • 第3种:编写样例类,利用反射机制推断Schema

1.1、指定列名添加Schema

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDFDS {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    val personDF: DataFrame = rowRDD.toDF("id","name","age")
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.2、通过StructType指定Schema

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object CreateDFDS2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    //import spark.implicits._
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, true),//允许为空
      StructField("name", StringType, true),
      StructField("age", IntegerType, true))
    )
    val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.3、反射推断Schema–掌握

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDFDS3 {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.4、花式查询

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object QueryDemo {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    //=======================SQL方式查询=======================
    //0.注册表
    personDF.createOrReplaceTempView("t_person")
    //1.查询所有数据
    spark.sql("select * from t_person").show()
    //2.查询age+1
    spark.sql("select age,age+1 from t_person").show()
    //3.查询age最大的两人
    spark.sql("select name,age from t_person order by age desc limit 2").show()
    //4.查询各个年龄的人数
    spark.sql("select age,count(*) from t_person group by age").show()
    //5.查询年龄大于30的
    spark.sql("select * from t_person where age > 30").show()

    //=======================DSL方式查询=======================
    //1.查询所有数据
    personDF.select("name","age")
    //2.查询age+1
    personDF.select($"name",$"age" + 1)
    //3.查询age最大的两人
    personDF.sort($"age".desc).show(2)
    //4.查询各个年龄的人数
    personDF.groupBy("age").count().show()
    //5.查询年龄大于30的
    personDF.filter($"age" > 30).show()

    sc.stop()
    spark.stop()
  }
  }

1.5、 相互转化

RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:
1)使用RDD算子操作
2)使用DSL/SQL对表操作

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object TransformDemo {
case class Person(id:Int,name:String,age:Int)

  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    //=========================相互转换======================
    //1.RDD-->DF
    val personDF: DataFrame = personRDD.toDF
    //2.DF-->RDD
    val rdd: RDD[Row] = personDF.rdd
    //3.RDD-->DS
    val DS: Dataset[Person] = personRDD.toDS()
    //4.DS-->RDD
    val rdd2: RDD[Person] = DS.rdd
    //5.DF-->DS
    val DS2: Dataset[Person] = personDF.as[Person]
    //6.DS-->DF
    val DF: DataFrame = DS2.toDF()

    sc.stop()
    spark.stop()
  }
  }

1.6、Spark SQL完成WordCount(案例)

1.6.1、SQL风格

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.createOrReplaceTempView("t_word")
    val sql =
      """
        |select value ,count(value) as count
        |from t_word
        |group by value
        |order by count desc
      """.stripMargin
    spark.sql(sql).show()

    sc.stop()
    spark.stop()
  }
}

1.6.2、DQL风格

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.groupBy("value").count().orderBy($"count".desc).show()

    sc.stop()
    spark.stop()
  }
}

好了,以上内容就到这里了。你学到了吗。

到此这篇关于如何使用IDEA开发Spark SQL程序(一文搞懂)的文章就介绍到这了,更多相关IDEA开发Spark SQL内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SparkSQL读取hive数据本地idea运行的方法详解

    环境准备: hadoop版本:2.6.5 spark版本:2.3.0 hive版本:1.2.2 master主机:192.168.100.201 slave1主机:192.168.100.201 pom.xml依赖如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="

  • IDEA 开发配置SparkSQL及简单使用案例代码

    1.添加依赖 在idea项目的pom.xml中添加依赖. <!--spark sql依赖,注意版本号--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency> 2.案例代码 package com.

  • 如何使用IDEA开发Spark SQL程序(一文搞懂)

    目录 前言 Spark SQL是什么 1.使用IDEA开发Spark SQL 1.1.指定列名添加Schema 1.2.通过StructType指定Schema 1.3.反射推断Schema–掌握 1.4.花式查询 1.5. 相互转化 1.6.Spark SQL完成WordCount(案例) 1.6.1.SQL风格 1.6.2.DQL风格 前言 大家好,我是DJ丶小哪吒,我又来跟你们分享知识了.对软件开发有着浓厚的兴趣.喜欢与人分享知识.做博客的目的就是为了能与 他 人知识共享.由于水平有限.博

  • 一文搞懂SQL注入攻击

    目录 1. 前言 2. SQL注入简介 (1)SQL语言 (2)SQL注入 3. SQL注入步骤 (1)发现漏洞 (2)信息收集 (3)攻击Web系统(猜解用户名和密码) (4)获取管理员权限 4. 防范SQL注入 (1)使用参数化查询或存储过程 (2)用户输入检测 (3)SQL语法分析 (4)其他 1. 前言 随着互联网的发展和普及,网络安全问题越来越突出,网络在为用户提供越来越多服务的同时,也要面对各类越来越复杂的恶意攻击.SQL注入(SQL Injection)攻击是其中最普遍的安全隐患之

  • 一文搞懂Java JDBC中的SQL注入问题

    目录 SQL注入 什么是SQL注入 SQL注入的效果的演示 SQL注入代码 SQL注入效果 如何避免SQL注入 PrepareStatement解决SQL注入 PreparedStatement的应用 参数标记 动态参数绑定 综合案例 PreparedStatement总结 必须使用Statement的情况 SQL注入 什么是SQL注入 在用户输入的数据中有SQL关键字或语法,并且关键字或语法参与了SQL语句的编译.导致SQL语句编译后的条件为true,一直得到正确的结果.这种现象就是SQL注入

  • 详解Java编写并运行spark应用程序的方法

    我们首先提出这样一个简单的需求: 现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况.这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" &qu

  • 基于Oracle的高性能动态SQL程序开发

    正在看的ORACLE教程是:基于Oracle的高性能动态SQL程序开发. 摘要:对动态SQL的程序开发进行了总结,并结合笔者实际开发经验给出若干开发技巧. 关键词:动态SQL,PL/SQL,高性能 1. 静态SQLSQL与动态SQL Oracle编译PL/SQL程序块分为两个种:其一为前期联编(early binding),即SQL语句在程序编译期间就已经确定,大多数的编译情况属于这种类型:另外一种是后期联编(late binding),即SQL语句只有在运行阶段才能建立,例如当查询条件为用户输

  • Spark SQL操作JSON字段的小技巧

    前言 介绍Spark SQL的JSON支持,这是我们在Databricks中开发的一个功能,可以在Spark中更容易查询和创建JSON数据.随着网络和移动应用程序的普及,JSON已经成为Web服务API以及长期存储的常用的交换格式.使用现有的工具,用户通常会使用复杂的管道来在分析系统中读取和写入JSON数据集.在Apache Spark 1.1中发布Spark SQL的JSON支持,在Apache Spark 1.2中增强,极大地简化了使用JSON数据的端到端体验. 很多时候,比如用struct

  • Spark SQL的整体实现逻辑解析

    1.sql语句的模块解析 当我们写一个查询语句时,一般包含三个部分,select部分,from数据源部分,where限制条件部分,这三部分的内容在sql中有专门的名称: 当我们写sql时,如上图所示,在进行逻辑解析时会把sql分成三个部分,project,DataSource,Filter模块,当生成执行部分时又把他们称为:Result模块. DataSource模块和Opertion模块. 那么在关系数据库中,当我们写完一个查询语句进行执行时,发生的过程如下图所示: 整个执行流程是:query

  • VC中使用ADO开发数据库应用程序简明教程

    本文实例讲述了VC中使用ADO开发数据库应用程序的方法.分享给大家供大家参考,具体如下: 一.ADO概述 ADO是Microsoft为最新和最强大的数据访问范例 OLE DB 而设计的,是一个便于使用的应用程序层接口.ADO 使您能够编写应用程序以通过 OLE.DB 提供者访问和操作数据库服务器中的数据.ADO 最主要的优点是易于使用.速度快.内存支出少和磁盘遗迹小.ADO 在关键的应用方案中使用最少的网络流量,并且在前端和数据源之间使用最少的层数,所有这些都是为了提供轻量.高性能的接口.之所以

  • Spark学习笔记之Spark SQL的具体使用

    1. Spark SQL是什么? 处理结构化数据的一个spark的模块 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用 2. Spark SQL的特点 多语言的接口支持(java python scala) 统一的数据访问 完全兼容hive 支持标准的连接 3. 为什么学习SparkSQL? 我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执

  • 使用idea开发javaWeb应用程序的思路(实现用户的增删改查)

    在2.7 节基础上,这一节实现了用户的增删改查操作. 完整工程代码: 链接: https://pan.baidu.com/s/1zSGMvSQa-ihwEN5pP4vW6w 提取码: dpw8 工程目录结构如下: 1.添加/修改用户页面 在webapp下新建目录user,用于存放用户相关的页面. 添加和修改使用的是同一个页面,通过url中的参数控制是否是修改操作. 后台通过是否传递了id进行判断新增还是修改. 新建:add.html,代码如下: <!DOCTYPE html> <html

随机推荐