windowns使用PySpark环境配置和基本操作

下载依赖

首先需要下载hadoop和spark,解压,然后设置环境变量。
hadoop清华源下载
spark清华源下载

HADOOP_HOME => /path/hadoop
SPARK_HOME => /path/spark

安装pyspark。

pip install pyspark

基本使用

可以在shell终端,输入pyspark,有如下回显:

输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。

>>> from pyspark import SparkContext
>>> sc = SparkContext("local", "First App")

如果以上不会报错,恭喜可以开始使用pyspark编写代码了。
不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。

>>> sc.stop()

下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。
在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。

from pyspark import SparkContext

sc = SparkContext("local", "First App")
logFile = "abc.txt"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Line with a:%i,line with b:%i" % (numAs, numBs))

运行结果如下:

20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1

这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。
戳pyspark教程
戳spark教程

RDD

RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。
一般,我们先使用数据创建RDD,然后对RDD进行操作。
对RDD操作有两种方法:
Transformation(转换) - 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。
Action(操作) - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。

创建RDD

parallelize是从列表创建RDD,先看一个例子:

from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
print(words)

结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Count

count方法返回RDD中的元素个数。

from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
print(words)

counts = words.count()
print("Number of elements in RDD -> %i" % counts)

返回结果:

Number of elements in RDD -> 8

Collect

collect返回RDD中的所有元素。

from pyspark import SparkContext

sc = SparkContext("local", "collect app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"
     ])
coll = words.collect()
print("Elements in RDD -> %s" % coll)

返回结果:

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

foreach

每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。
下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。

from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")

accum = sc.accumulator(0)
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

def increment_counter(x):
    print(x)
    accum.add(x)
 return 0

s = rdd.foreach(increment_counter)
print(s)  # None
print("Counter value: ", accum)

返回结果:

None
Counter value:  15

filter

返回一个包含元素的新RDD,满足过滤器的条件。

from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

也可以改写成这样:

from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)

def g(x):
    for i in x:
        if "spark" in x:
            return i

words_filter = words.filter(g)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

map

将函数应用于RDD中的每个元素并返回新的RDD。

from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala",
     "java",
     "hadoop",
     "spark",
     "akka",
     "spark vs hadoop",
     "pyspark",
     "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1, "_{}".format(x)))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))

返回结果:

Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]

Reduce

执行指定的可交换和关联二元操作后,然后返回RDD中的元素。

from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))

这里的add是python内置的函数,可以使用ide查看:

def add(a, b):
    "Same as a + b."
    return a + b

reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。

Adding all the elements -> 15

Join

返回RDD,包含两者同时匹配的键,键包含对应的所有元素。

from pyspark import SparkContext

sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
print("x =>", x.collect())
print("y =>", y.collect())
joined = x.join(y)
final = joined.collect()
print( "Join RDD -> %s" % (final))

返回结果:

x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]

