Spark调度架构原理详解

1.启动spark集群,就是执行sbin/start-all.sh,启动master和多个worker节点,master主要作为集群的管理和监控,worker节点主要担任运行各个application的任务。master节点需要让worker节点汇报自身状况,比如CPU,内存多大,这个过程都是通过心跳机制来完成的

2.master收到worker的汇报信息之后,会给予worker信息

3.driver提交任务给spark集群[driver和master之间的通信是通过AKKAactor来做的,也就是说master是akkaactor异步通信模型中的一个actor模型,driver也是一样,driver异步向mater发送注册信息(registerApplication)异步注册信息]

4.master节点对application预估,7个G的内存完成任务,对任务进行分配,每一个worker节点上都分配3.5G的内存去执行任务,在master就对各个worker上的任务进行整体的监控调度

5.worker节点领到任务,开始执行,在worker节点上启动相应的executor进程来执行,每个executor中都有一个线程池的概念,里面存有多个task线程

6.executor会从线程池中取出task去计算rddpatition中的数据,transformation操作,action操作

7.worker节点向driver节点汇报计算状态

通过本地并行化集合创建RDD

public class JavaLocalSumApp{
	public static void main(String[] args){
		SparkConf conf = new SparkConf().setAppName("JavaLocalSumApp");
		JavaSparkContext sc = new JavaSparkContext(conf);
		List<Integer> list = Arrays.asList(1,3,4,5,6,7,8);
		//通过本地并行化集合创建RDD
		JavaRDD <Integer> listRDD = sc.parallelize(list);
		//求和
		Integer sum = listRDD.reduce(new Function2<Integer,Integer,Integer,Integer>(){
			@Override
			    public Integer call(Integer v1,Integer v2) throws Exception{
				return v1+v2;
			}
		}
		);
		System.out.println(sum)
	}
}
//java 中的函数式编程,需要将编译器设置成1.8
listRDD.reduce((v1,v2)=> v1+v2)

Sparktransformation和action操作

RDD:弹性分布式数据集,是一种集合,支持多种来源,有容错机制,可以被缓存,支持并行操作,一个RDD代表一个分区里的数据集

RDD有两种操作算子:

Transformation(转化):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,紧紧是记住了数据集的逻辑操作

Action(执行):触发Spark作业的运行,真正触发转换算子的计算

spark算子的作用

该图描述的是Spark在运行转换中通过算子对RDD进行转换,算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。

输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark ,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理

运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等。对数据进行操作并将RDD转换为新的RDD,通过Action算子,触发Spark提交作业,如果数据需要复用,可以通过Cache算子,将数据缓存到内存

输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala int 型数据)

Transformation 和 Actions操作概况

Transformation

