spark: RDD与DataFrame之间的相互转换方法

DataFrame是一个组织成命名列的数据集。它在概念上等同于关系数据库中的表或R/Python中的数据框架,但其经过了优化。DataFrames可以从各种各样的源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。

DataFrame API 可以被Scala,Java,Python和R调用。

在Scala和Java中,DataFrame由Rows的数据集表示。

在Scala API中,DataFrame只是一个类型别名Dataset[Row]。而在Java API中,用户需要Dataset<Row>用来表示DataFrame。

在本文档中,我们经常将Scala/Java数据集Row称为DataFrames。

那么DataFrame和spark核心数据结构RDD之间怎么进行转换呢?

代码如下:

# -*- coding: utf-8 -*-
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql import Row

if __name__ == "__main__":
 # 初始化SparkSession
 spark = SparkSession \
 .builder \
 .appName("RDD_and_DataFrame") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()

 sc = spark.sparkContext

 lines = sc.textFile("employee.txt")
 parts = lines.map(lambda l: l.split(","))
 employee = parts.map(lambda p: Row(name=p[0], salary=int(p[1])))

 #RDD转换成DataFrame
 employee_temp = spark.createDataFrame(employee)

 #显示DataFrame数据
 employee_temp.show()

 #创建视图
 employee_temp.createOrReplaceTempView("employee")
 #过滤数据
 employee_result = spark.sql("SELECT name,salary FROM employee WHERE salary >= 14000 AND salary <= 20000")

 # DataFrame转换成RDD
 result = employee_result.rdd.map(lambda p: "name: " + p.name + " salary: " + str(p.salary)).collect()

 #打印RDD数据
 for n in result:
 print(n)

以上这篇spark: RDD与DataFrame之间的相互转换方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 浅谈DataFrame和SparkSql取值误区

    1.DataFrame返回的不是对象. 2.DataFrame查出来的数据返回的是一个dataframe数据集. 3.DataFrame只有遇见Action的算子才能执行 4.SparkSql查出来的数据返回的是一个dataframe数据集. 原始数据 scala> val parquetDF = sqlContext.read.parquet("hdfs://hadoop14:9000/yuhui/parquet/part-r-00004.gz.parquet") df: or

  • spark dataframe 将一列展开,把该列所有值都变成新列的方法

    The original dataframe 需求:hour代表一天的24小时,现在要将hour列展开,每一个小时都作为一个列 实现: val pivots = beijingGeoHourPopAfterDrop.groupBy("geoHash").pivot("hour").sum("countGeoPerHour").na.fill(0) 并且统计了对应的countGeoPerHour的和,如果有些行没有这个新列对应的数据,将用null填

  • spark: RDD与DataFrame之间的相互转换方法

    DataFrame是一个组织成命名列的数据集.它在概念上等同于关系数据库中的表或R/Python中的数据框架,但其经过了优化.DataFrames可以从各种各样的源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD. DataFrame API 可以被Scala,Java,Python和R调用. 在Scala和Java中,DataFrame由Rows的数据集表示. 在Scala API中,DataFrame只是一个类型别名Dataset[Row].而在Java API中,用户需要

  • pyspark.sql.DataFrame与pandas.DataFrame之间的相互转换实例

    代码如下,步骤流程在代码注释中可见: # -*- coding: utf-8 -*- import pandas as pd from pyspark.sql import SparkSession from pyspark.sql import SQLContext from pyspark import SparkContext #初始化数据 #初始化pandas DataFrame df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1

  • spark rdd转dataframe 写入mysql的实例讲解

    dataframe是在spark1.3.0中推出的新的api,这让spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,据说计算性能更还快了两倍.spark在离线批处理或者实时计算中都可以将rdd转成dataframe进而通过简单的sql命令对数据进行操作,对于熟悉sql的人来说在转换和过滤过程很方便,甚至可以有更高层次的应用,比如在实时这一块,传入kafka的topic名称和sql语句,后台读取自己配置好的内容字段反射成一个class并利用出入的sql对实时数据进行

  • 对Java字符串与整形、浮点类型之间的相互转换方法总结

    1.字符串转化为整形.浮点类型 String s = "100"; //方法一 int a = Integer.parseInt(String s); Long.parseLong(String s); Float.parseFloat(String s); Double.parseDouble(String s) //方法二 int a = Integer.valueOf(s).intValue(); 不同之处:Integer.parseInt(String s)生成的是一个整形:

  • 将string类型的数据类型转换为spark rdd时报错的解决方法

    在将string类型的数据类型转换为spark rdd时,一直报这个错,StructType can not accept object %r in type %s" % (obj, type(obj))) . . . s = str(tree) y = str(YESTERDAY) list0 = [s, y] outRes = self.sc.parallelize(list0) df_tree = outRes.toDF("model: string, dt: string&qu

  • Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

    一:准备数据源 在项目下新建一个student.txt文件,里面的内容为: 1,zhangsan,20 2,lisi,21 3,wanger,19 4,fangliu,18 二:实现 Java版: 1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下: package com.cxd.sql; import java.io.Serializable; @SuppressWarnings("serial") public class Stude

  • 使用Spark进行实时流计算的方法

    Spark Streaming VS Structured Streaming Spark Streaming是Spark最初的流处理框架,使用了微批的形式来进行流处理. 提供了基于RDDs的Dstream API,每个时间间隔内的数据为一个RDD,源源不断对RDD进行处理来实现流计算 Apache Spark 在 2016 年的时候启动了 Structured Streaming 项目,一个基于 Spark SQL 的全新流计算引擎 Structured Streaming,让用户像编写批处理

  • R语言技巧Rcpp与Eigen库之间的相互转换

    当我们在使用Rcpp时,进行矩阵运算最简单的是使用Eigen库进行相关操作,可以很轻松地讲R中向量化与矩阵化的思想应用到C++代码上,从而对代码进行加速.可参考前面的博客:利用RcppEigen进行矩阵运算. 但有时,我们却必须使用Rcpp进行DataFrame,List等对象格式的处理.或者如果我们涉及到缺失值的处理,也需要使用Rcpp中的函数来做. 所以,如何在两种矩阵或向量格式,NumericVector/Matrix与VectorXd/MatrixXd之间相互转化就变得非常重要. 我们可

  • php实现xml与json之间的相互转换功能实例

    本文实例讲述了php实现xml与json之间的相互转换功能.分享给大家供大家参考,具体如下: 用php实现xml与json之间的相互转换: 相关函数请查看php手册. 一.参考xml如下 <?xml version="1.0" encoding="UTF-8"?> <humans> <zhangying> <name>张三</name> <sex>男</sex> <old>

  • Java 和 Javascript 的 Date 与 .Net 的 DateTime 之间的相互转换

    Java 和 Javascript 的 Date 对象内部存放的是从1970年1月1日0点以来的毫秒值. .Net 的 DateTime 对象内部存放的是从0001年1月1日12点以来的tick值,1ticks=100纳秒=0.1微秒. 因此,我们可以借助1970年1月1日0点这个特殊的时刻来对二者进行换算,代码如下: using System; namespace Extends { public static class DateTimeEx { #region DateTime Exten

随机推荐