hadoop之MapReduce框架原理

目录
  • MapReduce框架的简单运行机制:
  • Mapper阶段:
    • InputFormat数据输入:
      • 切片与MapTask并行度决定机制:
    • job提交过程源码解析:
    • 切片逻辑:
    • 1)FileInputFormat实现类
    • 进行虚拟存储
      • (1)虚拟存储过程:
  • Shuffle阶段:
    • 排序:
    • Combiner合并:
  • ReduceTask阶段:
    • Reduce Join:
    • Map Join:

MapReduce框架的简单运行机制:

MapReduce是分为两个阶段的,MapperTask阶段,和ReduceTask阶段。(中间有一个Shuffle阶段)

Mapper阶段,可以通过选择什么方式(K,V的选择对应不同的方法)来读取数据,读取后把数据交给Mapper来进行后续的业务逻辑(用户写),让后进入Reduce阶段通过Shuffle来拉取Mapper阶段的数据,让后通过OutputFormat(等方法)来写出(可以是ES,mysql,hbase,文件)

Mapper阶段:

InputFormat数据输入:

切片与MapTask并行度决定机制:

MapTask个数,决定了并行度(相当于在生成map集合的过程中有几个人在干活),**(不一定越多越好,当数据量小的时候可能开启的众多MapTask的时间用一个MapTask已经计算完成)

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

job提交过程源码解析:

因为我们找的job提交,所以在job提交函数哪里打个断点,

步入函数后

ensureState(JobState.DEFINE);  是确保你的状态是正确的(状态不对或者running 都会抛异常)

setUseNewAPI();       处理Hadoop不同版本之间的API兼容

connect();          连接,(客户端需要与集群或者本机连接)

checkSpecs(job); 校验 校验输出路径是否已经创建,是否有参

return submitter.submitJobInternal(Job.this, cluster);   核心代码    步入的时候需要点两下,

第一个步入是步入的参数Job  第二个才步入此方法

这个方法是提交job(在集群模式下,提交的job包含(通过客户端方式把jar包提交给集群),在本地不需要提交jar包,jar在本地是存在的)

还会进行切片,生成切片信息(几个切片就有几个MapTask)

还会 生成xml文件

综上  job提交会交三样东西(jar,xml文件,切片信息---》集群模式下)

最后会删除所有的信息文件

切片逻辑:

**(切片是每一个文件单独切片)

在本地是32m一块,前边说过,默认一块对应一个切片,但是有前提条件,再你减去32m的时候,余下最后一块如果大于1.1倍就重新分配切片,但如果小于1.1,则不能更新分片

例子1:

已有一个32.1m的数据   物理分块是(32m+0.1m)切片分布是(1个切片,因为32.1/32=1.003125<1.1   所以使用一个切片)

例子2:

已有一个100m的数据

100-32-32=36>32(36/32=1.125>1.1   所以最后36m需要分配两个切片)

**块的大小没办法改变,但是可以调切片大小(maxSize让切片调小)(minSize让切片调大)

切片总结:

(开一个MapTask  默认是占1g内存+1个cpu)

1)FileInputFormat实现类

思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?

FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。(应用场景的不同选择不同的接口实现类)

TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

进行虚拟存储

(1)虚拟存储过程:

将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize(切片大小)值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

测试:

再不使用CombineTextInputFormat情况下(默认TextInputFormat)

可以看到切片为4

添加代码,设置实现类为CombineTextInputFormat     和   设置虚拟存储切片大小

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

可以看到,现在是3个切片

我们可以通过改变虚拟切片大小来改变调用的切片的数量

综上:影响切片的数量的因素为:(1)数据量的大小(2)切片的大小(一般会自动调整)(3)文件格式(有些文件是不可切片的)

影响切片大小的因素:   HDFS中块的大小(通过调maxsize,minsize与块的大小进行比较来判断)

Shuffle阶段:

