java 中自定义OutputFormat的实例详解

java 中 自定义OutputFormat的实例详解

实例代码:

package com.ccse.hadoop.outputformat; 

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer; 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; 

public class MySelfOutputFormatApp { 

  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
  public final static String OUTPUT_FILENAME = "/abc"; 

  public static void main(String[] args) throws IOException, URISyntaxException,
    ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
    fileSystem.delete(new Path(OUTPUT_PATH), true); 

    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
    job.setJarByClass(MySelfOutputFormatApp.class); 

    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class); 

    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(MyselfOutputFormat.class); 

    job.waitForCompletion(true);
  } 

  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 

    private Text word = new Text();
    private LongWritable writable = new LongWritable(1); 

    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      if (value != null) {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          context.write(word, writable);
        }
      }
    } 

  } 

  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      long sum = 0;
      for (LongWritable value : values) {
        sum += value.get();
      }
      context.write(key, new LongWritable(sum));
    }
  } 

  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> { 

    private FSDataOutputStream outputStream = null; 

    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException,
        InterruptedException {
      try {
        FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
        //指定文件的输出路径
        final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH
                     + MySelfOutputFormatApp.OUTPUT_FILENAME);
        this.outputStream = fileSystem.create(path, false);
      } catch (URISyntaxException e) {
        e.printStackTrace();
      }
      return new MySelfRecordWriter(outputStream);
    } 

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
    } 

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
    } 

  } 

  public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> { 

    private FSDataOutputStream outputStream = null; 

    public MySelfRecordWriter(FSDataOutputStream outputStream) {
      this.outputStream = outputStream;
    } 

    @Override
    public void write(Text key, LongWritable value) throws IOException,
        InterruptedException {
      this.outputStream.writeBytes(key.toString());
      this.outputStream.writeBytes("\t");
      this.outputStream.writeLong(value.get());
    } 

    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.outputStream.close();
    } 

  } 

}

 2.OutputFormat是用于处理各种输出目的地的。

2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。

2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去哪?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中的。

2.3 RecordWriter类位于OutputFormat中的。因此,我们自定义的OutputFromat必须继承OutputFormat类型。那么,流对象必须在getRecordWriter(...)方法中获得。

以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

(0)

