Hadoop MultipleOutputs输出到多个文件中的实现方法

Hadoop MultipleOutputs输出到多个文件中的实现方法

1.输出到多个文件或多个文件夹:

驱动中不需要额外改变,只需要在MapClass或Reduce类中加入如下代码

private MultipleOutputs<Text,IntWritable> mos;
public void setup(Context context) throws IOException,InterruptedException {
  mos = new MultipleOutputs(context);
}
public void cleanup(Context context) throws IOException,InterruptedException {
  mos.close();
}

  然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key, value);

  在MapClass或Reduce中使用,输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0. 而且只有part-m-00*会传给Reduce。

注意:multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。

如果baseOutputPath不包含文件分隔符“/”,那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);
如果包含文件分隔符“/”,例如baseOutputPath=“029070-99999/1901/part”,那么输出文件则为029070-99999/1901/part-r-nnnnn

2.案例-需求

需求,下面是有些测试数据,要对这些数据按类目输出到output中:

1512,iphone5s,4英寸,指纹识别,A7处理器,64位,M7协处理器,低功耗

1512,iphone5,4英寸,A6处理器,IOS7

1512,iphone4s,3.5英寸,A5处理器,双核,经典

50019780,ipad,9.7英寸,retina屏幕,丰富的应用

50019780,yoga,联想,待机18小时,外形独特

50019780,nexus 7,华硕&google,7英寸

50019780,ipad mini 2,retina显示屏,苹果,7.9英寸

1101,macbook air,苹果超薄,OS X mavericks

1101,macbook pro,苹果,OS X lion

1101,thinkpad yoga,联想,windows 8,超级本

3.Mapper程序:

package cn.edu.bjut.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultiOutPutMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString().trim();
    if(null != line && 0 != line.length()) {
      String[] arr = line.split(",");
      context.write(new IntWritable(Integer.parseInt(arr[0])), value);
    }
  }

}

4.Reducer程序:

package cn.edu.bjut.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiOutPutReducer extends
    Reducer<IntWritable, Text, NullWritable, Text> {

  private MultipleOutputs<NullWritable, Text> multipleOutputs = null;

  @Override
  protected void reduce(IntWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for(Text text : values) {
      multipleOutputs.write("KeySpilt", NullWritable.get(), text, key.toString()+"/");
      multipleOutputs.write("AllPart", NullWritable.get(), text);
    }
  }

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    if(null != multipleOutputs) {
      multipleOutputs.close();
      multipleOutputs = null;
    }
  }

}

5.主程序:

package cn.edu.bjut.multioutput;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MainJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "aaa");
    job.setJarByClass(MainJob.class);

    job.setMapperClass(MultiOutPutMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setReducerClass(MultiOutPutReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));

    MultipleOutputs.addNamedOutput(job, "KeySpilt", TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "AllPart", TextOutputFormat.class, NullWritable.class, Text.class);

    Path outPath = new Path(args[1]);
    FileSystem fs = FileSystem.get(conf);
    if(fs.exists(outPath)) {
      fs.delete(outPath, true);
    }
    FileOutputFormat.setOutputPath(job, outPath);

    job.waitForCompletion(true);
  }
}

如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(0)

