hadoop 切片机制分析与应用

前言

上面是一张MapReduce读取一个文本数据的逻辑顺序处理图。我们知道,不管是本地运行还是集群模式下,最终以job的任务调度形式运行,主要分为两个阶段

  • Map阶段,开启MapTask处理数据的读取
  • Reduce阶段,开启ReduceTask对数据做聚合

比如在wordcount案例中,一段文本数据,在map阶段首先被解析,拆分成一个个的单词,其实对hadoop来说,这项工作的完成,是由背后开启的一个MapTask进行处理的,等job处理完成,看到在目标文件夹下,生成了对应的单词统计结果

如果有多个单词统计文本文件要处理呢?我们不妨改造下wordcount的job代码,在一个目录下放多个处理文件,看运行完毕的结果如何呢?

public static void main(String[] args) throws Exception {

        //1、获取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2、设置jar路径
        job.setJarByClass(DemoJobDriver.class);

        //3、关联mapper 和 Reducer
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);

        //4、设置 map输出的 key/val 的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //5、设置最终输出的key / val 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //6、设置最终的输出路径
        String inputPath = "F:\\网盘\\csv\\combines\\";
        String outPath = "F:\\网盘\\csv\\result";

        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

运行完毕后,我们随机截取几张运行中的日志,通过阅读关键信息,相信感兴趣的同学能看出点什么吧,总结下来

  • 如果一个job开启后,检查到一个目录下包含多个待处理文件,将会开启多个MapTask处理
  • 默认情况下,有多少个文件,就开启多少个MapTask任务
  • ReduceTask处理完毕后,对结果做归并

Hadoop任务并行化

使用hadoop或者其他大数据框架的一个很重要的原因在于,它们的底层设计能很好的支撑任务的并行化处理,也就是说,会充分利用服务器的配置将一个复杂的任务,或者单个Task处理起来很耗时的任务根据需要拆分成多个并行的子任务来处理,充分利用服务器性能,达到任务处理的最优,耗时最短,机器性能利用率最佳

hadoop同样如此,提供了很多配置参数提供客户端选择,从而提升任务的处理性能

我们知道,hadoop的任务处理主要分为2个阶段,Map和Reduce阶段,默认情况下,结合上面的案例可以知道,将会根据文件个数默认开启等量的MapTask去处理,但是我们设想这样一个问题,现在的文件比较小,尚未超过默认的一个blocksize ,即128M,如果超出了怎么办?甚至说这个文件达到1个G怎么办?

于是得出如下结论:

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度

这样说来,当要处理的某个文件特别大的时候,通过设置MapTask的并行度是可以提升整个Map阶段的处理速度的

思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

MapTask并行度决定机制

  • 数据块:Block是HDFS物理上的单位,即把数据分成一块一块的,数据块是HDFS存储数据单位数据切片:数据切片只在逻辑上对输入进行分片,并不是真的会在磁盘上将其切分成多个片进行存储。
  • 数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

设想一段300MB大小的文件,假如按照100MB为一个切片的话,可以分为3个切片,这样Map阶段将会开启3个MapTask来处理这个任务,但是默认情况下,Hadoop处理的一个文件块即block的size大小为128Mb,那么问题来了,假如在生产环境下,hadoop真正在分布式环境下运行,任务往往分布在不同的机器上运行的,如下图所示

  • node1 ~ node3 可认为是集群中3个节点,用于处理MapTask数据
  • 默认情况下,每次处理一个任务默认的文件数据库大小为128Mb
  • 300Mb的待处理文件,按照100Mb为一个切片,将会划分为3个切片,3个切片将会开启3个MapTask进行处理

以上按照直观的理解,可以归纳出上面几点,但仔细分析下,会发现另一个问题就是,切片规则是客户端人为指定的规则,可以理解为一个账本,上面记录了工人干活时的工时,从而为结工资的时候做考量

