Java基础之MapReduce框架总结与扩展知识点

一、MapTask工作机制

MapTask就是Map阶段的job,它的数量由切片决定

二、MapTask工作流程:

1.Read阶段:读取文件,此时进行对文件数据进行切片(InputFormat进行切片),通过切片,从而确定MapTask的数量,切片中包含数据和key(偏移量)

2.Map阶段:这个阶段是针对数据进行map方法的计算操作,通过该方法,可以对切片中的key和value进行处理

3.Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

4.Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

5.Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件,这个阶段默认是没有的,一般需要我们自定义

6.当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

7.在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

8.让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销

第四步溢写阶段详情:

  • 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
  • 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
  • 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

三、ReduceTask工作机制

ReduceTask就是Reduce阶段的job,它的数量由Map阶段的分区进行决定

四、ReduceTask工作流程:

1.Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

2.Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

3.Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

4.Reduce阶段:reduce()函数将计算结果写到HDFS上

五、数据清洗(ETL)

我们在大数据开篇概述中说过,数据是低价值的,所以我们要从海量数据中获取到我们想要的数据,首先就需要对数据进行清洗,这个过程也称之为ETL

还记得上一章中的Join案例么,我们对pname字段的填充,也算数据清洗的一种,下面我通过一个简单的案例来演示一下数据清洗

数据清洗案例

需求:过滤一下log日志中字段个数小于11的日志(随便举个栗子而已)

测试数据:就拿我们这两天学习中HadoopNodeName产生的日志来当测试数据吧,我将log日志信息放到我的windows中,数据位置如下

/opt/module/hadoop-3.1.3/logs/hadoop-xxx-nodemanager-hadoop102.log

编写思路:

直接通过切片,然后判断长度即可,因为是举个栗子,没有那么复杂

真正的数据清洗会使用框架来做,这个我后面会为大家带来相关的知识

  • ETLDriver
package com.company.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ETLDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(ETLDriver.class);

        job.setMapperClass(ETLMapper.class);

        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\io\\input8"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\io\\output88"));

        job.waitForCompletion(true);
    }
}
  • ETLMapper
package com.company.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //清洗(过滤)
        String line = value.toString();
        String[] info = line.split(" ");
        //判断
        if (info.length > 11){
            context.write(value,NullWritable.get());
        }
    }
}

六、计数器应用

  • 顾名思义,计数器的作用就是用于计数的,在Hadoop中,它内部也有一个计数器,用于监控统计我们处理数据的数量
  • 我们通常在MapReduce中通过上下文 context进行应用,例如在Mapper中,我通过step方法进行初始化计数器,然后在我们map方法中进行计数

七、计数器案例

在上面数据清洗的基础上进行计数器的使用,Driver没什么变化,只有Mapper

我们在Mapper的setup方法中,创建计数器的对象,然后在map方法中调用它即可

ETLMapper

package com.company.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {

    private Counter sucess;
    private Counter fail;
    /*
        创建计数器对象
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /*
             getCounter(String groupName, String counterName);
             第一个参数 :组名 随便写
             第二个参数 :计数器名 随便写
         */
        sucess = context.getCounter("ETL", "success");
        fail = context.getCounter("ETL", "fail");

    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //清洗(过滤)
        String line = value.toString();
        String[] info = line.split(" ");
        //判断
        if (info.length > 11){
            context.write(value,NullWritable.get());
            //统计
            sucess.increment(1);
        }else{
            fail.increment(1);
        }

    }
}

八、MapReduce总结

好了,到这里,我们MapReduce就全部学习完毕了,接下来,我再把整个内容串一下,还是MapReduce的那个图

MapReduce的主要工作就是对数据进行运算、分析,它的工作流程如下:

1.我们会将HDFS中的数据通过InputFormat进行进行读取、切片,从而计算出MapTask的数量

2.每一个MapTask中都会有Mapper类,里面的map方法就是任务的具体实现,我们通过它,可以完成数据的key,value封装,然后通过分区进入shuffle中来完成每个MapTask中的数据分区排序

3.通过分区来决定ReduceTask的数量,每一个ReduceTask都有一个Reducer类,里面的reduce方法是ReduceTask的具体实现,它主要是完成最后的数据合并工作

4.当Reduce任务过重,我们可以通过Combiner合并,在Mapper阶段来进行局部的数据合并,减轻Reduce的任务量,当然,前提是Combiner所做的局部合并工作不会影响最终的结果

5.当Reducer的任务完成,会将最终的key,value写出,交给OutputFormat,用于数据的写出,通过OutputFormat来完成HDFS的写入操作

每一个MapTask和ReduceTask内部都是循环进行读取,并且它有三个方法:setup() map()/reduce() cleanup()
setup()方法是在MapTask/ReduceTask刚刚启动时进行调用,cleanup()是在任务完成后调用

