Pyspark读取parquet数据过程解析

parquet数据:列式存储结构,由Twitter和Cloudera合作开发,相比于行式存储,其特点是:

可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量;压缩编码可以降低磁盘存储空间,使用更高效的压缩编码节约存储空间;只读取需要的列,支持向量运算,能够获取更好的扫描性能。

那么我们怎么在pyspark中读取和使用parquet数据呢?我以local模式,linux下的pycharm执行作说明。

首先,导入库文件和配置环境:

import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3" #多个python版本时需要指定

conf = SparkConf().setAppName('test_parquet')
sc = SparkContext('local', 'test', conf=conf)
spark = SparkSession(sc)

然后,使用spark进行读取,得到DataFrame格式的数据:host:port 属于主机和端口号

parquetFile = r"hdfs://host:port/Felix_test/test_data.parquet"
df = spark.read.parquet(parquetFile)

而,DataFrame格式数据有一些方法可以使用,例如:

1.df.first() :显示第一条数据,Row格式

print(df.first())

2.df.columns:列名

3.df.count():数据量,数据条数

4.df.toPandas():从spark的DataFrame格式数据转到Pandas数据结构

5.df.show():直接显示表数据;其中df.show(n) 表示只显示前n行信息

6.type(df):显数据示格式

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python搭建Spark分布式集群环境

    前言 Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象.Spark 最大的特点就是快,可比 Hadoop MapReduce 的处理速度快 100 倍.本文没有使用一台电脑上构建多个虚拟机的方法来模拟集群,而是使用三台电脑来搭建一个小型分布式集群环境安装. 本教程采用Spark2.0以上版本(比如Spark2.0.2.Spark2.1.0等)搭建集群,同样适用于搭建Spark1.6.2集群. 安装Hadoop并搭建好Hadoop集群环境 Spark分布式集群的安装

  • pyspark 读取csv文件创建DataFrame的两种方法

    方法一:用pandas辅助 from pyspark import SparkContext from pyspark.sql import SQLContext import pandas as pd sc = SparkContext() sqlContext=SQLContext(sc) df=pd.read_csv(r'game-clicks.csv') sdf=sqlc.createDataFrame(df) 方法二:纯spark from pyspark import SparkCo

  • PyCharm搭建Spark开发环境的实现步骤

    1.安装好JDK 下载并安装好jdk-12.0.1_windows-x64_bin.exe,配置环境变量: 新建系统变量JAVA_HOME,值为Java安装路径 新建系统变量CLASSPATH,值为 .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;(注意最前面的圆点) 配置系统变量PATH,添加 %JAVA_HOME%bin;%JAVA_HOME%jrebin 在CMD中输入:java或者java -version,不显示不是内部命令等,说明

  • Pyspark获取并处理RDD数据代码实例

    弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是Apache Spark的核心. 在pyspark中获取和处理RDD数据集的方法如下: 1. 首先是导入库和环境配置(本测试在linux的pycharm上完成) import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession os.environ["PYSPARK_PYTHON&

  • Linux下远程连接Jupyter+pyspark部署教程

    博主最近试在服务器上进行spark编程,因此,在开始编程作业之前,要先搭建一个便利的编程环境,这样才能做到舒心地开发.本文主要有以下内容: 1.python多版本管理利器-pythonbrew 2.Jupyter notebooks 安装与使用以及远程连接方法 3.Jupyter连接pyspark,实现web端sprak开发 一.python多版本管理利器-pythonbrew 在利用python进行编程开发的时候,很多时候我们需要多个Python版本进行测试,博主之前一直在Python2.x和

  • pycharm编写spark程序,导入pyspark包的3中实现方法

    一种方法: File --> Default Setting --> 选中Project Interpreter中的一个python版本-->点击右边锯齿形图标(设置)-->选择more-->选择刚才选中的那个python版本-->点击最下方编辑(也就是增加到这个python版本下)-->点击➕-->选中spark安装目录下的python目录-->一路OK. 再次在python文件中写入如下 from pyspark import SparkConf

  • Python中用Spark模块的使用教程

    在日常的编程中,我经常需要标识存在于文本文档中的部件和结构,这些文档包括:日志文件.配置文件.定界的数据以及格式更自由的(但还是半结构化的)报表格式.所有这些文档都拥有它们自己的"小语言",用于规定什么能够出现在文档内.我编写这些非正式解析任务的程序的方法总是有点象大杂烩,其中包括定制状态机.正则表达式以及上下文驱动的字符串测试.这些程序中的模式大概总是这样:"读一些文本,弄清是否可以用它来做些什么,然后可能再多读一些文本,一直尝试下去." 解析器将文档中部件和结构

  • Pyspark读取parquet数据过程解析

    parquet数据:列式存储结构,由Twitter和Cloudera合作开发,相比于行式存储,其特点是: 可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量:压缩编码可以降低磁盘存储空间,使用更高效的压缩编码节约存储空间:只读取需要的列,支持向量运算,能够获取更好的扫描性能. 那么我们怎么在pyspark中读取和使用parquet数据呢?我以local模式,linux下的pycharm执行作说明. 首先,导入库文件和配置环境: import os from pyspark import

  • 通过openpyxl读取excel文件过程解析

    这篇文章主要介绍了通过openpyxl读取excel文件过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.环境准备 python3环境.安装openpyxl模块 2.excel文件数据准备 3.为方便直接调用,本代码直接封装成类 from openpyxl import load_workbook class DoExcel: def __init__(self,filename): ''' :param filename: exce

  • sql获取存储过程返回数据过程解析

    这篇文章主要介绍了sql获取存储过程返回数据过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 就是在执行存储后,获取存储过程执行的数据并作为其他应的二次使用, 其实在代码中可以说是调用类似,具体操作如下: 创建一个存储过程: use [库名] go set ansi_null on GO SET QUOTED_IDENTIFIER ON GO CREATE PROCEDURE DBO.P_TEST ( @RBACK VARCHAR(20)

  • Java基于final修饰数据过程解析

    这篇文章主要介绍了Java基于final修饰数据过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 final是Java中的一个重要关键字,它可以修饰数据.方法和类,本篇将从final修饰的数据角度对final做出总结. final修饰的数据代表着:永远不变.意思是,一旦你用final修饰一块数据,你之后就只能看看它,你想修改它,没门. 我们不希望改变的数据有下面两种情况: 永不改变的编译时常量. //编译时知道其值 private fin

  • python多线程+代理池爬取天天基金网、股票数据过程解析

    简介 提到爬虫,大部分人都会想到使用Scrapy工具,但是仅仅停留在会使用的阶段.为了增加对爬虫机制的理解,我们可以手动实现多线程的爬虫过程,同时,引入IP代理池进行基本的反爬操作. 本次使用天天基金网进行爬虫,该网站具有反爬机制,同时数量足够大,多线程效果较为明显. 技术路线 IP代理池 多线程 爬虫与反爬 编写思路 首先,开始分析天天基金网的一些数据.经过抓包分析,可知: ./fundcode_search.js包含所有基金的数据,同时,该地址具有反爬机制,多次访问将会失败的情况. 同时,经

  • JAVA使用POI(XSSFWORKBOOK)读取EXCEL文件过程解析

    经过一番搜索发现,java操纵excel文件常用的有jxl和poi两种方式,孰好孰坏看自己需求而定. 其中最主要的区别在于jxl不支持.xlsx,而poi支持.xlsx 这里介绍的使用poi方式(XSSFWorkbook),实际上poi提供了HSSFWorkbook和XSSFWorkbook两个实现类.区别在于HSSFWorkbook是针对.xls文件,XSSFWorkbook是针对.xslx文件. 首先明确一下基本概念: 先创建一个工作簿,一个工作簿可以有多个工作表,一个工作表可以有多个行,一

  • Python应用实现处理excel数据过程解析

    实现功能 excel表格中有4列数,分别为RMF计算得到的 β,γ,势能面及组态,需要挑选出相同 β 值下势能面最低时的组态.为了减小数据量,先将 β 值保留两位小数. 代码 import xlrd import xlwt # read xls file readfile = xlrd.open_workbook('./beta-gamma-constrain.xlsx') readsheet = readfile.sheet_by_name('Sheet1') beta = readsheet

  • MyBatis批量插入数据过程解析

    在程序中封装了一个List集合对象,然后需要把该集合中的实体插入到数据库中,由于项目使用了Spring+MyBatis的配置,所以打算使用MyBatis批量插入,由于之前没用过批量插入,在网上找了一些资料后最终实现了,把详细过程贴出来. 实体类TrainRecord结构如下: public class TrainRecord implements Serializable { private static final long serialVersionUID = -12069604621179

  • python 动态迁移solr数据过程解析

    前言 上项目的时候,遇见一次需求,需要把在线的 其中一个 collection 里面的数据迁移到另外一个collection下,于是就百度了看到好多文章,其中大部分都是使用导入的方法,没有找到在线数据的迁移方法.于是写了python脚本,分享出来. 思路: collection数据量比较大,所以一次性操作所有数据太大,于是分段执行操作. 先分段 按1000条数据量进行查询,处理成json数据 把处理后的json数据 发送到目的collection上即可 实现: 一.使用http的接口先进行查询

  • Java实现文件读取和写入过程解析

    需求说明 实际操作过程中,从D盘根目录下的ak.txt读取文件写入D盘根目录下的hello.txt文件内 实现思路 写两个方法,一个用于读取目标文件,一个用于写入目标文件--详情见代码注释 代码内容 文件读取和写入练习 package com.io; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOExce

随机推荐