Hadoop MapReduce多输出详细介绍

Hadoop MapReduce多输出

FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等。有时可能要对输出的文件名进行控制或让每个reducer输出多个文件。MapReduce为此提供了MultipleOutputFormat类。

MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。这允许每个reducer(或者只有map作业的mapper)创建多个文件。采用name-r-nnnnn形式的文件名用于map输出,name-r-nnnnn形式的文件名用于reduce输出,其中name是由程序设定的任意名字,nnnnn是一个指名块号的整数(从0开始)。块号保证从不同块(mapper或者reducer)写的输出在相同名字情况下不会冲突。

1. 重定义输出文件名

我们可以对输出的文件名进行控制。考虑这样一个需求:按男女性别来区分度假订单数据。这需要运行一个作业,作业的输出是男女各一个文件,此文件包含男女性别的所有数据记录。

这个需求可以使用MultipleOutputs来实现:

package com.sjf.open.test;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.sjf.open.utils.ConfigUtil;
/**
 * Created by xiaosi on 16-11-7.
 */
public class VacationOrderBySex extends Configured implements Tool {
  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(new VacationOrderBySex(), args);
    System.exit(status);
  }
  public static class VacationOrderBySexMapper extends Mapper<LongWritable, Text, Text, Text> {
    public String fInputPath = "";
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);
      fInputPath = ((FileSplit) context.getInputSplit()).getPath().toString();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      if(fInputPath.contains("vacation_hot_country_order")){
        String[] params = line.split("\t");
        String sex = params[2];
        if(StringUtils.isBlank(sex)){
          return;
        }
        context.write(new Text(sex.toLowerCase()), value);
      }
    }
  }
  public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        multipleOutputs.write(NullWritable.get(), value, key.toString());
      }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      multipleOutputs.close();
    }
  }
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("./run <input> <output>");
      System.exit(1);
    }
    String inputPath = args[0];
    String outputPath = args[1];
    int numReduceTasks = 16;
    Configuration conf = this.getConf();
    conf.setBoolean("mapred.output.compress", true);
    conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    Job job = Job.getInstance(conf);
    job.setJobName("vacation_order_by_jifeng.si");
    job.setJarByClass(VacationOrderBySex.class);
    job.setMapperClass(VacationOrderBySexMapper.class);
    job.setReducerClass(VacationOrderBySexReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.setNumReduceTasks(numReduceTasks);
    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }
}

在生成输出的reduce中,在setup()方法中构造一个MultipleOutputs的实例并将它赋予一个实例变量。在reduce()方法中使用MultipleOutputs实例来写输出,而不是context。write()方法作用于键,值和名字。这里使用的是性别作为名字,因此最后产生的输出名称的形式为sex-r-nnnnn:

-rw-r--r--  3 wirelessdev wirelessdev     0 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--  3 wirelessdev wirelessdev   88574 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev   60965 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 10:41 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00008.gz

我们可以看到在输出文件中不仅有我们想要的输出文件类型,还有part-r-nnnnn形式的文件,但是文件内没有信息,这是程序默认的输出文件。所以我们在指定输出文件名称时(name-r-nnnnn),不要指定name为part,因为它已经被使用为默认值了。

2. 多目录输出

在MultipleOutputs的write()方法中指定的基本路径相对于输出路径进行解释,因为它可以包含文件路径分隔符(/),创建任意深度的子目录。例如,我们改动上面的需求:按男女性别来区分度假订单数据,不同性别数据位于不同子目录(例如:sex=f/part-r-00000)。

 public static class VacationOrderBySexReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        String basePath = String.format("sex=%s/part", key.toString());
        multipleOutputs.write(NullWritable.get(), value, basePath);
      }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      multipleOutputs.close();
    }
  }

后产生的输出名称的形式为sex=f/part-r-nnnnn或者sex=m/part-r-nnnnn:

-rw-r--r--  3 wirelessdev wirelessdev     0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00000.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00001.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00002.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00003.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00004.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00006.gz
-rw-r--r--  3 wirelessdev wirelessdev     20 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/part-r-00007.gz
drwxr-xr-x  - wirelessdev wirelessdev     0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=f
drwxr-xr-x  - wirelessdev wirelessdev     0 2016-12-06 12:26 tmp/data_group/order/vacation_hot_country_order_by_sex/sex=m

3. 延迟输出