到此这篇关于windowns使用PySpark环境配置和基本操作的文章就介绍到这了,更多相关PySpark环境配置 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • PyCharm+PySpark远程调试的环境配置的方法

    前言:前两天准备用 Python 在 Spark 上处理量几十G的数据,熟料在利用PyCharm进行PySpark远程调试时掉入深坑,特写此博文以帮助同样深处坑中的bigdata&machine learning fans早日出坑. Version :Spark 1.5.0.Python 2.7.14 1. 远程Spark集群环境 首先Spark集群要配置好且能正常启动,版本号可以在Spark对应版本的官方网站查到,注意:Spark 1.5.0作为一个比较古老的版本,不支持Python 3.6+

  • PyCharm搭建Spark开发环境实现第一个pyspark程序

    一, PyCharm搭建Spark开发环境 Windows7, Java1.8.0_74, Scala 2.12.6, Spark 2.2.1, Hadoop2.7.6 通常情况下,Spark开发是基于Linux集群的,但这里作为初学者并且囊中羞涩,还是在windows环境下先学习吧. 参照这个配置本地的Spark环境. 之后就是配置PyCharm用来开发Spark.本人在这里浪费了不少时间,因为百度出来的无非就以下两种方式: 1.在程序中设置环境变量 import os import sys

  • windowns使用PySpark环境配置和基本操作

    下载依赖 首先需要下载hadoop和spark,解压,然后设置环境变量. hadoop清华源下载 spark清华源下载 HADOOP_HOME => /path/hadoop SPARK_HOME => /path/spark 安装pyspark. pip install pyspark 基本使用 可以在shell终端,输入pyspark,有如下回显: 输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点. >>> fro

  • MongoDB数据库安装配置、基本操作实例详解

    本文实例讲述了MongoDB数据库安装配置.基本操作.分享给大家供大家参考,具体如下: 1.简介 NO SQL:NoSQL(NoSQL = Not Only SQL ),意即"不仅仅是SQL",是对不同于传统的关系数据库管理系统(RDBMS)的统称.NoSQL用于超大规模数据的存储,这些类型的数据存储不需要固定的模式,无需多余操作就可以横向扩展. RDBMS NOSQL - 高度组织化结构化数据 - 结构化查询语言(SQL) - 数据和关系都存储在单独的表中. - 数据操纵语言,数据定

  • docker的pdflatex环境配置的方法步骤

    技术背景 Latex在文档撰写方面是不可或缺的工具,尤其是在写文章方面,是必须要用到的文字排版工具.但是latex的环境部署并不是一个特别人性化的操作,尤其是在各种不同的平台上操作是完全不一样的,还经常容易报错.我们可以一个一个的去解决报错问题,但是这需要耗费极大的精力和时间,所以很多人选择了直接在overleaf进行latex的创作.但其实overleaf也有它的缺点,比如免费版本的带宽和速度都比较受限,尤其是在国内的网络,访问速度可谓是"一绝".因此这里我们介绍一个更加人性化的方案

  • MAC下Anaconda+Pyspark安装配置详细步骤

    在MAC的Anaconda上使用pyspark,主要包括以下步骤: 在MAC下安装Spark,并配置环境变量. 在Anaconda中安装引用pyspark. 1. MAC下安装Spark   到Apark Spark官网上下载Spark文件,无论是windows系统,还是MAC系统,亦或者Linux系统,都可以下载这个文件(独立于系统). 将下载的文件进行解压(可以使用命令行进行解压,也可以使用解压软件).解压之后的文件如下: 配置环境变量.打开MAC命令行窗口,输入如下命令: sudo vi

  • Apache Thrift环境配置

    安装Thrift的官方文档地址: http://thrift.apache.org/docs/install/ 当我看到windows安装需要Cygwin或MinGW时,我就直接放弃在windows中配置的想法了,直接打开虚拟机用CentOS进行安装,使用一堆命令安装毕竟比windows方便. CentOS安装Thrift 官方文档地址: http://thrift.apache.org/docs/install/centos 基本上按照官方的操作,可以一直进行到最后一组命令,就是下面这个地方:

  • nodejs开发环境配置与使用

    先说下nodejs这个哦,有人以为它是一种语言,其实不是,它是一个平台,一个建立在google的V8引擎上的js运行平台,就是解析js,并提供自己 的一些API给用户调用.从目前的情况来看,这个发展情况还算好,明天都有好多的前端后台工程师在加入,连一些大神也在关注甚至写博客,昨晚我还看见一篇 文章写道一个外国的网站写了将近90搞nodejs的web插件,这个真牛啊!那学习中国东东对于我们来说最直接的能带来什么利益呢:前端人员由于熟悉 js那么可以基本简单学习下linux就可以上手了,那后台工程师

  • windows系统下简单nodejs安装及环境配置

    相信对于很多关注javascript发展的同学来说,nodejs已经不是一个陌生的词眼.有关nodejs的相关资料网上已经铺天盖地.由于它的高并发特性,造就了其特殊的应用地位. 国内目前关注最高,维护最好的一个关于nodejs的网站应该是http://www.cnodejs.org/  这里不想谈太多的nodejs的相关信息.只说一下,windows系统下简单nodejs环境配置. 第一步:下载安装文件 下载地址:官网http://www.nodejs.org/download/  这里用的是

  • React + webpack 环境配置的方法步骤

    本文介绍了React + webpack 环境配置的方法步骤,分享给大家,具体如下: 安装配置Babel babel-preset-es2015 ES6语法包,使代码可以随意地使用ES6的新特性. babel-preset-react React语法包,专门用于React的优化,在代码中可以使用React ES6 classes的写法,同时直接支持JSX语法格式 安装Babel loader // 安装babel-core核心模块和babel-loader npm install babel-c

  • Angular2 环境配置详细介绍

    看到angular发布正式版,心动不已准备测试下. 看着官网教程,使用了cli创建项目,在命令行中键入: 安装cli npm install -g angular-cli 如果安装过以前的版本,请执行以下命令,进行更新: npm uninstall -g angular-cli npm cache clean npm install -g angular-cli@latest 旧版本的cli使用的是SystemJS而最新的创建,是基于webpack构建. 旧版本的angular2(rc1-rc6

随机推荐