但是对于3个节点来说,它们可不这么想了,因为它们是真正干活即执行任务的,人家管你是什么切片规则呢?总不能按照你的100MB大小的切片规则将自己的默认的128MB的数据库大小也改为100MB吧?这显然是不可能的,那该怎么办呢?

既然你切片上的规则是100MB嘛,于是node1节点就按照你的规则来,我这个节点上就处理100MB大小的数据就完事了,还剩下28MB大小的数据怎么办?既然分成了3个切片,肯定要开启3个MapTask了,node2节点也要处理一个任务了,但是不能随意就处理数据啊,得先把node1节点上面那个28MB的未处理完毕的文件拷贝过来,再拼接出72MB大小的数据块,凑够100MB了再搞事

于是,如果在真正的分布式环境下,这样就存在一个数据文件的跨节点拷贝问题,这很显然会带来一部分的网络开销,如果数据文件较大话,这个性能损耗就很值得考虑了

按照以上理解,我们可总结出如下经验:

  • 一个Job的Map阶段的并行度由客户端在提交Job时候的切片数量决定
  • 每一个切片将会被分配一个MapTask进行处理
  • 默认情况下,如果不指定,切片大小 = BlockSize的块大小,这也是最优的处理
  • 切片时不考虑数据整体,而是针对每一个文件单独切片

Hadoop默认切片机制

默认情况下,不做任何设置的话,hadoop将采用FileInputFormat切片机制,简单来说,原理如下:

  • 简单的按照文件内容长度进行切片
  • 切片大小,默认等于128MB,即blocksize的大小
  • 切片时不考虑数据整体,而是针对每一个文件单独切片

这个相对来说,比较简单,就不再过多赘述了,可以通过源码调试,找到下面的writeNewSplits 方法,进去看看源码的做法

Hadoop TextInputFormat 的优化切片机制

FileInputFormat实现类

在编写job的main程序中,还记得最后设置读取文件和输出文件的两行代码

在运行MapReduce程序时,输入的文件格式有很多种,比如:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?

FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。

最常见的就是TextInputFormat

  • TextInputFormat是默认的FileInputFormat实现类
  • 按行读取每条记录
  • 键是存储该行在整个文件中的起始字节偏移量, LongWritable类型
  • 值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。

以下是一个示例,比如,一个分片包含了如下4条文本记录

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每条记录可以表示为以下键/值对:

(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)

CombineTextInputFormat切片机制

从上面的分析来看,框架默认的TextInputFormat切片机制,是对任务按文件规划切片,不管文件多小,都会作为一个单独的切片,交给一个MapTask,假如有大量小文件,就会产生大量的MapTask,从而处理效率上并不高

于是就可以考虑另一种切片机制,即CombineTextInputFormat

CombineTextInputFormat应用场景

CombineTextInputFormat 用于小文件过多的场景,它可将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理

比如上面几个文件,最大的只有不到7MB,最下的只有不到2MB,那么基于CombineTextInputFormat的切片机制,可以考虑使用这种切片来做,具体的设置在job任务的代码中按照下面这样做设置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m 或者其他数据

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值,这个可以按照总体文件的大小,获取一个中位数比较好

CombineTextInputFormat案例代码演示

使用上面的4个文件作为输入数据源,期望只需要使用一个切片处理4个文件(默认情况下,4个文件将会启动4个切片和4个MapTask,从控制台日志中观察)

使用CombineTextInputFormat的切片,大概如下面的实现过程

  • 不做任何处理,运行上面的的WordCount案例程序,观察切片个数为4(控制台日志)
  • 在Job的代码中增加如下代码,运行程序,并观察运行的切片个数为1
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虚拟存储切片最大值设置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
  • 先设置为4MB,然后尝试设置为20MB,观察运行结果是否为1个切片,number of splits:1

通过控制台的输出结果,验证了上面的目标猜想