map(func):返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
filter(func) :返回一个新的数据集,由经过func函数
flatMap(func):类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
sample(withReplacement, frac, seed): 根据给定的随机种子seed,随机抽样出数量为frac的数据
union(otherDataset): 返回一个新的数据集,由原数据集和参数联合而成
roupByKey([numTasks]): 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
reduceByKey(func, [numTasks]): 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
join(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
groupWith(otherDataset, [numTasks]): 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup
cartesian(otherDataset): 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。

Actions操作

reduce(func): 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
collect(): 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
count(): 返回数据集的元素个数
take(n): 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
first(): 返回数据集的第一个元素(类似于take(1))
saveAsTextFile(path): 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本
saveAsSequenceFile(path): 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
foreach(func): 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互

WordCount执行过程

总结

以上就是本文关于Spark 调度架构原理详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!

您可能感兴趣的文章:

  • 浅谈七种常见的Hadoop和Spark项目案例
  • SparkGraphx计算指定节点的N度关系节点源码
  • spark之Standalone模式部署配置详解
  • Spark实现K-Means算法代码示例
  • Spark三种属性配置方式详解
  • 浅谈Spark RDD API中的Map和Reduce
  • Spark整合Mongodb的方法
  • 如何为Spark Application指定不同的JDK版本详解
(0)

相关推荐

  • spark之Standalone模式部署配置详解

    spark运行模式 Spark 有很多种模式,最简单就是单机本地模式,还有单机伪分布式模式,复杂的则运行在集群中,目前能很好的运行在 Yarn和 Mesos 中,当然 Spark 还有自带的 Standalone 模式,对于大多数情况 Standalone 模式就足够了,如果企业已经有 Yarn 或者 Mesos 环境,也是很方便部署的. 1.local(本地模式):常用于本地开发测试,本地还分为local单线程和local-cluster多线程; 2.standalone(集群模式):典型的M

  • 如何为Spark Application指定不同的JDK版本详解

    前言 随着企业内部业务系统越来越多,基于JVM的服务,通常情况线上环境可能会有多套JDK跑不同的服务.大家都知道基于高版本的Java规范编写的服务跑在低版本的JVM上会出现:java.lang.UnsupportedClassVersionError的异常. Spark 2.2开始移除了对Java 7的支持,大多数情况下,我们的Spark Application是和Hadoop系统公用的JDK,如果Hadoop依赖的JDK版本是7,那我们基于JDK 8编写的Application跑在上面就会出问

  • Spark三种属性配置方式详解

    随着Spark项目的逐渐成熟, 越来越多的可配置参数被添加到Spark中来.在Spark中提供了三个地方用于配置: 1.Spark properties:这个可以控制应用程序的绝大部分属性.并且可以通过 SparkConf对象或者Java 系统属性进行设置: 2.环境变量(Environment variables):这个可以分别对每台机器进行相应的设置,比如IP.这个可以在每台机器的$SPARK_HOME/ conf/spark-env.sh脚本中进行设置: 3.日志:所有的日志相关的属性可以

  • 浅谈七种常见的Hadoop和Spark项目案例

    有一句古老的格言是这样说的,如果你向某人提供你的全部支持和金融支持去做一些不同的和创新的事情,他们最终却会做别人正在做的事情.如比较火爆的Hadoop.Spark和Storm,每个人都认为他们正在做一些与这些新的大数据技术相关的事情,但它不需要很长的时间遇到相同的模式.具体的实施可能有所不同,但根据我的经验,它们是最常见的七种项目. 项目一:数据整合 称之为"企业级数据中心"或"数据湖",这个想法是你有不同的数据源,你想对它们进行数据分析.这类项目包括从所有来源获得

  • Spark整合Mongodb的方法

    Spark介绍 按照官方的定义,Spark 是一个通用,快速,适用于大规模数据的处理引擎. 通用性:我们可以使用Spark SQL来执行常规分析, Spark Streaming 来流数据处理, 以及用Mlib来执行机器学习等.Java,python,scala及R语言的支持也是其通用性的表现之一. 快速: 这个可能是Spark成功的最初原因之一,主要归功于其基于内存的运算方式.当需要处理的数据需要反复迭代时,Spark可以直接在内存中暂存数据,而无需像Map Reduce一样需要把数据写回磁盘

  • Spark实现K-Means算法代码示例

    K-Means算法是一种基于距离的聚类算法,采用迭代的方法,计算出K个聚类中心,把若干个点聚成K类. MLlib实现K-Means算法的原理是,运行多个K-Means算法,每个称为run,返回最好的那个聚类的类簇中心.初始的类簇中心,可以是随机的,也可以是KMean||得来的,迭代达到一定的次数,或者所有run都收敛时,算法就结束. 用Spark实现K-Means算法,首先修改pom文件,引入机器学习MLlib包: <dependency> <groupId>org.apache.

  • 浅谈Spark RDD API中的Map和Reduce

    RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果.本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中. 如何创建RDD? RDD可以从普通数组创建出来,

  • SparkGraphx计算指定节点的N度关系节点源码

    直接上代码: package horizon.graphx.util import java.security.InvalidParameterException import horizon.graphx.util.CollectionUtil.CollectionHelper import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel

  • Spark调度架构原理详解

    1.启动spark集群,就是执行sbin/start-all.sh,启动master和多个worker节点,master主要作为集群的管理和监控,worker节点主要担任运行各个application的任务.master节点需要让worker节点汇报自身状况,比如CPU,内存多大,这个过程都是通过心跳机制来完成的 2.master收到worker的汇报信息之后,会给予worker信息 3.driver提交任务给spark集群[driver和master之间的通信是通过AKKAactor来做的,也

  • React为什么需要Scheduler调度器原理详解

    目录 正文 我们为什么需要Scheduler(调度器) Scheduler如何进行工作 总结 正文 最近在重学React,由于近两年没使用React突然重学发现一些很有意思的概念,首先便是React的Scheduler(调度器) 由于我对React的概念还停留在React 15之前(就是那个没有hooks的年代),所以接触Scheduler(调度器) 让我感觉很有意思: 在我印象中React的架构分为两层(React 16 之前) Reconciler(协调器)—— 负责找出变化的组件 Rend

  • java开发MVC三层架构上再加一层Manager层原理详解

    目录 MVC三层架构 MVC架构弊端 Manager层的特征 Manager层使用案例 MVC三层架构 我们在刚刚成为程序员的时候,就会被前辈们 "教育" 说系统的设计要遵循 MVC(Model-View-Controller)架构.它将整体的系统分成了 Model(模型),View(视图)和 Controller(控制器)三个层次,也就是将用户视图和业务处理隔离开,并且通过控制器连接起来,很好地实现了表现和逻辑的解耦,是一种标准的软件分层架构. MVC分层架构是架构上最简单的一种分层

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • RocketMQ Namesrv架构工作原理详解

    目录 1 概念 2 核心数据结构和API 2.1 Namesrv的核心数据结构 2.2 Namesrv的API 3 Namesrv架构 3.1组件 3.2 Namesrv四个功能模块 1 概念 Namesrv的作用是保存元数据,提高Broker的可用性. Namesrv的主要功能是临时存储,管理Topic路由信息,各个Namesrv节点之间是不通信,无状态的,互相不知道对方的存在. 当Broker,生产者,消费者启动的时候,会轮询全部的Namesrv节点,获取路由信息. 2 核心数据结构和API

  • Spring IOC原理详解

    最近,买了本Spring入门书:springInAction.大致浏览了下感觉还不错.就是入门了点.Manning的书还是不错的,我虽然不像哪些只看Manning书的人那样专注于Manning,但怀着崇敬的心情和激情通览了一遍.又一次接受了IOC.DI.AOP等Spring核心概念.先就IOC和DI谈一点我的看法. IOC(DI):其实这个Spring架构核心的概念没有这么复杂,更不像有些书上描述的那样晦涩.java程序员都知道:java程序中的每个业务逻辑至少需要两个或以上的对象来协作完成,通

  • java进行远程部署与调试及原理详解

    这篇文章主要介绍了java进行远程部署与调试及原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 远程调试,特别是当你在本地开发的时候,你需要调试服务器上的程序时,远程调试就显得非常有用. JAVA 支持调试功能,本身提供了一个简单的调试工具JDB,支持设置断点及线程级的调试同时,不同的JVM通过接口的协议联系,本地的Java文件在远程JVM建立联系和通信.此篇是Intellij IDEA远程调试的教程汇总和原理解释,知其然而又知其所以然.

  • Java多线程 线程状态原理详解

    这篇文章主要介绍了Java多线程 线程状态原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java.lang.Thread.State枚举定义了6种线程状态. NEW: 尚未启动(start)的线程的线程状态 RUNNABLE: 运行状态,但线程可能正在JVM中执行,也可能在等待CPU调度 BLOCKED: 线程阻塞,等待监视器锁以进入同步代码块/方法 WAITING: 等待状态.使用以下不带超时的方式时会进入:Object.wait.

  • java synchronized的用法及原理详解

    目录 为什么要用synchronized 使用方式 字节码语义 对象锁(monitor) 锁升级过程 为什么要用synchronized 相信大家对于这个问题一定都有自己的答案,这里我还是要啰嗦一下,我们来看下面这段车站售票的代码: /** * 车站开两个窗口同时售票 */ public class TicketDemo { public static void main(String[] args) { TrainStation station = new TrainStation(); //

  • Java并发编程深入理解之Synchronized的使用及底层原理详解 上

    目录 一.线程安全问题 1.临界资源 2.线程安全问题 3.如何解决线程安全问题 二.synchronized使用介绍 三.synchronized实现原理 1.synchronized底层指令:monitorenter和monitorexit 2.Object Monitor(监视器锁)机制 一.线程安全问题 1.临界资源 多线程编程中,有可能会出现多个线程同时访问同一个共享.可变资源的情况,这个资源我们称之其为临界资源:这种资源可能是:对象.变量.文件等. 共享:资源可以由多个线程同时访问

随机推荐