Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

一:准备数据源

在项目下新建一个student.txt文件,里面的内容为:

1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18 

二:实现

Java版:

1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

package com.cxd.sql;
import java.io.Serializable;
@SuppressWarnings("serial")
public class Student implements Serializable {
 String sid;
 String sname;
 int sage;
 public String getSid() {
  return sid;
 }
 public void setSid(String sid) {
  this.sid = sid;
 }
 public String getSname() {
  return sname;
 }
 public void setSname(String sname) {
  this.sname = sname;
 }
 public int getSage() {
  return sage;
 }
 public void setSage(int sage) {
  this.sage = sage;
 }
 @Override
 public String toString() {
  return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
 }

}
		

2.转换,具体代码如下

package com.cxd.sql;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class TxtToParquetDemo {
 public static void main(String[] args) {

  SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  reflectTransform(spark);//Java反射
  dynamicTransform(spark);//动态转换
 }

 /**
  * 通过Java反射转换
  * @param spark
  */
 private static void reflectTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  JavaRDD<Student> rowRDD = source.map(line -> {
   String parts[] = line.split(",");
   Student stu = new Student();
   stu.setSid(parts[0]);
   stu.setSname(parts[1]);
   stu.setSage(Integer.valueOf(parts[2]));
   return stu;
  });

  Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
  df.select("sid", "sname", "sage").
  coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
 }
 /**
  * 动态转换
  * @param spark
  */
 private static void dynamicTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();

  JavaRDD<Row> rowRDD = source.map( line -> {
   String[] parts = line.split(",");
   String sid = parts[0];
   String sname = parts[1];
   int sage = Integer.parseInt(parts[2]);

   return RowFactory.create(
     sid,
     sname,
     sage
     );
  });

  ArrayList<StructField> fields = new ArrayList<StructField>();
  StructField field = null;
  field = DataTypes.createStructField("sid", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sname", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
  fields.add(field);

  StructType schema = DataTypes.createStructType(fields);

  Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
  df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");

 }

}

scala版本:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {

 case class Student(id:Int,name:String,age:Int)
 def main(args:Array[String])
 {

 val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
 import spark.implicits._
 reflectCreate(spark)
 dynamicCreate(spark)
 }

 /**
	 * 通过Java反射转换
	 * @param spark
	 */
 private def reflectCreate(spark:SparkSession):Unit={
 import spark.implicits._
 val stuRDD=spark.sparkContext.textFile("student2.txt")
 //toDF()为隐式转换
 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
 //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名
 stuDf.printSchema()
 stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //将查询结果写入一个文件
 nameDf.show()
 }

 /**
	 * 动态转换
	 * @param spark
	 */
 private def dynamicCreate(spark:SparkSession):Unit={
 val stuRDD=spark.sparkContext.textFile("student.txt")
 import spark.implicits._
 val schemaString="id,name,age"
 val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
 val schema=StructType(fields)
 val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))
 val stuDf=spark.createDataFrame(rowRDD, schema)
  stuDf.printSchema()
 val tmpView=stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //将查询结果写入一个文件
 nameDf.show()
 }
}

注:

1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

2.此代码不适用于spark2.0以前的版本。

