PySpark与GraphFrames的安装与使用环境搭建过程

目录
  • PySpark环境搭建
    • 配置hadoop
    • 安装pyspark与Java
    • graphframes安装
  • 使用方法
    • 启动spark并读取数据
    • 启动hive支持
    • Spark的DataFrame与RDD
      • DataFrame的基础api
      • RDD的简介
      • RDD的API概览
      • 各类RDD
      • cache&checkpoint
    • graphframes的用法
      • Motiffinding(模式发现)
      • Subgraphs子图
      • GraphFrames支持的GraphX算法
    • PySpark3.X与pandas融合
      • 自定义UDF和UDAF
      • 分组聚合与JOIN
      • map迭代
      • Pyspark与Pandas的交互

PySpark环境搭建

配置hadoop

spark访问本地文件并执行运算时,可能会遇到权限问题或是dll错误。这是因为spark需要使用到Hadoop的winutils和hadoop.dll,首先我们必须配置好Hadoop相关的环境。可以到github下载:https://github.com/4ttty/winutils

gitcode提供了镜像加速:https://gitcode.net/mirrors/4ttty/winutils

我选择了使用这个仓库提供的最高的Hadoop版本3.0.0将其解压到D:\deploy\hadoop-3.0.0目录下,然后配置环境变量:

我们还需要将对应的hadoop.dll复制到系统中,用命令表达就是:

copy D:\deploy\hadoop-3.0.0\bin\hadoop.dll C:\Windows\System32

不过这步需要拥有管理员权限才可以操作。

为了能够在任何地方使用winutils命令工具,将%HADOOP_HOME%\bin目录加入环境变量中:

安装pyspark与Java

首先,我们安装spark当前(2022-2-17)的最新版本:

pip install pyspark==3.2.1

需要注意pyspark的版本决定了jdk的最高版本,例如假如安装2.4.5版本的pyspark就只能安装1.8版本的jdk,否则会报出java.lang.IllegalArgumentException: Unsupported class file major version 55的错误。

这是因为pyspark内置了Scala,而Scala是基于jvm的编程语言,Scala与jdk的版本存在兼容性问题,JDK与scala的版本兼容性表:

JDK version Minimum Scala versions Recommended Scala versions
17 2.13.6, 2.12.15 (forthcoming) 2.13.6, 2.12.15 (forthcoming)
16 2.13.5, 2.12.14 2.13.6, 2.12.14
13, 14, 15 2.13.2, 2.12.11 2.13.6, 2.12.14
12 2.13.1, 2.12.9 2.13.6, 2.12.14
11 2.13.0, 2.12.4, 2.11.12 2.13.6, 2.12.14, 2.11.12
8 2.13.0, 2.12.0, 2.11.0, 2.10.2 2.13.6, 2.12.14, 2.11.12, 2.10.7
6, 7 2.11.0, 2.10.0 2.11.12, 2.10.7

当前3.2.1版本的pyspark内置的Scala版本为2.12.15,意味着jdk17与其以下的所有版本都支持。

这里我依然选择安装jdk8的版本:

测试一下:

>java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

jdk11的详细安装教程(jdk1.8在官网只有安装包,无zip绿化压缩包):

绿化版Java11的环境配置与Python调用Java
https://xxmdmst.blog.csdn.net/article/details/118366166

graphframes安装

pip安装当前最新的graphframes:

pip install graphframes==0.6

然后在官网下载graphframes的jar包。

下载地址:https://spark-packages.org/package/graphframes/graphframes

由于安装的pyspark版本是3.2,所以这里我选择了这个jar包:

然后将该jar包放入pyspark安装目录的jars目录下:

pyspark安装位置可以通过pip查看:

C:\Users\ASUS>pip show pyspark
Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: d:\miniconda3\lib\site-packages
Requires: py4j
Required-by:

使用方法

学习pyspark的最佳路径是官网:https://spark.apache.org/docs/latest/quick-start.html

在下面的网页,官方提供了在线jupyter:

https://spark.apache.org/docs/latest/api/python/getting_started/index.html

启动spark并读取数据

本地模式启动spark:

from pyspark.sql import SparkSession, Row

spark = SparkSession \
    .builder \
    .appName("Python Spark") \
    .master("local[*]") \
    .getOrCreate()