到此这篇关于Java基础之MapReduce框架总结与扩展知识点的文章就介绍到这了,更多相关Java MapReduce框架内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java连接hdfs ha和调用mapreduce jar示例

    Java API 连接 HDFS HA 复制代码 代码如下: public static void main(String[] args) {  Configuration conf = new Configuration();  conf.set("fs.defaultFS", "hdfs://hadoop2cluster");  conf.set("dfs.nameservices", "hadoop2cluster");

  • 基于Java SSM框架开发图书借阅系统源代码

    一.技术框架与开发环境 开发环境: IDE:IDEA 2020 数据库:MySQL 8.0 JDK 1.8 Maven 3.6.1 Tomcat 9 lombok 1.18.1 技术框架: 核心框架:Spring 5.1.9 持久层框架:Mybatis 3.5.2 视图层框架:SpringMVC 5.1.9 前端框架:Bootstrap 4 jquery-3.6.0 二.项目源码 有用就点赞博客 Github 国内:Giee 项目结构 三.功能介绍 1.登录 2.注册 AJAX异步刷新显示判断账

  • Java利用反射实现框架类的方法实例

    框架类的简单实现 实现步骤: 1. 加载配置文件 2. 获取配置文件中定义的数据 3. 加载该类进内存 主要讲解第一步:加载配置文件 的相关知识. //1.加载配置文件 //1.1创建Properties对象 Properties pro = new Properties(); //1.2加载配置文件,转换为一个集合 //1.2.1获取class目录下的配置文件 ClassLoader classLoader = ReflectTest.class.getClassLoader(); Input

  • Java基础详解之集合框架工具Collections

    一.Collections 说明:Collcetions是集合框架中的工具,特点是方法都是静态的. 二.Collections中的常见方法 1,对list进行二分查找:前提该集合一定要有序. int binarySearch(list,key);//要求list集合中的元素都是Comparable的子类. int binarySearch(list,key,Comparator); 2,对list集合进行排序. sort(list); sort(list,comaprator); 3,对集合取最

  • Java基础之集合框架详解

    一.前言 本节学习到的内容有以下5类,不分先后顺序: 集合Collection体系结构 List子类 与集合结合使用的迭代器对象 集合与数组的区别? 常见的一般数据结构整理 二.集合的由来? Collection List ArrayList Vector LinkedList Set hashSet treeSet 在集合没有出现之前,使用对象数组来存储对象,但是,对象数组的长度一旦确定,则不可以发生变化,所以我们希望存在一个容器就像StringBuffer一样存储字符串,同时依据传入的值的个

  • 使用JAVA+Maven+TestNG框架实现超详细Appium测试安卓真机教程

    前言:前段时间做了selenium的学习和实践,有点意犹未尽,所以自己就又学了下Appium的使用,因为这一套东西在16年已经停止维护了,不管实现还是设计上都不是很容易,也踩了很多坑,现在在此记录下大概过程.后续有时间再完善手册. 一.准备 安装SDK,配置环境变量 链接: https://pan.baidu.com/s/1g2QaWjdfg6Txa0gZf9kk3A 提取码: 8aaz windows配置环境SDK变量 我的电脑右键->属性 点击高级系统设置 点击环境变量 点击新建按钮,变量名

  • Java函数式编程(七):MapReduce

    译注:map(映射)和reduce(归约,化简)是数学上两个很基础的概念,它们很早就出现在各类的函数编程语言里了,直到2003年Google将其发扬光大,运用到分布式系统中进行并行计算后,这个组合的名字才开始在计算机界大放异彩(那些函数式粉可能并不这么认为).本文我们会看到Java 8在摇身一变支持函数式编程后,map和reduce组合的首次亮相(这里只是初步介绍,后续还会有针对它们的专题). 对集合进行归约 现在为止我们已经介绍了几个操作集合的新技巧了:查找匹配元素,查找单个元素,集合转化.这

  • Java日志框架用法及常见问题解决方案

    日志定义: 在计算机领域,日志文件(logfile)是一个记录了发生在运行中的操作系统或其他软件中的事件的文件,或者记录了在网络聊天软件的用户之间发送的消息. 日志记录(Logging):是指保存日志的行为.最简单的做法是将日志写入单个存放日志的文件. 日志级别优先级: ALL < TRACE < DEBUG < INFO < WARN < ERROR < FATAL < OFF 日志框架的作用: ①:跟踪用户对系统访问,记录了系统行为的时间.地点.状态等相关信息

  • Java安全框架——Shiro的使用详解(附springboot整合Shiro的demo)

    Shiro简介 Apache Shiro是一个强大且易用的Java安全框架,执行身份验证.授权.密码和会话管理 三个核心组件:Subject, SecurityManager 和 Realms Subject代表了当前用户的安全操作 SecurityManager管理所有用户的安全操作,是Shiro框架的核心,Shiro通过SecurityManager来管理内部组件实例,并通过它来提供安全管理的各种服务. Realm充当了Shiro与应用安全数据间的"桥梁"或者"连接器&q

  • Java/Web调用Hadoop进行MapReduce示例代码

    Hadoop环境搭建详见此文章http://www.jb51.net/article/33649.htm. 我们已经知道Hadoop能够通过Hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让Java/Web来调用它?使得用户可以用方便的方式上传文件到Hadoop并进行处理,获得结果.首先,***.jar是一个Hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它.input 和outpu

  • java 矩阵乘法的mapreduce程序实现

    java 矩阵乘法的mapreduce程序实现 map函数:对于矩阵M中的每个元素m(ij),产生一系列的key-value对<(i,k),(M,j,m(ij))> 其中k=1,2.....知道矩阵N的总列数;对于矩阵N中的每个元素n(jk),产生一系列的key-value对<(i , k) , (N , j ,n(jk)>, 其中i=1,2.......直到i=1,2.......直到矩阵M的总列数. map package com.cb.matrix; import stati

随机推荐