shuffle阶段是一个从mapper阶段出来的后的阶段,会写入(k,v)一个环形缓冲区(缓冲区分为两半,一半存储索引,一半存储数据,默认100m,到达80%后会反向逆写(减少时间消耗,提高效率,逆写是因为不需要等待全部溢写后在进行写入操作)逆写入文件前会进行分区(分区的个数与reduceTask的个数有关)排序(对key进行排序,但是存储位置并不发生改变,只改变索引的位置,改变存储位置消耗资源较大))写入文件后会进行归并排序(在有序的情况下,归并是最高效的))

排序:

排序可以自定义排序,举例全排序:

自定义了一个Bean类,bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

Combiner合并:

并不满足所有生产环境下,只有在不影响最终业务逻辑下才可以实现(求和就可以,算平均值就不可以)

combiner与reducetask区别如下:

ReduceTask阶段:

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Sort阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(3)Reduce阶段:reduce()函数将计算结果写到HDFS上。

ReduceTask的个数可以手动进行设置,设置几就会产生几个文件(分区同上)

Reduce Join:

简述流程:

(1)自定义bean对象(序列化反序列化函数---implements Writable)

(2)写mapper类     先重写setup方法(因为本案例需要两个文件,初始化(读多个文 希望先获取到文件名称(多文件) 一个文件一个切片   setup方法是一个优化手段 获取文件名称)

(3)写reduce类(业务逻辑)   先创建一个集合(类型为bean类型)和bean对象用于存储

用for循环遍历value(key是一样的  一样的key才会进入同一个reduce方法)

获取文件名判断写出不同的业务逻辑

"order"表:

先创建一个bean对象,用于存储数据,用于后续写入集合

用到方法   BeanUtils.copyProperties(tmpOrderBean,value);  获取原数据

让后加入上述创建的集合 orderBeans.add(tmpOrderBean);

“pd”表:

BeanUtils.copyProperties(pdBean,value);直接获取原数据

存储结束,结合阶段:

使用增强for

orderbean.setPname(pdBean.getPname());

使用set函数直接设置集合中的pname

让后写入

context.write(orderbean,NullWritable.get());
业务结束

Reduce Join的缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

Map Join:

使用场景

Map Join适用于一张表十分小、一张表很大的场景。

Map端实现数据合并就解决了Reduce Join的缺点(数据倾斜)

简述流程:

在map类中

setup方法:将较小文件读入缓存,将数据存储到全局的map集合中,将缓存中的数据全部写入

重写的map方法中:

转换成字符串在切割,通过切割后的数组获取map集合中的pname

让后重新设置输出文件的格式进行写出

到此这篇关于hadoop之MapReduce框架原理的文章就介绍到这了,更多相关MapReduce框架原理内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java执行hadoop的基本操作实例代码

    Java执行hadoop的基本操作实例代码 向HDFS上传本地文件 public static void uploadInputFile(String localFile) throws IOException{ Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000/"; String hdfsInput = "hdfs://localhost:9000/user/

  • 利用Java连接Hadoop进行编程

    目录 实验环境 实验内容 测试Java远程连接hadoop 实验环境 hadoop版本:3.3.2 jdk版本:1.8 hadoop安装系统:ubuntu18.04 编程环境:IDEA 编程主机:windows 实验内容 测试Java远程连接hadoop 创建maven工程,引入以下依赖: <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <

  • java结合HADOOP集群文件上传下载

    对HDFS上的文件进行上传和下载是对集群的基本操作,在<HADOOP权威指南>一书中,对文件的上传和下载都有代码的实例,但是对如何配置HADOOP客户端却是没有讲得很清楚,经过长时间的搜索和调试,总结了一下,如何配置使用集群的方法,以及自己测试可用的对集群上的文件进行操作的程序.首先,需要配置对应的环境变量: 复制代码 代码如下: hadoop_HOME="/home/work/tools/java/hadoop-client/hadoop" for f in $hadoo

  • Java大数据开发Hadoop MapReduce

    目录 1 MapRedcue的介绍 1.1 MapReduce定义 1.2 MapReduce的思想 1.3 MapReduce优点 1.4 MapReduce的缺点 1.5 MapReduce进程 1.6 MapReduce-WordCount 2 Hadoop序列化 2.1 序列化的定义 2.2 hadoop序列化和java序列化的区别 3 MapReduce 的原理 3.1 MapReduce 工作的过程 3.2 InputFormat 数据输入 3.2.1 切片 3.2.2 FileIn

  • Java/Web调用Hadoop进行MapReduce示例代码

    Hadoop环境搭建详见此文章http://www.jb51.net/article/33649.htm. 我们已经知道Hadoop能够通过Hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让Java/Web来调用它?使得用户可以用方便的方式上传文件到Hadoop并进行处理,获得结果.首先,***.jar是一个Hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它.input 和outpu

  • hadoop之MapReduce框架原理

    目录 MapReduce框架的简单运行机制: Mapper阶段: InputFormat数据输入: 切片与MapTask并行度决定机制: job提交过程源码解析: 切片逻辑: 1)FileInputFormat实现类 进行虚拟存储 (1)虚拟存储过程: Shuffle阶段: 排序: Combiner合并: ReduceTask阶段: Reduce Join: Map Join: MapReduce框架的简单运行机制: MapReduce是分为两个阶段的,MapperTask阶段,和ReduceT

  • 用PHP和Shell写Hadoop的MapReduce程序

    使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper或者 reducer.例如: 复制代码 代码如下: hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc 在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer

  • Hadoop中的Python框架的使用指南

    最近,我加入了Cloudera,在这之前,我在计算生物学/基因组学上已经工作了差不多10年.我的分析工作主要是利用Python语言和它很棒的科学计算栈来进行的.但Apache Hadoop的生态系统大部分都是用Java来实现的,也是为Java准备的,这让我很恼火.所以,我的头等大事变成了寻找一些Python可以用的Hadoop框架. 在这篇文章里,我会把我个人对这些框架的一些无关科学的看法写下来,这些框架包括: Hadoop流 mrjob dumbo hadoopy pydoop 其它 最终,在

  • 在Hadoop集群环境中为MySQL安装配置Sqoop的教程

    Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中. Sqoop中一大亮点就是可以通过hadoop的mapreduce把数据从关系型数据库中导入数据到HDFS. 一.安装sqoop 1.下载sqoop压缩包,并解压 压缩包分别是:sqoop-1.2.0-CDH3B4.tar.gz,hadoop-0.20.2-C

  • Hadoop组件简介

    安装hbase 首先下载hbase的最新稳定版本 http://www.apache.org/dyn/closer.cgi/hbase/ 安装到本地目录中,我安装的是当前用户的hadoop/hbase中 tar -zxvf hbase-0.90.4.tar.gz 单机模式 修改配置文件 conf/hbase_env.sh 配置JDK的路径 修改conf/hbase-site.xml hbase.rootdir file:///home/${user.name}/hbase-tmp 完成后启动 b

  • 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试

    MapReduce与HDFS简介 什么是Hadoop? Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS . MapReduce). Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop. MapReduce的Data flow如下图,原始数据

  • Hadoop streaming详细介绍

    Hadoop streaming Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java.这里要介绍的就是Hadoop streaming API.Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口.所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写. st

  • linux环境不使用hadoop安装单机版spark的方法

    大数据持续升温, 不熟悉几个大数据组件, 连装逼的口头禅都没有. 最起码, 你要会说个hadoop, hdfs, mapreduce, yarn, kafka, spark, zookeeper, neo4j吧, 这些都是装逼的必备技能. 关于spark的详细介绍, 网上一大堆, 搜搜便是, 下面, 我们来说单机版的spark的安装和简要使用. 0.  安装jdk,  由于我的机器上之前已经有了jdk, 所以这一步我可以省掉. jdk已经是很俗气的老生常谈了, 不多说, 用java/scala的

  • 通用MapReduce程序复制HBase表数据

    编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据.其中包括可以设置版本数.可以设置输入表的列导入设置(选取其中某几列).可以设置输出表的列导出设置(选取其中某几列). 原始表test1数据如下: 每个row key都有两个版本的数据,这里只显示了row key为1的数据 在hbase shell 中创建数据表: create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存无版本.无列导入设置.无列导出设置的数据 create '

随机推荐