sc = spark.sparkContext
spark

SparkSession输出的内容中包含了spark的web页面,新标签页打开页面后大致效果如上。

点击Environment选项卡可以查看当前环境中的变量:

启动hive支持

找到pyspark的安装位置,例如我的电脑在D:\Miniconda3\Lib\site-packages\pyspark

手动创建conf目录并将hive-site.xml配置文件复制到其中。如果hive使用了MySQL作为原数据库,则还需要将MySQL对应的驱动jar包放入spark的jars目录下。

创建spark会话对象时可通过enableHiveSupport()开启hive支持:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
spark

spark访问hive自己创建的表有可能会出现如下的权限报错:

Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on HDFS s
hould be writable. Current permissions are: rwx------

是因为当前用户不具备对\tmp\hive的操作权限:

>winutils ls \tmp\hive
drwx------ 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

把\tmp\hive目录的权限改为777即可顺利访问:

>winutils chmod 777 \tmp\hive

>winutils ls \tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators XIAOXIAOMING\None 0 May  4 2020 \tmp\hive

Spark的DataFrame与RDD

从spark2.x开始将RDD和DataFrame的API统一抽象成dataset,DataFrame就是Dataset[Row],RDD则是Dataset.rdd。可以将DataFrame理解为包含结构化信息的RDD。

将含row的RDD转换为DataFrame只需要调用toDF方法或SparkSession的createDataFrame方法即可,也可以传入schema覆盖类型或名称设置。

DataFrame的基础api

DataFrame默认支持DSL风格语法,例如:

//查看DataFrame中的内容
df.show()

//查看DataFrame部分列中的内容
df.select(df['name'], df['age'] + 1).show()
df.select("name").show()
//打印DataFrame的Schema信息
df.printSchema()
//过滤age大于等于 21 的
df.filter(df['age'] > 21).show()
//按年龄进行分组并统计相同年龄的人数
personDF.groupBy("age").count().show()

将DataFrame注册成表或视图之后即可进行纯SQL操作:

df.createOrReplaceTempView("people")
//df.createTempView("t_person")

//查询年龄最大的前两名
spark.sql("select * from t_person order by age desc limit 2").show()
//显示表的Schema信息
spark.sql("desc t_person").show()

Pyspark可以直接很方便的注册udf并直接使用:

strlen = spark.udf.register("len", lambda x: len(x))
print(spark.sql("SELECT len('test') length").collect())
print(spark.sql("SELECT 'foo' AS text").select(strlen("text").alias('length')).collect())

执行结果:

[Row(length='4')]
[Row(length='3')]

RDD的简介

DataFrame的本质是对RDD的包装,可以理解为DataFrame=RDD[Row]+schema。

RDD(A Resilient Distributed Dataset)叫做弹性可伸缩分布式数据集,是Spark中最基本的数据抽象。它代表一个不可变、自动容错、可伸缩性、可分区、里面的元素可并行计算的集合。

在每一个RDD内部具有五大属性:

  • 具有一系列的分区
  • 一个计算函数操作于每一个切片
  • 具有一个对其他RDD的依赖列表
  • 对于 key-value RDDs具有一个Partitioner分区器
  • 存储每一个切片最佳计算位置

一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

**一个计算每个分区的函数。**Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

**RDD之间的依赖关系。**RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

**一个Partitioner,即RDD的分片函数。**当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

**一个列表,存储存取每个Partition的优先位置(preferred location)。**对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD的API概览

RDD包含Transformation APIAction API,Transformation API都是延迟加载的只是记住这些应用到基础数据集上的转换动作,只有当执行Action API时这些转换才会真正运行。

Transformation API产生的两类RDD最重要,分别是MapPartitionsRDDShuffledRDD

产生MapPartitionsRDD的算子有map、keyBy、keys、values、flatMap、mapValues 、flatMapValues、mapPartitions、mapPartitionsWithIndex、glom、filter和filterByRange 。其中用的最多的是mapflatMap,但任何产生MapPartitionsRDD的算子都可以直接使用mapPartitionsmapPartitionsWithIndex实现。

产生ShuffledRDD的算子有combineByKeyWithClassTag、combineByKey、aggregateByKey、foldByKey 、reduceByKey 、distinct、groupByKey、groupBy、partitionBy、sortByKey 和 repartitionAndSortWithinPartitions。

