JAVA spark创建DataFrame的方法

述说正传,接下来开始说正事。

以前用Python和Scala操作Spark的时候比较多,毕竟Python和Scala代码写起来要简洁很多。

今天一起来看看Java版本怎么创建DataFrame,代码写起来其实差不多,毕竟公用同一套API。测试数据可以参考我之前的文章。

先来总结下Spark的一般流程:

1,先创建Spark基础变量,spark,sc

2,加载数据,rdd.textFile,spark.read.csv/json等

3,数据处理,mapPartition, map,filter,reduce等一系列transformation操作

4,数据保存,saveAstextFile,或者其他DataFrame方法

祭出代码

package dev.java;

import dev.utils.Utils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.List;

public class Spark1 {

  private static final String fileData = "seed";
  private static final String fileSave = "result";
  private static SparkSession spark = SparkSession.builder()
        .appName("Java-Spark")
        .master("local[*]")
        .config("spark.default.parallelism", 100)
        .config("spark.sql.shuffle.partitions", 100)
        .config("spark.driver.maxResultSize", "3g")
        .getOrCreate();
  private static JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

  public static void main(String[] args) {
    Utils.delete(fileSave);
    //
    t1();
  }

  private static void t1() {
    JavaRDD<Row> rdd = sc.textFile(fileData)
        .map(v -> {
          String[] parts = v.split("\t");
          return RowFactory.create(parts[0], Long.parseLong(parts[1]));
        })
        .filter(v -> v.getLong(1) >= 10000)
        .sortBy(v -> v.getLong(1), false, 100)
        .coalesce(2);
    Dataset<Row> df = spark.createDataFrame(rdd, StructType.fromDDL("title string, qty long"));
    df.write().csv(fileSave);
    spark.stop();
  }
}

以上就是JAVA操作spark创建DataFrame的方法的详细内容,更多关于JAVA Spark 创建DataFrame的资料请关注我们其它相关文章!

(0)