到此这篇关于hadoop 切片机制分析与应用的文章就介绍到这了,更多相关hadoop 切片机制内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 深入了解Hadoop如何实现序列化

    目录 前言 为什么要序列化 为什么不使用Java序列化 Hadoop序列化特点 Hadoop序列化业务场景 案例业务描述 编码实现 前言 序列化想必大家都很熟悉了,对象在进行网络传输过程中,需要序列化之后才能传输到客户端,或者客户端的数据序列化之后送达到服务端 序列化的标准解释如下: 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输 对应的反序列化为序列化的逆向过程 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内

  • hadoop 全面解读自定义分区

    分区概念 分区这个词对很多同学来说并不陌生,比如Java很多中间件中,像kafka的分区,mysql的分区表等,分区存在的意义在于将数据按照业务规则进行合理的划分,方便后续对各个分区数据高效处理 Hadoop分区 hadoop中的分区,是把不同数据输出到不同reduceTask ,最终到输出不同文件中 hadoop 默认分区规则 hash分区 按照key的hashCode % reduceTask 数量 = 分区号 默认reduceTask 数量为1,当然也可以在driver 端设置 以下是Pa

  • Hadoop环境配置之hive环境配置详解

    1.将下载的hive压缩包拉到/opt/software/文件夹下 安装包版本:apache-hive-3.1.2-bin.tar.gz 2.将安装包解压到/opt/module/文件夹中,命令: cd /opt/software/ tar -zxvf 压缩包名 -C /opt/module/ 3.修改系统环境变量,命令: vi /etc/profile  在编辑面板中添加如下代码: export HIVE_HOME=/opt/module/apache-hive-3.1.2-bin expor

  • hadoop 详解如何实现数据排序

    目录 前言 MapReduce排序 MapReduce排序分类 1.部分排序 2.全排序 3.辅助排序 4.二次排序 自定义排序案例 1.自定义一个Bean对象,实现WritableComparable接口 2.自定义Mapper 3.自定义Reducer 4.自定义Driver类 分区内排序案例 1.添加自定义分区 2.改造Driver类 前言 在hadoop的MapReduce中,提供了对于客户端的自定义排序的功能相关API MapReduce排序 默认情况下,MapTask 和Reduce

  • hadoop 切片机制分析与应用

    前言 上面是一张MapReduce读取一个文本数据的逻辑顺序处理图.我们知道,不管是本地运行还是集群模式下,最终以job的任务调度形式运行,主要分为两个阶段 Map阶段,开启MapTask处理数据的读取 Reduce阶段,开启ReduceTask对数据做聚合 比如在wordcount案例中,一段文本数据,在map阶段首先被解析,拆分成一个个的单词,其实对hadoop来说,这项工作的完成,是由背后开启的一个MapTask进行处理的,等job处理完成,看到在目标文件夹下,生成了对应的单词统计结果 如

  • Android Handler消息机制分析

    目录 Handler是什么? Handler 的基本使用 用法一:通过 send 方法 用法二:通过 post 方法 Handler 类 MessageQueue 类 Looper 类 Handler 的消息接收过程 Handler是什么? Handler 是一个可以实现多线程间切换的类,通过 Handler 可以轻松地将一个任务切换到 Handler 所在的线程中去执行.我们最常用的使用的场景就是更新 UI 了,比如我们在子线程中访问网络,拿到数据后我们 UI 要做一些改变,如果此时我们直接访

  • Hadoop源码分析六启动文件namenode原理详解

    1. namenode启动 在本系列文章三中分析了hadoop的启动文件,其中提到了namenode启动的时候调用的类为 org.apache.hadoop.hdfs.server.namenode.NameNode 其main方法的内容如下: public static void main(String argv[]) throws Exception { if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)

  • Hadoop源码分析五hdfs架构原理剖析

    目录 1. hdfs架构 如果在hadoop配置时写的配置文件不同,启动的服务也有所区别 namenode的下方是三台datanode. namenode左右两边的是两个zkfc. namenode的上方是三台journalnode集群. 2. namenode介绍 namenode作为hdfs的核心,它主要的作用是管理文件的元数据 文件与块的对应关系中的块 namenode负责管理hdfs的元数据 namenode的数据持久化,采用了一种日志加快照的方式 最后还会有一个程序读取这个快照文件和日

  • Hadoop源码分析四远程debug调试

    1. hadoop远程debug 从文档(3)中可以知道hadoop启动服务的时候最终都是通过java命令来启动的,其本质是一个java程序.在研究源码的时候debug是一种很重要的工具,但是hadoop是编译好了的代码,直接在liunx中运行的,无法象普通的程序一样可以直接在eclipse之类的工具中直接debug运行. 对于上述情况java提供了一种远程debug的方式. 这种方式需要在java程序启动的时候添加以下参数: -agentlib:jdwp=transport=dt_socket

  • Hadoop源码分析三启动及脚本剖析

    1. 启动 hadoop的启动是通过其sbin目录下的脚本来启动的.与启动相关的叫脚本有以下几个: start-all.sh.start-dfs.sh.start-yarn.sh.hadoop-daemon.sh.yarn-daemon.sh. hadoop-daemon.sh是用来启动与hdfs相关的服务的 yarn-daemon.sh是用来启动和yarn相关的服务 start-dfs.sh是用来启动hdfs集群的 start-yarn.sh是用来启动yarn集群 start-all.sh是用

  • Hadoop源码分析二安装配置过程详解

    目录 1. 创建用户 2. 安装jdk 3. 修改hosts 4. 配置ssh免密登录 5. 安装zookeeper 解压: 修改配置文件 修改内容如下: 配置环境变量 启动 6. 安装hadoop 对于三台节点的配置安排如下: 解压: 修改配置文件: 修改core-site.xml 配置hdfs-site.xml 配置mapred-site.xml 配置yarn-site.xml 配置slaves 7. 初始化 在初始化前需要将所有机器都配置好hadoop (1) 启动zookeeper (2

  • Hadoop源码分析一架构关系简介

    1. 简介 Hadoop是一个由Apache基金会所开发的分布式系统基础架构 Hadoop起源于谷歌发布的三篇论文:GFS.MapReduce.BigTable.其中GFS是谷歌的分布式文件存储系统,MapReduce是基于这个分布式文件存储系统的一个计算框架,BigTable是一个分布式的数据库.hadoop实现了论文GFS和MapReduce中的内容,Hbase的实现了参考了论文BigTable. 2. hadoop架构 hadoop主要有三个组件 HDFS.YARN和MapReduce.其

  • C#字符串内存驻留机制分析

    在这之前我写过一些文章来介绍关于字符串内存分配和驻留的文章,涉及到的观点主要有:字符串的驻留机制避免了对具有相同字符序列的字符串对象的重复创建:被驻留的字符串是不受GC管辖的,即被驻留的字符串对象不能被GC回收:被驻留的字符串是被同一进程中所有应用程序域共享的.至于具体的原因,相信在<关于CLR内存管理一些深层次的讨论>中,你可以找到答案.由于这些天来在做一些关于内存泄露审查的工作,所以想通过具体的Memory Profiling工具来为你证实上面的结论.我采用的Memory Profilin

  • elasticsearch集群发现zendiscovery的Ping机制分析

    目录 zenDiscovery实现机制 广播的过程 nodeping处理代码 ping请求的发送策略 总结 zenDiscovery实现机制 ping是集群发现的基本手段,通过在网络上广播或者指定ping某些节点获取集群信息,从而可以找到集群的master加入集群.zenDiscovery实现了两种ping机制:广播与单播.本篇将详细分析一些这MulticastZenPing机制的实现为后面的集群发现和master选举做好铺垫. 广播的过程 首先看一下广播(MulticastZenPing),广

随机推荐