combineByKey到groupByKey 底层均是调用combineByKeyWithClassTag方法:

@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners
       ,defaultPartitioner(self))
}
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

三个重要参数的含义:

  • createCombiner:根据每个分区的第一个元素操作产生一个初始值
  • mergeValue:对每个分区内部的元素进行迭代合并
  • mergeCombiners:对所有分区的合并结果进行合并

groupByKey的partitioner未指定时会传入默认的defaultPartitioner。例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2).keyBy(_.length)
a.groupByKey.collect

res9: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(lion)), (6,CompactBuffer(spider)), (3,CompactBuffer(dog, cat)), (5,CompactBuffer(tiger, eagle)))

aggregateByKey:每个分区使用zeroValue作为初始值,迭代每一个元素用seqOp进行合并,对所有分区的结果用combOp进行合并。例如:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res7: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

reduceByKey :每个分区迭代每一个元素用func进行合并,对所有分区的结果用func再进行合并,例如:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect

Action API有:

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement*,*num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering]) 排序并取前N个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path) 将RDD中的元素用NullWritable作为key,实际元素作为value保存为sequencefile格式
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。

spark模拟实现mapreduce版wordcount:

object MapreduceWordcount {
  def main(args: Array[String]): Unit = {
    import org.apache.spark._
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("wordcount").setMaster("local[*]"))
    sc.setLogLevel("WARN")

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat
    import org.apache.spark.rdd.HadoopRDD

    import scala.collection.mutable.ArrayBuffer

    def map(k: LongWritable, v: Text, collect: ArrayBuffer[(String, Int)]) = {
      for (word <- v.toString.split("\\s+"))
        collect += ((word, 1))
    }
    def reduce(key: String, value: Iterator[Int], collect: ArrayBuffer[(String, Int)]) = {
      collect += ((key, value.sum))
    }
    val rdd = sc.hadoopFile("/hdfs/wordcount/in1/*", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((split, it) =>{
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        it.foreach(kv => map(kv._1, kv._2, collect))
        collect.toIterator
      })
      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
      .mapPartitions(it => {
        val collect: ArrayBuffer[(String, Int)] = ArrayBuffer[(String, Int)]()
        var lastKey: String = ""
        var values: ArrayBuffer[Int] = ArrayBuffer[Int]()
        for ((currKey, value) <- it) {
          if (!currKey.equals(lastKey)) {
            if (values.length != 0)
              reduce(lastKey, values.toIterator, collect)
            values.clear()
          }
          values += value
          lastKey = currKey
        }
        if (values.length != 0) reduce(lastKey, values.toIterator, collect)
        collect.toIterator
      })
    rdd.foreach(println)
  }
}

各类RDD

  • ShuffledRDD :表示需要走Shuffle过程的网络传输
  • CoalescedRDD :用于将一台机器的多个分区合并成一个分区
  • CartesianRDD :对两个RDD的所有元素产生笛卡尔积
  • MapPartitionsRDD :用于对每个分区的数据进行特定的处理
  • CoGroupedRDD :用于将2~4个rdd,按照key进行连接聚合
  • SubtractedRDD :用于对2个RDD求差集
  • UnionRDDPartitionerAwareUnionRDD :用于对2个RDD求并集
  • ZippedPartitionsRDD2:zip拉链操作产生的RDD
  • ZippedWithIndexRDD:给每一个元素标记一个自增编号
  • PartitionwiseSampledRDD:用于对rdd的元素按照指定的百分比进行随机采样

当我们需要给Datafream添加自增列时,可以使用zipWithUniqueId方法:

from pyspark.sql.types import StructType, LongType

schema = data.schema.add(StructField("id", LongType()))
rowRDD = data.rdd.zipWithUniqueId().map(lambda t: t[0]+Row(t[1]))
data = rowRDD.toDF(schema)
data.show()

API用法详情可参考:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

cache&checkpoint

RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

rdd.persist()

checkpoint的源码注释可以看到:

  • 标记该RDD作为检查点。
  • 它将被保存在通过SparkContext#setCheckpointDir方法设置的检查点目录中
  • 它所引用的所有父RDD引用将全部被移除
  • 这个方法在这个RDD上必须在所有job执行前运行。
  • 强烈建议将这个RDD缓存在内存中,否则这个保存文件的计算任务将重新计算。