相关推荐

  • 在IntelliJ IDEA中创建和运行java/scala/spark程序的方法

    本文将分两部分来介绍如何在IntelliJ IDEA中运行Java/Scala/Spark程序: 基本概念介绍 在IntelliJ IDEA中创建和运行java/scala/spark程序 基本概念介绍 IntelliJ IDEA 本文使用版本为: ideaIC-2020.1 IDEA 全称 IntelliJ IDEA,是java编程语言开发的集成环境.IntelliJ在业界被公认为最好的java开发工具,它的旗舰版本还支持HTML,CSS,PHP,MySQL,Python等,免费版只支持Jav

  • 详解Java编写并运行spark应用程序的方法

    我们首先提出这样一个简单的需求: 现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况.这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" &qu

  • 从0开始学习大数据之java spark编程入门与项目实践

    本文实例讲述了大数据java spark编程.分享给大家供大家参考,具体如下: 上节搭建好了eclipse spark编程环境 在测试运行scala 或java 编写spark程序 ,在eclipse平台都可以运行,但打包导出jar,提交 spark-submit运行,都不能执行,最后确定是版本问题,就是你在eclipse调试的spark版本需和spark-submit 提交spark的运行版本一致,还有就是scala版本一致,才能正常运行. 以下是java spark程序运行 1.新建mave

  • java-spark中各种常用算子的写法示例

    Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理. Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算. 2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业. Action 算子会触发 Spark 提交作业(Job)

  • java 中Spark中将对象序列化存储到hdfs

    java 中Spark中将对象序列化存储到hdfs 摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs. 废话不多说, 直接贴代码了. spark1.4 + hbase0.98 import org.apache.spark.storage.StorageLevel imp

  • Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

    一:准备数据源 在项目下新建一个student.txt文件,里面的内容为: 1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18 二:实现 Java版: 1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下: package com.cxd.sql; import java.io.Serializable; @SuppressWarnings("serial") public class Stude

  • JAVA spark创建DataFrame的方法

    述说正传,接下来开始说正事. 以前用Python和Scala操作Spark的时候比较多,毕竟Python和Scala代码写起来要简洁很多. 今天一起来看看Java版本怎么创建DataFrame,代码写起来其实差不多,毕竟公用同一套API.测试数据可以参考我之前的文章. 先来总结下Spark的一般流程: 1,先创建Spark基础变量,spark,sc 2,加载数据,rdd.textFile,spark.read.csv/json等 3,数据处理,mapPartition, map,filter,r

  • 在 Python 中创建DataFrame的方法

    目录 方法一:创建空的DataFrame ​方法二:使用List创建DataFrame​ ​方法三:使用字典创建DataFrame​ ​方法四:使用数组创建带索引DataFrame​ 方法五:从字典列表创建DataFrame ​方法六:使用zip()函数创建DataFrame​ ​方法七:从序列的字典创建DataFrame​ 前言: DataFrame是数据的二维集合. 它是一种数据结构,其中数据以表格形式存储. 数据集按行和列排列: 我们可以在DataFrame中存储多个数据集. 我们可以执行

  • pandas创建DataFrame的7种方法小结

    笔者在学习pandas,在学习过程中总结了一下创建dataframe的方法,通过查阅资料总结遗下几种方法,如果你有其他的方法欢迎留言补充. 练习代码 请点击此处下载 学习环境: 第一种: 用Python中的字典生成 第二种: 利用指定的列内容.索引以及数据 第三种:通过读取文件,可以是json,csv,excel等等. 本文例子就用excel, 上篇博客笔者已经用csv举例了.这里要注意,如果用excel请先安装xlrd这个包 这个文件笔者放在代码同目录 第四种:用numpy中的array生成

  • pyspark创建DataFrame的几种方法

    pyspark创建DataFrame 为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作. RDD和DataFrame 在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象. 这里简单看一下RDD和DataFrame的类型. print(type(rdd)) # <class 'pyspark.rdd.RDD'> print(type(df)) # <class 'pyspark.sql.dataframe.Dat

  • java线程之用Thread类创建线程的方法

    在Java中创建线程有两种方法:使用Thread类和使用Runnable接口.在使用Runnable接口时需要建立一个Thread实例.因此,无论是通过Thread类还是Runnable接口建立线程,都必须建立Thread类或它的子类的实例.Thread类的构造方法被重载了八次,构造方法如下: 复制代码 代码如下: public Thread( ); public Thread(Runnable target); public Thread(String name); public Thread

  • java 创建线程的方法总结

    java 创建线程 Java提供了线程类Thread来创建多线程的程序.其实,创建线程与创建普通的类的对象的操作是一样的,而线程就是Thread类或其子类的实例对象.每个Thread对象描述了一个单独的线程.要产生一个线程,有两种方法: ◆需要从Java.lang.Thread类派生一个新的线程类,重载它的run()方法: ◆实现Runnalbe接口,重载Runnalbe接口中的run()方法. 为什么Java要提供两种方法来创建线程呢?它们都有哪些区别?相比而言,哪一种方法更好呢? 在Java

  • 通过Java代码来创建view的方法

    一.简介 需要了解的知识 二.方法 1)java代码创建view方法 * 1.先建view对象 View view= View.inflate(this, R.layout.activity01, null); * 2.在view中填充R.layout.activity01页面 View view= View.inflate(this, R.layout.activity01, null); * 3.然后在view对象中添加各种控件(例如TextView,Button等),注意要转化成ViewG

  • java实现创建缩略图、伸缩图片比例生成的方法

    本文实例讲述了java实现创建缩略图.伸缩图片比例生成的方法.分享给大家供大家参考.具体实现方法如下: 该实例支持将Image的宽度.高度缩放到指定width.height,并保存在指定目录 通过目标对象的大小和标准(指定)大小计算出图片缩小的比例,可以设置图片缩放质量,并且可以根据指定的宽高缩放图片. 具体代码如下所示: 复制代码 代码如下: package com.hoo.util;   import java.awt.Image; import java.awt.image.Buffere

  • Java中创建ZIP文件的方法

    java创建zip文件的代码如下如下: import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; public cla

随机推荐