以上这篇Java和scala实现 Spark RDD转换成DataFrame的两种方法小结就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Java8与Scala中的Lambda表达式深入讲解

    前言 最近几年Lambda表达式风靡于编程界.很多现代编程语言都把它作为函数式编程的基本组成部分.基于JVM的编程语言如Scala.Groovy及Clojure把它作为关键部分集成在语言中.而如今,(最终)Java 8也加入了这个有趣的行列. Java8 终于要支持Lambda表达式!自2009年以来Lambda表达式已经在Lambda项目中被支持.在那时候,Lambda表达式仍被称为Java闭包.在我们进入一些代码示例以前,先来解释下为什么Lambda表达式在Java程序员中广受欢迎. 1.为

  • 浅析Java和Scala中的Future

    随着CPU的核数的增加,异步编程模型在并发领域中的得到了越来越多的应用,由于Scala是一门函数式语言,天然的支持异步编程模型,今天主要来看一下Java和Scala中的Futrue,带你走入异步编程的大门. Future 很多同学可能会有疑问,Futrue跟异步编程有什么关系?从Future的表面意思是未来,一个Future对象可以看出一个将来得到的结果,这就和异步执行的概念很像,你只管自己去执行,只要将最终的结果传达给我就行,线程不必一直暂停等待结果,可以在具体异步任务执行的时候去执行其他操作

  • 深入学习java中的Groovy 和 Scala 类

    前言 Java 传承的是平台,而不是语言.有超过 200 种语言可以在 JVM 上运行,它们之中不可避免地会有一种语言最终将取代 Java 语言,成为编写 JVM 程序的最佳方式.本系列将探讨三种下一代 JVM 语言:Groovy.Scala 和 Clojure,比较并对比新的功能和范例,让 Java 开发人员对自己近期的未来发展有大体的认识. Java 语言的开发人员精通 C++ 和其他语言,包括多继承(multiple inheritance),使得类可以继承自任意数量的父类.多继承带来的一

  • Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

    一:准备数据源 在项目下新建一个student.txt文件,里面的内容为: 1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18 二:实现 Java版: 1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下: package com.cxd.sql; import java.io.Serializable; @SuppressWarnings("serial") public class Stude

  • Java 中二进制转换成十六进制的两种实现方法

    Java 中二进制转换成十六进制的两种实现方法 每个字节转成16进制,方法1 /** * 每个字节转成16进制,方法1 * * @param result */ private static String toHex(byte[] result) { StringBuffer sb = new StringBuffer(result.length * 2); for (int i = 0; i < result.length; i++) { sb.append(Character.forDigi

  • Java实现将png格式图片转换成jpg格式图片的方法【测试可用】

    本文实例讲述了Java实现将png格式图片转换成jpg格式图片的方法.分享给大家供大家参考,具体如下: import java.awt.Color; import java.awt.image.BufferedImage; import java.io.File; import java.io.IOException; import javax.imageio.ImageIO; public class ConvertImageFile { public static void main(Str

  • python 读取文件并把矩阵转成numpy的两种方法

    在当前目录下: 方法1: file = open('filename') a =file.read() b =a.split('\n')#使用换行 len(b) #统计有多少行 for i in range(len(b)): b[i] = b[i].split()#使用空格分开 len(b[0])#可以查看第一行有多少列. B[0][311]#可以查看具体某行某列的数 import numpy as np b = np.array(b)#转成numpy形的 type(b) # 输出<输出clas

  • 基于Java数组实现循环队列的两种方法小结

    用java实现循环队列的方法: 1.添加一个属性size用来记录眼下的元素个数. 目的是当head=rear的时候.通过size=0还是size=数组长度.来区分队列为空,或者队列已满. 2.数组中仅仅存储数组大小-1个元素,保证rear转一圈之后不会和head相等.也就是队列满的时候.rear+1=head,中间刚好空一个元素. 当rear=head的时候.一定是队列空了. 队列(Queue)两端同意操作的类型不一样: 能够进行删除的一端称为队头,这样的操作也叫出队dequeue: 能够进行插

  • js 字符串转换成数字的三种方法

    方法主要有三种 转换函数.强制类型转换.利用js变量弱类型转换. 1. 转换函数: js提供了parseInt()和parseFloat()两个转换函数.前者把值转换成整数,后者把值转换成浮点数.只有对String类型调用这些方法,这两个函数才能正确运行:对其他类型返回的都是NaN(Not a Number). 一些示例如下: 复制代码 代码如下: parseInt("1234blue");   //returns   1234parseInt("0xA");  

  • 将html页改成jsp的两种方式

    一般情况,将html页改成jsp有两种方法,第一种是直接修改html文件,另一种是新建jsp文件.下面具体说一下这两种方式. 假设我们要将testPage.html文件修改为testPage.jsp文件.原testPage.html文件内容为: 复制代码 代码如下: <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd&qu

  • spring boot @ResponseBody转换JSON 时 Date 类型处理方法【两种方法】

    spring boot @ResponseBody转换JSON 时 Date 类型处理方法[两种方法],Jackson和FastJson两种方式. spring boot @ResponseBody转换JSON 时 Date 类型处理方法 ,这里一共有两种不同解析方式(Jackson和FastJson两种方式) 第一种方式:默认的json处理是 jackson 也就是对configureMessageConverters 没做配置时 mybatis数据查询返回的时间,是一串数字,如何转化成时间.

  • java编程中字节流转换成字符流的实现方法

    java编程中字节流转换成字符流的实现方法 import java.io.*; /*readLine方法是字符流BufferReader类中的方法 * 而键盘录入的方法是字节流InputStream的方法 * 那么能不能将字节流转成字符流再使用字符流缓冲区中的readLine方法呢? * * InputStreamReader类是字节流转向字符流的桥梁.(它本身是一个字符流所以在构造时接受一个字节流) * * */ public class TransStreamDemo { public st

  • Java实现字符串转换成可执行代码的方法

    使用commons的jexl可实现将字符串变成可执行代码的功能,我写了一个类来封装这个功能: import java.util.Map; import org.apache.commons.jexl2.Expression; import org.apache.commons.jexl2.JexlContext; import org.apache.commons.jexl2.JexlEngine; import org.apache.commons.jexl2.MapContext; /**

随机推荐