相关推荐

  • Hadoop编程基于MR程序实现倒排索引示例

    相信接触过搜索引擎开发的同学对倒排索引并不陌生,谷歌.百度等搜索引擎都是用的倒排索引,关于倒排索引的有关知识,这里就不再深入讲解,有兴趣的同学到网上了解一下.这篇博文就带着大家一起学习下如何利用Hadoop的MR程序来实现倒排索引的功能. 一.数据准备 1.输入文件数据 这里我们准备三个输入文件,分别如下所示 a.txt hello tom hello jerry hello tom b.txt hello jerry hello jerry tom jerry c.txt hello jerr

  • hadoop动态增加和删除节点方法介绍

    上一篇文章中我们介绍了Hadoop编程基于MR程序实现倒排索引示例的有关内容,这里我们看看如何在Hadoop中动态地增加和删除节点(DataNode). 假设集群操作系统均为:CentOS 6.7 x64 Hadoop版本为:2.6.3 一.动态增加DataNode 1.准备新的DataNode节点机器,配置SSH互信,可以直接复制已有DataNode中.ssh目录中的authorized_keys和id_rsa 2.复制Hadoop运行目录.hdfs目录及tmp目录至新的DataNode 3.

  • hadoop上传文件功能实例代码

    hdfs上的文件是手动执行命令从本地linux上传至hdfs的.在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐.那么,我们可以使用hdfs提供的Java api实现文件上传至hdfs,或者直接从ftp上传至hdfs. 然而,需要说明一点,之前笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行.像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR.其实,这个调度平台就是使用了quart

  • 浅谈七种常见的Hadoop和Spark项目案例

    有一句古老的格言是这样说的,如果你向某人提供你的全部支持和金融支持去做一些不同的和创新的事情,他们最终却会做别人正在做的事情.如比较火爆的Hadoop.Spark和Storm,每个人都认为他们正在做一些与这些新的大数据技术相关的事情,但它不需要很长的时间遇到相同的模式.具体的实施可能有所不同,但根据我的经验,它们是最常见的七种项目. 项目一:数据整合 称之为"企业级数据中心"或"数据湖",这个想法是你有不同的数据源,你想对它们进行数据分析.这类项目包括从所有来源获得

  • VMware虚拟机下hadoop1.x的安装方法

    这是Hadoop学习全程记录第1篇,在这篇里我将介绍一下如何在Linux下安装Hadoop1.x. 先说明一下我的开发环境: 虚拟机:VMware8.0: 操作系统:CentOS6.4: 版本:jdk1.8:hadoop1.2.1 ①下载hadoop1.2.1,网盘:链接: https://pan.baidu.com/s/1sl5DMIp 密码: 5p67 下载jdk1.8,网盘:链接: https://pan.baidu.com/s/1boN1gh5 密码: t36h 将 jdk-8u144-

  • Hadoop Combiner使用方法详解

    Hadoop Combiner使用方法详解 Combiner函数是一个可选的中间函数,发生在Map阶段,Mapper执行完成后立即执行.使用Combiner有如下两个优势: Combiner可以用来减少发送到Reducer的数据量,从而提高网络效率. Combiner可以用于减少发送到Reducer的数据量,这将提高Reduce端的效率,因为每个reduce函数将处理相对较少记录,相比于未使用Combiner之前. Combiner与Reducer结构相同,因为Combiner和Reducer都

  • ASP.NET实现Hadoop增删改查的示例代码

    本文介绍了ASP.NET实现Hadoop增删改查的示例代码,分享给大家,具体如下: packages.config <?xml version="1.0" encoding="utf-8"?> <packages> <package id="Microsoft.AspNet.WebApi.Client" version="4.0.20505.0" targetFramework="net

  • hadoop重新格式化HDFS步骤解析

    了解Hadoop的同学都知道,Hadoop有两个核心的组成部分,一个是HDFS,另一个则是MapReduce,HDFS作为Hadoop的数据存储方案,MapReduce则提供计算服务:同时,HDFS作为一种分布式文件系统,它的安装也是需要相应的格式化操作的,如果安装失败或者我们需要重新安装的时候,那我们就需要对HDFS重新进行格式化,这篇文章就和大家一起讨论下如何进行HDFS的重新格式化. 重新格式化hdfs系统的方法: 1.打开hdfs-site.xml 我们打开Hadoop的hdfs-sit

  • Hadoop MultipleOutputs输出到多个文件中的实现方法

    Hadoop MultipleOutputs输出到多个文件中的实现方法 1.输出到多个文件或多个文件夹: 驱动中不需要额外改变,只需要在MapClass或Reduce类中加入如下代码 private MultipleOutputs<Text,IntWritable> mos; public void setup(Context context) throws IOException,InterruptedException { mos = new MultipleOutputs(context

  • Log4j不同模块输出到不同的文件中

    1.实现目标 不同业务的日志信息需要打印到不同的文件中,每天或者每个小时生成一个文件.如,注册的信息打印到register.log,每天凌晨生成一个register-年月日.log文件, 登录信息的日志打印到一个login.log文件中,login-年月日.log. 2.maven配置 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.or

  • python清除指定目录内所有文件中script的方法

    本文实例讲述了python清除指定目录内所有文件中script的方法.分享给大家供大家参考.具体如下: 将脚本存储为stripscripts.py 调用语法 : python stripscripts.py <directory> 使用范例 : python stripscripts.py d:\myfiles # Hello, this is a script written in Python. See http://www.pyhon.org import os,sys,string,r

  • C#实现通过ffmpeg从flv视频文件中截图的方法

    本文实例讲述了C#实现通过ffmpeg从flv视频文件中截图的方法.分享给大家供大家参考.具体分析如下: 需要先下载ffmpeg,这是开源的,代码如下所示: 复制代码 代码如下: using System; using System.Configuration; public class PublicMethod:System.Web.UI.Page {     public PublicMethod()     {     }     //文件路径     public static stri

  • Go语言清除文件中空行的方法

    本文实例讲述了Go语言清除文件中空行的方法.分享给大家供大家参考.具体实现方法如下: 这里使用Go语言读取源文件,去掉空行,并写到目标文件 复制代码 代码如下: /**  * Created with IntelliJ IDEA.  * User: hyper-carrot  * Date: 12-8-31  * Time: 下午4:04  * To change this template use File | Settings | File Templates.  */ package ma

  • Python3搜索及替换文件中文本的方法

    本文实例讲述了Python3搜索及替换文件中文本的方法.分享给大家供大家参考.具体实现方法如下: # 将文件中的某个字符串改变成另一个 # 下面代码实现从一个特定文件或标准输入读取文件, # 然后替换字符串,然后写入一个指定的文件 import os, sys nargs = len(sys.argv) if not 3 <= nargs <= 5: print('usage: %s search_text repalce_text [infile [outfile]]' % \ os.pat

  • C#编程获取资源文件中图片的方法

    本文实例讲述了C#编程获取资源文件中图片的方法.分享给大家供大家参考.具体实现方法如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reflection; using System.Drawing; namespace CL { public class RES { /// <summary> /// 定义一个资源文件名 资源文件名 = 工

  • 详解C#编程获取资源文件中图片的方法

    详解C#编程获取资源文件中图片的方法 本文主要介绍C#编程获取资源文件中图片的方法,涉及C#针对项目中资源文件操作的相关技巧,以供借鉴参考.具体内容如下: 例子: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Reflection; using System.Drawing; namespace CL { public class RES { /

  • 利用numpy和pandas处理csv文件中的时间方法

    环境:numpy,pandas,python3 在机器学习和深度学习的过程中,对于处理预测,回归问题,有时候变量是时间,需要进行合适的转换处理后才能进行学习分析,关于时间的变量如下所示,利用pandas和numpy对csv文件中时间进行处理. date (UTC) Price 01/01/2015 0:00 48.1 01/01/2015 1:00 47.33 01/01/2015 2:00 42.27 #coding:utf-8 import datetime import pandas as

  • Python解析Excle文件中的数据方法

    在公司里面,人力资源部每到发工资的时候就会头疼,如果公司内部有100多号员工,那么发完工资后需要给员工发送工资条的话,那么就需要截图如下图, 但是在公司的薪水保密协议不允许公开所有人的薪水,因此我们需要一个一个的发,现在我们给张三发一下薪资条 如果我们给1000人发的话,我们每个人都截图两次,面上的标题和线面的数据两栏,那么这个工程是比较大的.这个工作是循环的,死板的,那么我们就需要使用程序来解决这个问题. #coding=utf-8 import xlrd data = xlrd.open_w

随机推荐