从中我们得知,在执行checkpoint方法时,最好同时,将该RDD缓存起来,否则,checkpoint也会产生一个计算任务。

sc.setCheckpointDir("checkpoint")
rdd.cache()
rdd.checkpoint()

graphframes 的用法

GraphFrame是将Spark中的Graph算法统一到DataFrame接口的Graph操作接口,为Scala、Java和Python提供了统一的图处理API。

Graphframes是开源项目,源码工程如下:https://github.com/graphframes/graphframes

可以参考:

  • 官网:https://graphframes.github.io/graphframes/docs/_site/index.html
  • GraphFrames用户指南-Python — Databricks文档:https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html

在GraphFrames中图的顶点(Vertex)和边(edge)都是以DataFrame形式存储的:

  • 顶点DataFrame:必须包含列名为“id”的列,用于作为顶点的唯一标识
  • 边DataFrame:必须包含列名为“src”和“dst”的列,根据唯一标识id标识关系

创建图的示例:

from graphframes import GraphFrame
vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])
edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])
# 生成图
g = GraphFrame(vertices, edges)

GraphFrame提供三种视图:

print("顶点表视图:")
graph.vertices.show() # graph.vertices 就是原始的vertices
print("边表视图:")
graph.edges.show() # graph.edges 就是原始的 edges
print("三元组视图:")
graph.triplets.show()

获取顶点的度、入度和出度:

# 顶点的度
graph.degrees.show()
# 顶点的入度
graph.inDegrees.show()
# 顶点的出度
graph.outDegrees.show()

Motif finding (模式发现)

示例:

# 多个路径条件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
# 在搜索的结果上进行过滤
motif.filter("b.age > 30")
# 不需要返回路径中的元素时,可以使用匿名顶点和边
motif = graph.find("(start)-[]->()")
# 设置路径不存在的条件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")

假设我们要想给用户推荐关注的人,可以找出这样的关系:A关注B,B关注C,但是A未关注C。找出这样的关系就可以把C推荐给A:

# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 选择需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()

结果:

+---+---+
|  A|  C|
+---+---+
|  e|  c|
|  e|  a|
|  d|  b|
|  a|  d|
|  f|  b|
|  d|  e|
|  a|  f|
|  a|  c|
+---+---+

Motif在查找路径过程的过程中,还可以沿着路径携带状态。例如我们想要找出关系链有4个顶点,而且其中3条边全部都是"friend"关系:

from pyspark.sql.functions import col, lit, when
from functools import reduce
chain4 = graph.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
def sumFriends(cnt, relationship):
    "定义下一个顶点更新状态的条件:如果关系为friend则cnt+1"
    return when(relationship == "friend", cnt+1).otherwise(cnt)
# 将更新方法应用到整个链的,链上每有一个关系是 friend 就加一,链上共三个关系。
condition = reduce(lambda cnt, e: sumFriends(
    cnt, col(e).relationship), ["ab", "bc", "cd"], lit(0))
chainWith2Friends2 = chain4.where(condition >= 3)
chainWith2Friends2.show()

结果:

+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
|              a|            ab|              b|            bc|              c|            cd|              d|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+
| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, b, friend}|   {b, Bob, 36}|
| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|
|{e, Esther, 32}|{e, d, friend}| {d, David, 29}|{d, a, friend}| {a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+---------------+--------------+---------------+--------------+---------------+--------------+---------------+

Subgraphs 子图

可以直接过滤其顶点或边,dropIsolatedVertices()方法用于删除孤立没有连接的点:

graph.filterVertices("age > 30").filterEdges("relationship = 'friend'").dropIsolatedVertices()

还可以基于模式发现获取到的边创建Subgraphs :

paths = graph.find("(a)-[e]->(b)")\
    .filter("e.relationship = 'follow'")\
    .filter("a.age < b.age")
# 抽取边信息e2 = paths.select("e.src", "e.dst", "e.relationship")
e2 = paths.select("e.*")
# 创建Subgraphs
g2 = GraphFrame(graph.vertices, e2)

GraphFrames支持的GraphX算法

  • PageRank:查找图中的重要顶点。
  • 广度优先搜索(BFS):查找从一组顶点到另一组顶点的最短路径
  • 连通组件(ConnectedComponents):为具备连接关系的顶点分配相同的组件ID
  • 强连通组件(StronglyConnectedConponents):根据每个顶点的强连通分量分配SCC。
  • 最短路径(Shortest paths):查找从每个顶点到目标顶点集的最短路径。
  • 三角形计数(TriangleCount):计算每个顶点所属的三角形的数量,经常用于确定组的稳定性(相互连接的数量代表了稳定性)或作为其他网络度量(如聚类系数)的一部分,在社交网络分析中用来检测社区。
  • 标签传播算法(LPA):检测图中的社区。

pageRank算法:

results = graph.pageRank(resetProbability=0.15, maxIter=10)
results.vertices.sort("pagerank", ascending=False).show()

结果:

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  b|    Bob| 36| 2.7025217677349773|
|  c|Charlie| 30| 2.6667877057849627|
|  a|  Alice| 34| 0.4485115093698443|
|  e| Esther| 32| 0.3613490987992571|
|  f|  Fanny| 36|0.32504910549694244|
|  d|  David| 29|0.32504910549694244|
|  g|  Gabby| 60|0.17073170731707318|
+---+-------+---+-------------------+

可以设置起始顶点:

graph.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
graph.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["a", "b", "c", "d"], maxIter=10)

广度优先搜索BFS:

搜索从姓名叫Esther到年龄小于32的最小路径:

paths = graph.bfs("name = 'Esther'", "age < 32")
paths.show()
+--------------+--------------+---------------+
|          from|            e0|             to|
+--------------+--------------+---------------+
|{a, Alice, 34}|{a, e, friend}|{e, Esther, 32}|
+--------------+--------------+---------------+

可以指定只能在指定的边搜索:

graph.bfs("name = 'Esther'",
          "age < 32",
          edgeFilter="relationship != 'friend'",
          maxPathLength=4
          ).show()
+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|{e, Esther, 32}|{e, f, follow}|{f, Fanny, 36}|{f, c, follow}|{c, Charlie, 30}|
+---------------+--------------+--------------+--------------+----------------+

Connected components 连通组件:

必须先设置检查点:

sc.setCheckpointDir("checkpoint")

graph.connectedComponents().show()

结果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
|  g|  Gabby| 60|146028888064|
+---+-------+---+------------+

可以看到仅g点在一个连通区域内,可以调用dropIsolatedVertices()方法,删除这种孤立的没有连接的点:

graph.dropIsolatedVertices().connectedComponents().show()

结果:

+---+-------+---+------------+
| id|   name|age|   component|
+---+-------+---+------------+
|  a|  Alice| 34|412316860416|
|  b|    Bob| 36|412316860416|
|  c|Charlie| 30|412316860416|
|  d|  David| 29|412316860416|
|  e| Esther| 32|412316860416|
|  f|  Fanny| 36|412316860416|
+---+-------+---+------------+

Strongly connected components 强连通组件:

graph.stronglyConnectedComponents(maxIter=10).show()

Shortest paths 最短路径:

每个顶点到a或d的最短路径:

graph.shortestPaths(landmarks=["a", "d"]).show()
+---+-------+---+----------------+
| id|   name|age|       distances|
+---+-------+---+----------------+
|  g|  Gabby| 60|              {}|
|  f|  Fanny| 36|              {}|
|  e| Esther| 32|{a -> 2, d -> 1}|
|  d|  David| 29|{a -> 1, d -> 0}|
|  c|Charlie| 30|              {}|
|  b|    Bob| 36|              {}|
|  a|  Alice| 34|{a -> 0, d -> 2}|
+---+-------+---+----------------+

Triangle count 三角形计数:

graph.triangleCount().show()
+-----+---+-------+---+
|count| id|   name|age|
+-----+---+-------+---+
|    1|  a|  Alice| 34|
|    0|  b|    Bob| 36|
|    0|  c|Charlie| 30|
|    1|  d|  David| 29|
|    1|  e| Esther| 32|
|    0|  g|  Gabby| 60|
|    0|  f|  Fanny| 36|
+-----+---+-------+---+