相关推荐

  • Java中FilterInputStream和FilterOutputStream的用法详解

    FilterInputStream FilterInputStream 的作用是用来"封装其它的输入流,并为它们提供额外的功能".它的常用的子类有BufferedInputStream和DataInputStream. BufferedInputStream的作用就是为"输入流提供缓冲功能,以及mark()和reset()功能". DataInputStream 是用来装饰其它输入流,它"允许应用程序以与机器无关方式从底层输入流中读取基本 Java 数据类

  • Java的DataInputStream和DataOutputStream数据输入输出流

    DataInputStream  DataInputStream 是数据输入流.它继承于FilterInputStream. DataInputStream 是用来装饰其它输入流,它"允许应用程序以与机器无关方式从底层输入流中读取基本 Java 数据类型".应用程序可以使用DataOutputStream(数据输出流)写入由DataInputStream(数据输入流)读取的数据. DataInputStream 函数列表: DataInputStream(InputStream in)

  • java DataInputStream和DataOutputStream详解及实例代码

    java DataInputStream和DataOutputStream详解 操作基本数据类型的流 DataInputStream DataOutputStream import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; public c

  • java中FileOutputStream中文乱码问题解决办法

    java中FileOutputStream中文乱码问题解决办法 使用FileOutputStream序列化可以直接向文件写入文本内容,代码如下: FileOutputStream outStream = new FileOutputStream(file); outStream.write(str.getBytes()); outStream.close(); 但这里的字符串如果包含中文,就会出现乱码,这是因为FileOutputStream是字节流,将文本按字节写入文件,而一个汉字是两个字节,

  • java IO流 之 输出流 OutputString()的使用

    FileOutPutStream:子类,写出数据的通道 步骤: 1.获取目标文件 2.创建通道(如果原来没有目标文件,则会自动创建一个) 3.写入数据 write() 4.释放资源 注意: (1)如果目标文件不存在,那么会自己创建一个目标文件 (2)如果目标文件存在,先将里面的数据清空,再写入数据 (3)想在原有的数据上写入数据,则在创建通道的时候使用 构造方法: OutPutStream(File file,Boolean append),boolean值为true则可以 (4)用 write

  • Java中的BufferedInputStream与BufferedOutputStream使用示例

    BufferedInputStream  BufferedInputStream 是缓冲输入流.它继承于FilterInputStream. BufferedInputStream 的作用是为另一个输入流添加一些功能,例如,提供"缓冲功能"以及支持"mark()标记"和"reset()重置方法". BufferedInputStream 本质上是通过一个内部缓冲区数组实现的.例如,在新建某输入流对应的BufferedInputStream后,当我

  • java 中自定义OutputFormat的实例详解

    java 中 自定义OutputFormat的实例详解 实例代码: package com.ccse.hadoop.outputformat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apa

  • java中Spring Security的实例详解

    java中Spring Security的实例详解 spring security是一个多方面的安全认证框架,提供了基于JavaEE规范的完整的安全认证解决方案.并且可以很好与目前主流的认证框架(如CAS,中央授权系统)集成.使用spring security的初衷是解决不同用户登录不同应用程序的权限问题,说到权限包括两部分:认证和授权.认证是告诉系统你是谁,授权是指知道你是谁后是否有权限访问系统(授权后一般会在服务端创建一个token,之后用这个token进行后续行为的交互). spring

  • Java 中This用法的实例详解

     Java 中This用法的实例详解 用类名定义一个变量的时候,定义的只是一个引用,外面可以通过这个引用来访问这个类里面的属性和方法. 那们类里面是够也应该有一个引用来访问自己的属性和方法纳? 呵呵,Java提供了一个很好的东西,就是 this 对象,它可以在类里面来引用这个类的属性和方法.先来个简单的例子: public class ThisDemo { String name="Mick"; public void print(String name){ System.out.pr

  • java中的interface接口实例详解

     java中的interface接口实例详解 接口:Java接口是一些方法表征的集合,但是却不会在接口里实现具体的方法. java接口的特点如下: 1.java接口不能被实例化 2.java接口中声明的成员自动被设置为public,所以不存在private成员 3.java接口中不能出现方法的具体实现. 4.实现某个接口就必须要实现里面定义的所有方法. 接下来看一个实现接口的案例: package hello;   interface competer{ //定义接口 void set_comp

  • Java中的动态和静态编译实例详解

    Java中的动态和静态编译实例详解 首先,我们来说说动态和静态编译的问题. Q: java和javascript有什么区别?    总结了一下:有以下几点吧: 1.首先从运行环境来说java代码是在JVM上编译成class文件,而javascript则直接在浏览器上加载运行. 2.由第一点可看出,java代码需要编译,而javascript不需要编译. 3.从语言性质来说,java是一种高级编程语言,对变量检查要求严格,javascript只是一个简单的解释性的脚本语言,对变量检查及要求很弱.

  • java 中createStatement()方法的实例详解

    java 中createStatement()方法的实例详解 用缺省设置创建时,ResultSet 是一种只能访问一次(one-time-through).只能向前访问(forward-only)和只读的对象.您只能访问数据一次,如果再次需要该 数据,必须重新查询数据库. 然而,并不只有这一种方式.通过设置 Statement 对象上的参数,您可以控制它产生的 ResultSet.例如: ... Class.forName(driverName); db = DriverManager.getC

  • Java中IO流 字节流实例详解

    Java中IO流 字节流实例详解 IO流(输入流.输出流),又分为字节流.字符流. 流是磁盘或其它外围设备中存储的数据的源点或终点. 输入流:程序从输入流读取数据源.数据源包括外界(键盘.文件.网络-),即是将数据源读入到程序的通信通道. 输出流:程序向输出流写入数据.将程序中的数据输出到外界(显示器.打印机.文件.网络-)的通信通道. 字节流 1.InputStream.OutputStream InputStream抽象了应用程序读取数据的方式 OutputStream抽象了应用程序写出数据

  • java 中死锁问题的实例详解

    java 中死锁问题的实例详解 先看代码在做解释 public class DeadLock implements Runnable{ String a; String b; boolean flag; public DeadLock(String a,String b,boolean flag){ this.a=a; this.b=b; this.flag=flag; } public void run(){ if(flag){ // while(true){ synchronized(a){

  • Java 中synchronize函数的实例详解

    Java 中synchronize函数的实例详解 java中的一个类的成员函数若用synchronized来修饰,则对应同一个对象,多个线程像调用这个对象的这个同步函数时必须等到上一个线程调用完才能由下一个线程调用. 那么如果一个类同时有两个成员函数是由synchronized修饰如代码所示,对与同一个对象,是否可以在两个线程运行时,一个调用funcA,同时另一个调用funcB? Mysyn是这样一个类,如果我有两个线程,一个在run方法中先运行funcA再运行funcB,另一个线程在run方法

  • java 中@Deprecated 注解的实例详解

    java 中@Deprecated 注解的实例详解 1 简介 Deprecated 同 SuppressWarnings 一样,都是 J2SE 5.0 中定义在Java.lang包中的标准 Annotation 之一,其可以标注在类.字段和方法上,其作用为:不鼓励程序员使用被 @Deprecated 注释的程序元素,因为被 @Deprecated 注释的元素很危险(例如,现阶段 JDK 提供的带有 @Deprecated 注释的元素在以后的 JDK 版本中可能被删除)或存在更好的选择.在使用不被

随机推荐