FileOutputFormat的子类会产生输出文件(part-r-nnnnn),即使文件是空的,也会产生。我们有时候不想要这些空的文件,我们可以使用LazyOutputFormat进行处理。它是一个封装输出格式,可以指定分区第一条记录输出时才真正创建文件。要使用它,用JobConf和相关输出格式作为参数来调用setOutputFormatClass()方法即可:

Configuration conf = this.getConf();
Job job = Job.getInstance(conf);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

再次检查一下我们的输出文件(第一个例子):

sudo -uwirelessdev hadoop fs -ls tmp/data_group/order/vacation_hot_country_order_by_sex/
Found 3 items
-rw-r--r--  3 wirelessdev wirelessdev     0 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/_SUCCESS
-rw-r--r--  3 wirelessdev wirelessdev   88574 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/f-r-00005.gz
-rw-r--r--  3 wirelessdev wirelessdev   60965 2016-12-06 13:36 tmp/data_group/order/vacation_hot_country_order_by_sex/m-r-00012.gz

感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(0)

相关推荐

  • JavaScript mapreduce工作原理简析

    谷歌在2003到2006年间连续发表了三篇非常有影响力的文章,分别是2003年在SOSP上发布的GFS,2004年在OSDI上发布的MapReduce,以及2006年在OSDI上发布的BigTable.GFS是文件系统相关的,其对后来的分布式文件系统设计具有指导意义:MapReduce是一种并行计算的编程模型,用于作业调度:BigTable是一个用于管理结构化数据的分布式存储系统,构建在GFS.Chubby.SSTable等Google技术之上.相当多的Google应用使用了这三种技术,比如Go

  • 无循环 JavaScript(map、reduce、filter和find)

    之前有讨论过,缩进(非常粗鲁地)增加了代码复杂性.我们的目标是写出复杂度低的 JavaScript 代码.通过选择一种合适的抽象来解决这个问题,可是你怎么能知道选择哪一种抽象呢?很遗憾的是到目前为止,没有找到一个具体的例子能回答这个问题.这篇文章中我们讨论不用任何循环如何处理 JavaScript 数组,最终得出的效果是可以降低代码复杂性. 循环是一种很重要的控制结构,它很难被重用,也很难插入到其他操作之中.另外,它意味着随着每次迭代,代码也在不断的变化之中.--Luis Atencio 我们先

  • 5个数组Array方法: indexOf、filter、forEach、map、reduce使用实例

    ECMAScript5标准发布于2009年12月3日,它带来了一些新的,改善现有的Array数组操作的方法.然而,这些新奇的数组方法并没有真正流行起来的,因为当时市场上缺乏支持ES5的浏览器. Array "Extras" 没有人怀疑这些方法的实用性,但写polyfill(PS:兼容旧版浏览器的插件)对他们来说是不值得的.它把"必须实现"变成了"最好实现".有人居然将这些数组方法称之为Array "Extras".哎! 但是,

  • JavaScript之map reduce_动力节点Java学院整理

    如果你读过Google的那篇大名鼎鼎的论文"MapReduce: Simplified Data Processing on Large Clusters",你就能大概明白map/reduce的概念. map 举例说明,比如我们有一个函数f(x)=x2,要把这个函数作用在一个数组[1, 2, 3, 4, 5, 6, 7, 8, 9]上,就可以用map实现如下: 由于map()方法定义在JavaScript的Array中,我们调用Array的map()方法,传入我们自己的函数,就得到了一

  • Hadoop MapReduce多输出详细介绍

    Hadoop MapReduce多输出 FileOutputFormat及其子类产生的文件放在输出目录下.每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等.有时可能要对输出的文件名进行控制或让每个reducer输出多个文件.MapReduce为此提供了MultipleOutputFormat类. MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串.这允许每个reducer(或者只有

  • 使用docker部署hadoop集群的详细教程

    最近要在公司里搭建一个hadoop测试集群,于是采用docker来快速部署hadoop集群. 0. 写在前面 网上也已经有很多教程了,但是其中都有不少坑,在此记录一下自己安装的过程. 目标:使用docker搭建一个一主两从三台机器的hadoop2.7.7版本的集群 准备: 首先要有一台内存8G以上的centos7机器,我用的是阿里云主机. 其次将jdk和hadoop包上传到服务器中. 我安装的是hadoop2.7.7.包给大家准备好了,链接:https://pan.baidu.com/s/15n

  • Windows下使用IDEA搭建Hadoop开发环境的详细方法

    笔者鼓弄了两个星期,终于把所有有关hadoop的环境配置好了,一是虚拟机上的 完全分布式集群 ,但是为了平时写代码的方便,则在windows上也配置了hadoop的 伪分布式集群 ,同时在IDEA上就可以编写代码,同时在windows环境下进行运行.(如果不配置windows下的伪分布式集群,则在IDEA上编写的代码无法在windows平台下运行).笔者在网络上找了很多有关windows下使用idea搭建hadoop开发环境的中文教程都不太全,最后使用国外的英文教程配置成功,因此这里整理一下,方

  • Python 通过pip安装Django详细介绍

    Python 通过pip安装Django详细介绍 经过前面的 Python 包管理工具的学习,接下来我们就要基于前面的知识,来配置 Django 的开发与运行环境. 首先是安装 Django(通过pip安装): pip install Django 输出的结果在我这里是这样的: Downloading/unpacking Django Downloading Django-1.5.2.tar.gz (8.0MB): 8.0MB downloaded Running setup.py egg_in

  • maven中pom.xml详细介绍

    POM 代表工程对象模型.它是使用 Maven 工作时的基本组建,是一个 xml 文件.它被放在工程根目录下,文件命名为 pom.xml. POM 包含了关于工程和各种配置细节的信息,Maven 使用这些信息构建工程. POM 也包含了目标和插件.当执行一个任务或者目标时,Maven 会查找当前目录下的 POM,从其中读取所需要的配置信息,然后执行目标.能够在 POM 中设置的一些配置如下: project dependencies plugins goals build profiles pr

  • UTF-8编码问题BOM详细介绍

    今天在写php代码的时候,出现一个特郁闷的问题那就是两个一模一样的文件,在IE下显示有一个文件却出显了一个空白行,如地址所示http://www.kuomart.com/blog/my_ex/bom_utf8.htm以上出现空白行的页面是用php的require('t.htm')导入模板输出的,而我的php文件和htm文件都是用的记事本写的,然后保存为utf-8编码的,这样之后就出现了用nodepad保存utf8文件自动添加bom到文件的开始,起先自己测试用nodepad,dw,edplus打开

  • java数据类型与二进制详细介绍

    java数据类型与二进制详细介绍 在java中 Int 类型的变量占 4个字节 Long 类型的变量占8个字节 一个程序就是一个世界,变量是这个程序的基本单位. Java基本数据类型 1.        整数类型 2.        小数(浮点数)类型 3.        布尔类型 4.        字符类型 整数类型 整数类型可以表示一个整数,常用的整数类型有:byte,short,int,long Byte  一个字节  -128到127 注:0有两个表示0000 0000正零  1000

  • IOS 打包静态库详细介绍

    IOS 打包静态库详细介绍 一.前言 前段时间看的一本书上说:"隔着一段距离看,很多有趣的知识看起来都很唬人."比如说这篇我要总结的"静态库知识",在我初出茅庐的时候着实觉得那些后缀名为".frameworke".".a".".dylib"的文件很神秘,很高冷.那时我虽然知道只要导入一个库就能引用库里面很多封装好的东西,但对这个"库"究竟是什么"鬼",一直都是云里雾里

  • Maven构建生命周期详细介绍

    什么是构建生命周期 构建生命周期是一组阶段的序列(sequence of phases),这些构建生命周期中的每一个由构建阶段的不同列表定义,其中构建阶段表示生命周期中的阶段. 例如,默认(default)的生命周期包括以下阶段(注意:这里是简化的阶段,用于生命周期阶段的完整列表): 验证(validate) - 验证项目是否正确,所有必要的信息可用 编译(compile) - 编译项目的源代码 测试(test) - 使用合适的单元测试框架测试编译的源代码.这些测试不应该要求代码被打包或部署 打

  • C语言中左移和右移运算符详细介绍

    C语言中左移和右移运算符详细介绍 左移运算符(<<) 左移运算符是用来将一个数的各二进制位左移若干位,移动的位数由右操作数指定(右操作数必须是非负值),其右边空出的位用0填补,高位左移溢出则舍弃该高位. 右移运算符(>>) 右移运算符是用来将一个数的各二进制位右移若干位,移动的位数由右操作数指定(右操作数必须是非负值),移到右端的低位被舍弃,对于无符号数,高位补0. 对于有符号数,某些机器将对左边空出的部分用符号位填补(即"算术移位"),而另一些机器则对左边空出

随机推荐