说明顶点a/e/d构成三角形。

标签传播算法(LPA):

graph.labelPropagation(maxIter=5).orderBy("label").show()
+---+-------+---+-------------+
| id|   name|age|        label|
+---+-------+---+-------------+
|  g|  Gabby| 60| 146028888064|
|  f|  Fanny| 36|1047972020224|
|  b|    Bob| 36|1047972020224|
|  a|  Alice| 34|1382979469312|
|  c|Charlie| 30|1382979469312|
|  e| Esther| 32|1460288880640|
|  d|  David| 29|1460288880640|
+---+-------+---+-------------+

PySpark3.X与pandas融合

Pyspark从3.0版本开始出现了pandas_udf装饰器、applyInPandas和mapInPandas,基于这些方法,我们就可以使用熟悉的pandas的语法处理spark对象的数据。

首先创建几条测试数据,并启动 Apache Arrow

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
df.show()

自定义UDF和UDAF

pyspark暂不支持自定义UDTF。

使用pandas_udf装饰器我们可以创建出基于pandas的udf自定义函数,在DSL的语法中可以被直接使用:

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b
df.select(multiply_func("id", "v").alias("product")).show()

注册函数和视图后,可以直接在SQL中使用:

df.createOrReplaceTempView("t")
spark.udf.register("multiply", multiply_func)
spark.sql('select multiply(id, v) product from t').show()

结果均为:

+-------+
|product|
+-------+
|    1.0|
|    2.0|
|    6.0|
|   10.0|
|   20.0|
+-------+

还支持聚合函数和窗口函数:

