Pyspark获取并处理RDD数据代码实例

弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是Apache Spark的核心。

在pyspark中获取和处理RDD数据集的方法如下:

1. 首先是导入库和环境配置(本测试在linux的pycharm上完成)

import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
conf = SparkConf().setAppName('test_rdd')
sc = SparkContext('local', 'test', conf=conf)
spark = SparkSession(sc)

2. 然后,提供hdfs分区数据的路径或者分区表名

txt_File = r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名/分区名/part-m-00029.deflate" # part-m-00029.deflate
# txt_File = r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名" # hive table

3. sc.textFile进行读取,得到RDD格式数据<还可以用 spark.sparkContext.parallelize(data) 来获取RDD数据>,参数中还可设置数据被划分的分区数

txt_ = sc.textFile(txt_File)

4. 基本操作:

  • type(txt_):显示数据类型,这时属于 'pyspark.rdd.RDD'
  • txt_.first():获取第一条数据
  • txt_.take(2):获取前2条数据,形成长度为2的list
  • txt_.take(2)[1].split('\1')[1]:表示获取前两条中的第[1]条数据(也就是第2条,因为python的索引是从0开始的),并以 '\1'字符分隔开(这要看你的表用什么作为分隔符的),形成list,再获取该list的第2条数据
  • txt_.map(lambda x:x.split('\1')):使用lambda函数和map函数快速处理每一行数据,这里表示将每一行以 '\1'字符分隔开,每一行返回一个list;此时数据结构是:'pyspark.rdd.PipelinedRDD'
  • txt_.map(lambda x:(x, x.split('\1'))).filter(lambda y:y[0].startswith('北京')):表示在返回 (x, x.split('\1')) 后,进行筛选filter,获取其中以 '北京' 开头的行,并按照相同格式 (例如,这里是(x, x.split('\1'))格式,即原数据+分割后的列表数据) 返回数据
  • txt_.collect():返回所有RDD数据元素,当数据量很大时谨慎操作
  • txt_.toDF():不能直接转成DataFrame格式,需要设置Schema

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • pycharm编写spark程序,导入pyspark包的3中实现方法

    一种方法: File --> Default Setting --> 选中Project Interpreter中的一个python版本-->点击右边锯齿形图标(设置)-->选择more-->选择刚才选中的那个python版本-->点击最下方编辑(也就是增加到这个python版本下)-->点击➕-->选中spark安装目录下的python目录-->一路OK. 再次在python文件中写入如下 from pyspark import SparkConf

  • PyCharm搭建Spark开发环境的实现步骤

    1.安装好JDK 下载并安装好jdk-12.0.1_windows-x64_bin.exe,配置环境变量: 新建系统变量JAVA_HOME,值为Java安装路径 新建系统变量CLASSPATH,值为 .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;(注意最前面的圆点) 配置系统变量PATH,添加 %JAVA_HOME%bin;%JAVA_HOME%jrebin 在CMD中输入:java或者java -version,不显示不是内部命令等,说明

  • Linux下远程连接Jupyter+pyspark部署教程

    博主最近试在服务器上进行spark编程,因此,在开始编程作业之前,要先搭建一个便利的编程环境,这样才能做到舒心地开发.本文主要有以下内容: 1.python多版本管理利器-pythonbrew 2.Jupyter notebooks 安装与使用以及远程连接方法 3.Jupyter连接pyspark,实现web端sprak开发 一.python多版本管理利器-pythonbrew 在利用python进行编程开发的时候,很多时候我们需要多个Python版本进行测试,博主之前一直在Python2.x和

  • Python中用Spark模块的使用教程

    在日常的编程中,我经常需要标识存在于文本文档中的部件和结构,这些文档包括:日志文件.配置文件.定界的数据以及格式更自由的(但还是半结构化的)报表格式.所有这些文档都拥有它们自己的"小语言",用于规定什么能够出现在文档内.我编写这些非正式解析任务的程序的方法总是有点象大杂烩,其中包括定制状态机.正则表达式以及上下文驱动的字符串测试.这些程序中的模式大概总是这样:"读一些文本,弄清是否可以用它来做些什么,然后可能再多读一些文本,一直尝试下去." 解析器将文档中部件和结构

  • pyspark 读取csv文件创建DataFrame的两种方法

    方法一:用pandas辅助 from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext() sqlContext=SQLContext(sc) df=pd.read_csv(r'game-clicks.csv') sdf=sqlc.createDataFrame(df) 方法二:纯spark from pyspark import SparkCo

  • Python搭建Spark分布式集群环境

    前言 Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象.Spark 最大的特点就是快,可比 Hadoop MapReduce 的处理速度快 100 倍.本文没有使用一台电脑上构建多个虚拟机的方法来模拟集群,而是使用三台电脑来搭建一个小型分布式集群环境安装. 本教程采用Spark2.0以上版本(比如Spark2.0.2.Spark2.1.0等)搭建集群,同样适用于搭建Spark1.6.2集群. 安装Hadoop并搭建好Hadoop集群环境 Spark分布式集群的安装

  • Pyspark读取parquet数据过程解析

    parquet数据:列式存储结构,由Twitter和Cloudera合作开发,相比于行式存储,其特点是: 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量:压缩编码可以降低磁盘存储空间,使用更高效的压缩编码节约存储空间:只读取需要的列,支持向量运算,能够获取更好的扫描性能. 那么我们怎么在pyspark中读取和使用parquet数据呢?我以local模式,linux下的pycharm执行作说明. 首先,导入库文件和配置环境: import os from pyspark import

  • Pyspark获取并处理RDD数据代码实例

    弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是Apache Spark的核心. 在pyspark中获取和处理RDD数据集的方法如下: 1. 首先是导入库和环境配置(本测试在linux的pycharm上完成) import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession os.environ["PYSPARK_PYTHON&

  • Spring Date jpa 获取最新一条数据的实例代码

    #Repository import test.demo.domain.entity.TestEntity; import org.springframework.data.jpa.repository.support.JpaRepositoryImplementation; import org.springframework.stereotype.Repository; @Repository public interface TestEntityRepository extends Jpa

  • Java用POI解析excel并获取所有单元格数据的实例

    1.导入POI相关jar包 org.apache.poi jar 2.代码示例 public List getAllExcel(File file, String tableName, String fname, String enterpriseId, String reportId, String projectId) throws FileNotFoundException, IOException, ClassNotFoundException, InstantiationExcepti

  • Vue-cli项目获取本地json文件数据的实例

    在自己做的vue demo项目中,我想通过在本地添加一些json数据,写到json文件中,并且通过异步请求获取到,然后加载数据. axios.get('http://localhost:8080/datas/json') 然而在这一过程中,我的访问总是404.通过查阅,我发现,在vue-cli基础上构建的项目中,只有static目录才是vue-cli向外暴露的静态数据文件夹,我放在static下的图片可以正常访问到,我是在static同级目录新建了datas目录,将json文件放入datas目录

  • java获取当前时间并格式化代码实例

    这篇文章主要介绍了java获取当前时间并格式化代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 private static final DateTimeFormatter FORMAT_FOURTEEN = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); private static final DateTimeFormatter FORMAT_DATE = DateTimeFor

  • javascript写一个ajax自动拦截并下载数据代码实例

    这篇文章主要介绍了javascript写一个ajax自动拦截并下载数据代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 代码如下 <!DOCTYPE html> <html lang="zh"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width

  • 微信小程序获取地理位置及经纬度授权代码实例

    这篇文章主要介绍了微信小程序获取地理位置及经纬度授权代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 微信小程序获取地理位置授权,首先需要在app.json中添加配置: "permission": { "scope.userLocation": { "desc": "请确认授权" } } 获取经纬度:如果手机未开启位置信息,那么授权成功后在wx.getLocation(

  • Python通过递归获取目录下指定文件代码实例

    这篇文章主要介绍了python通过递归获取目录下指定文件代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 获取一个目录下所有指定格式的文件是实际生产中常见需求. import os #递归获取一个目录下所有的指定格式的文件 def get_jsonfile(path,file_list): dir_list=os.listdir(path) for x in dir_list: new_x=os.path.join(path,x) if

  • Vue请求java服务端并返回数据代码实例

    这篇文章主要介绍了Vue请求java服务端并返回数据代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 最近在自学vue怎么与java进行数据交互.其实axios还是挺简单的,与ajax请求几乎一样,无外乎也就是要解决下跨域的问题. 废话不多说了,直接贴代码,一看就懂! //向springmvc Controller发起请求,传递一个参数 get请求(带参数传递) axios.get('http://127.0.0.1:8088/inas/

  • 使用Python脚本从文件读取数据代码实例

    这篇文章主要介绍了使用Python脚本从文件读取数据代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 最近自学Python的进度比较慢,工作之余断断续续的看着效率比较低,看来还是要狠下心来每天进步一点点. 还记得前段时间陈大猫提了一口"先实现用python读取本地文件",碰巧今天看到文件与异常,结合练习整理下用Python读取本地文件的代码: import os #从标准库导入os模块 os.chdir('F:\HeadFirs

随机推荐