from pyspark.sql import Window

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()
# 对字段'v'进行求均值
df.select(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分组,求'v'的均值
df.groupby("id").agg(mean_udf('v').alias("mean_v")).show()
# 按照‘id'分组,求'v'的均值,并赋值给新的一列
df.withColumn('mean_v', mean_udf("v").over(Window.partitionBy('id'))).show()

注册到udf之后同样可以直接使用SQL实现:

spark.udf.register("mean2", mean_udf)
spark.sql('select mean2(v) mean_v from t').show()
spark.sql('select id,mean2(v) mean_v from t group by id').show()
spark.sql('select id,v,mean2(v) over(partition by id) mean_v from t').show()

结果均为:

+--------+
| mean_v |
+--------+
|     4.2|
+--------+

+---+--------+
| id| mean_v |
+---+--------+
|  1|     1.5|
|  2|     6.0|
+---+--------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+

分组聚合与JOIN

applyInPandas需要在datafream调用groupby之后才能使用:

def subtract_mean(pdf):
    v = pdf.v
    pdf['v1'] = v - v.mean()
    pdf['v2'] = v + v.mean()
    return pdf
t = df.groupby("id")
t.applyInPandas(
    subtract_mean, schema="id long, v double, v1 double, v2 double").show()

结果:

+---+----+----+----+
| id|   v|  v1|  v2|
+---+----+----+----+
|  1| 1.0|-0.5| 2.5|
|  1| 2.0| 0.5| 3.5|
|  2| 3.0|-3.0| 9.0|
|  2| 5.0|-1.0|11.0|
|  2|10.0| 4.0|16.0|
+---+----+----+----+

subtract_mean函数接收的是对应id的dataframe数据,schema指定了返回值的名称和类型列表。

通过以下代码我们可以知道,applyInPandas可以借助cogroup进行表连接:

val a = sc.parallelize(List(1, 2, 1, 3))
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
val e = a.map((_, "e"))
scala> b.cogroup(c).foreach(println)
(3,(CompactBuffer(b),CompactBuffer(c)))
(1,(CompactBuffer(b, b),CompactBuffer(c, c)))
(2,(CompactBuffer(b),CompactBuffer(c)))

示例:

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
  	# l、r is a pandas.DataFrame
  	# 这里是按照id分组
  	# 那么,l和r分别是对应id的df1和df2数据
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+

map迭代

执行以下代码:

def filter_func(iterator):
    for i, pdf in enumerate(iterator):
        print(i, pdf.values.tolist())
        yield pdf

df.mapInPandas(filter_func, schema=df.schema).show()

后台看到执行结果为:

0 [[2.0, 5.0]]                                                     
0 [[2.0, 3.0]]
0 [[1.0, 1.0]]
0 [[1.0, 2.0]]
0 [[2.0, 10.0]]

前台结果几乎保持原样。可以知道iterator是一个分区迭代器,迭代出当前分区的每一行数据都被封装成一个pandas对象。

Pyspark与Pandas的交互

将spark的Datafream对象转换为原生的pandas对象只需调用toPandas()方法即可:

sdf.toPandas()

将原生的pandas对象转换为spark对象可以使用spark的顶级方法:

spark.createDataFrame(pdf)

习惯使用pandas的童鞋,还可以直接使用pandas-on-Spark,在spark3.2.0版本时已经匹配到pandas 1.3版本的API。通过pandas-on-Spark,我们可以完全用pandas的api操作数据,而底层执行却是spark的并行化。

使用pandas-on-Spark最好设置一下环境变量:

import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

将spark对象转换为pandas-on-Spark对象:

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

pdf = df.to_pandas_on_spark()
print(type(pdf))
pdf

pandas-on-Spark对象也可以还原成spark对象:

pdf.to_spark()

另外spark提供直接将文件读取成pandas-on-Spark对象的api,例如:

import pyspark.pandas as ps

pdf = ps.read_csv("example_csv.csv")

ps对象与原生pandas对象的API几乎完全一致。

ps对象相对于原生pandas对象的API几乎一致,同时还支持一些强悍的功能,例如直接以SQL形式访问:

ps.sql("SELECT count(*) as num FROM {pdf}")

{pdf}访问了变量名为pdf的pandas-on-Spark对象。

到此这篇关于PySpark与GraphFrames的安装与使用的文章就介绍到这了,更多相关PySpark与GraphFrames使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • MAC下Anaconda+Pyspark安装配置详细步骤

    在MAC的Anaconda上使用pyspark,主要包括以下步骤: 在MAC下安装Spark,并配置环境变量. 在Anaconda中安装引用pyspark. 1. MAC下安装Spark   到Apark Spark官网上下载Spark文件,无论是windows系统,还是MAC系统,亦或者Linux系统,都可以下载这个文件(独立于系统). 将下载的文件进行解压(可以使用命令行进行解压,也可以使用解压软件).解压之后的文件如下: 配置环境变量.打开MAC命令行窗口,输入如下命令: sudo vi

  • 从PySpark中的字符串获取列表方法讲解

    在 PySpark 中是否有类似eval的功能. 我正在尝试将 Python 代码转换为 PySpark 我正在查询一个数据框,并且其中一列具有数据,如下所示,但采用字符串格式. [{u'date': u'2015-02-08', u'by': u'abc@gg.com', u'value': u'NA'}, {u'date': u'2016-02-08', u'by': u'dfg@yaa.com', u'value': u'applicable'}, {u'date': u'2017-02-

  • PySpark与GraphFrames的安装与使用环境搭建过程

    目录 PySpark环境搭建 配置hadoop 安装pyspark与Java graphframes安装 使用方法 启动spark并读取数据 启动hive支持 Spark的DataFrame与RDD DataFrame的基础api RDD的简介 RDD的API概览 各类RDD cache&checkpoint graphframes的用法 Motiffinding(模式发现) Subgraphs子图 GraphFrames支持的GraphX算法 PySpark3.X与pandas融合 自定义UD

  • Android Studio安装配置、环境搭建详细步骤及基本使用的详细教程

    前言 Android Studio的安装配置及使用篇终于来啦~ 废话不多说,以下针对JDK正确安装(及其环境变量配置完毕,即Java开发环境下).Android Studio的安装,配置,以及创建工程.主题字体更换.窗口工具.布局.快捷方式等的基本使用逐一说明. 安装java 下载Java安装包(jdk,网上有很多下载地址,最好去官网下:https://www.java.com/zh_CN/),安装完后记得配置环境变量: 在"系统变量"新建一个变量名为JAVA_HOME的变量,变量值为

  • python安装和pycharm环境搭建设置方法

    一.python 开发工具的选择 python开发工具有很多,这里推荐使用 pycharm:因为jetbrains公司拥有众多的开发工具以及开发拥护者.这些工具有相同的风格和各类使用案例.描述问题的博客,可供初学者参考. 1.pycharm professional edition - 专业版,可以试用30天 https://download.jetbrains.8686c.com/python/pycharm-professional-2020.1.1.exe jetbrains pychar

  • Python环境搭建过程从安装到Hello World

    开发环境 安装最新版Python 下载地址:https://www.python.org/downloads/ 运行Python 1.交互方式运行 用自带的IDLE 打开cmd输入python 2.程序方式运行 新建hello.py print("Hello World") #打印语句 input() #输入(起暂停作用) 直接双击该文件 用cmd运行 UNIX/Linux下,也是python+文件名,记得用chomod命令设置执行权限 %python hello.py %./hell

  • 初学者AngularJS的环境搭建过程

    AngularJS是什么? AngularJS是一个开源Web应用程序框架.它最初是由MISKO Hevery和Adam Abrons于2009年开发.现在是由谷歌维护 AngularJS特性 AngularJS是一个功能强大的基于JavaScript开发框架用于创建富互联网应用(RIA). AngulajJS为开发者提供的选项(使用JavaScript)在一个干净的MVC(模型 - 视图 - 控制器)的方式来编写客户端应用程序. AngularJS写的应用都是跨浏览器兼容.AngularJS使

  • 阿里云go开发环境搭建过程

    开通了一个阿里云来玩,记录一下环境搭建的过程 运行环境 ECS Ubuntu 16.04 64位 过程 #切换到安装文件夹 cd /usr/local #下载go #由于墙的原因,直接下载官方的可能会失败,这里用国内一个论坛的 wget https://dl.gocn.io/golang/1.8.4/go1.8.4.linux-amd64.tar.gz #解压 tar -zxvf go1.8.4.linux-amd64.tar.gz #创建工作目录 mkdir -p GOPATH goProje

  • spring cloud 阿波罗 apollo 本地开发环境搭建过程

    开源配置中心 - Apollo Apollo(阿波罗)是携程框架部门研发的配置管理平台,能够集中化管理应用不同环境.不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限.流程治理等特性.服务端基于Spring Boot和Spring Cloud开发,打包后可以直接运行,不需要额外安装Tomcat等应用容器. 检出代码 apollo github 可以fork下然后本地使用idea打开 数据库脚本 执行以下脚本创建ApolloConifgDB.ApolloPortalDB apoll

  • Hadoop-3.1.2完全分布式环境搭建过程图文详解(Windows 10)

    一.前言 Hadoop原理架构本人就不在此赘述了,可以自行百度,本文仅介绍Hadoop-3.1.2完全分布式环境搭建(本人使用三个虚拟机搭建). 首先,步骤: ① 准备安装包和工具: hadoop-3.1.2.tar.gz ◦ jdk-8u221-linux-x64.tar.gz(Linux环境下的JDK) ◦ CertOS-7-x86_64-DVD-1810.iso(CentOS镜像) ◦工具:WinSCP(用于上传文件到虚拟机),SecureCRTP ortable(用于操作虚拟机,可复制粘

  • Hadoop2.8.1完全分布式环境搭建过程

    前言 本文搭建了一个由三节点(master.slave1.slave2)构成的Hadoop完全分布式集群(区别单节点伪分布式集群),并通过Hadoop分布式计算的一个示例测试集群的正确性. 本文集群三个节点基于三台虚拟机进行搭建,节点安装的操作系统为Centos7(yum源),Hadoop版本选取为2.8.0.作者也是初次搭建Hadoop集群,其间遇到了很多问题,故希望通过该博客让读者避免. 实验过程 1.基础集群的搭建 目的:获得一个可以互相通信的三节点集群 下载并安装VMware WorkS

  • Hadoop环境搭建过程中遇到的问题及解决方法

    1.启动hadoop之前,ssh免密登录slave主机正常,使用命令start-all.sh启动hadoop时,需要输入slave主机的密码,说明ssh文件权限有问题,需要执行以下操作: 1)进入.ssh目录下查看是否有公钥私钥文件authorized_keys.id_rsa.id_rsa.pub 2)如果没有公钥私钥文件,则执行ssh-keygen -t rsa生成秘钥(master主机和slave主机都需要执行) 3)公钥私钥文件生成完成后,执行cat id_rsa.pub >> auth

随机推荐