Hadoop之Mapreduce序列化

目录
  • 什么是序列化:
  • 什么是反序列化:
  • 为什么要序列化:
  • Java的序列化:
  • Hadoop序列化:
  • 自定义序列化接口:
  • 实现序列化的步骤:
    • 先看源码进行简单分析:
  • 序列化案例实操:
    • 案例需求:
      • (1)输入数据:
      • (2)输入数据格式:
      • (3)期望输出数据格式
    • 需求分析:
      • 编写MapperReduce程序:

什么是序列化:

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

什么是反序列化:

 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

为什么要序列化:

一般来说,“活的”对象只生存在内存里,关机断 电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

Java的序列化:

在Java中也是有序列化的,我们为什么不通过idea使用Java的序列化那?

因为Java的序列化框架(Serializable)是一个繁重的框架,附带信息比较多(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

Hadoop序列化:

Hadoop的序列化比较精简,只有简单的校验,有紧凑(高效使用存储空间),快速(读写数据的额外开销小),互操作(支持多语言的交互)的特点。

自定义序列化接口:

在开发过程中,基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象(不是基本的数据类型(某个类)----没有对应的Hadoop类型),那么该对象就需要实现序列化接口。

实现序列化的步骤:

先看源码进行简单分析:

Writable接口(好像也分析不出什么)

两个方法:

1.write: 进行序列化

2.readFields:进行反序列化

(1)  反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
	super();
}

(2)  重写接口中的两个方法***(注意:反序列化的顺序和序列化的顺序完全一致)

如数据结构中的队列一样先进先出,先序列化则先反序列化

(3)需要重写toString()方法,因为需要打印出来,否则打印出来的是地址

(4)  如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。(比如:上一篇博客中的计算单词出现次数中 最后呈现的单词是按照26个英文字母的顺序进行排序的)

看一个样例源码(字符串Text):

看到上图  实现接口:

WritableComparable<BinaryComparable>

跟进一下:

看到该接口继承自Comparable接口(这是Java中的一个API)

序列化案例实操:

案例需求:

统计每一个手机号耗费的总上行流量、总下行流量、总流量

(1)输入数据:

(2)输入数据格式:

(3)期望输出数据格式

需求分析:

先输入数据,输入数据后需要进行mapper阶段---》reduce阶段---》输出阶段

mapper阶段:

先考虑输入kv   (k---偏移量    v是一行数据)

输出(kv)为reduce的输入(kv) (本样例中使用的k是手机号--统计手机号的流量使用      v为上行流量,下行流量,总流量    则需要封装bean类(自定义对象)  再进行序列化传输(为什么要进行序列化那?----因为再计算的过程中可能由于资源问题mapper和reduce不在同一台服务器上))

输出(kv)同样也是(手机号,bean类)

编写MapperReduce程序:

1.FlowBean代码:

package com.tangxiaocong.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
*
* 定义bean类
* 需要实现writable
* 重写序列化和反序列化方法
* 重写空参构造
* 重写tostring方法
*
* */
public class FlowBean  implements Writable {
   private  long upFlow;
    private  long downFlow;
    private  long sumFlow;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public void setSumFlow() {
        this.sumFlow = this.downFlow+this.upFlow;
    }
    //生成空参构造函数由于反射  快捷键alt   + insert

    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        //序列化方法
        //  向缓冲流中写入Long类型的数据
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
//反序列化方法
        //读取缓冲区中数据
        this.upFlow= in.readLong();
        this.downFlow= in.readLong();
        this.sumFlow= in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t"+downFlow +"\t"+ sumFlow ;
    }
}

2.Mapper代码:

package com.tangxiaocong.mapreduce.writable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {

    private Text outK=new Text();
    private  FlowBean outV=new FlowBean();  //调用的无参构造函数

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
        //1 获取一行
        //1	13736230513	192.196.100.1	www.atguigu.com	2481	24681	200

        String s = value.toString();// 将数据转换成string
        //2 进行切割

        String[] split = s.split("\t"); //将数据按写入形式进行切割
        //3 抓取想要的数据
        //根据角标获取  手机号  上行流量  下行流量

        String phone = split[1];
        String up = split[split.length - 3];//  不能正序 因为有的属性是没有字段的
        String down = split[split.length - 2];
//     封装输出的kv

        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));//  up为string类型
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();          //

        //写出
        context.write(outK,outV);
    }
}

3. reduce代码:

package com.tangxiaocong.mapreduce.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer  <Text,FlowBean,Text,FlowBean>{
   private FlowBean outv=new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {

         long totalUp=0;
         long totaldown=0;

        //分析   传入TEXT  为手机号  后边为集合(Bean类的对象的集合)输出还是一个一个bean类  (每个手机号的总和)
        for (FlowBean value : values) {  //传入的参数是同一个key的
            totalUp+=value.getUpFlow();
            totaldown+=value.getDownFlow();
        }
        //  现在求出的是每个手机号的总的上行流量  下行流量
            //封装  key不需要
        //outv
    outv.setUpFlow(totalUp);
    outv.setDownFlow(totaldown);
    outv.setSumFlow();
    //写出
        context.write(key,outv);
    }
}

4.driver代码:

package com.tangxiaocong.mapreduce.writable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //获取JOB
        Configuration entries = new Configuration();
        Job job = Job.getInstance(entries);

        job.setJarByClass(FlowDriver.class);
        //关联mapper  和reduce
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //设置mapper  输出的key 和value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 设置最终的数据输出的key和value 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //设置数据的输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\phone_data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output3"));
        //提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }

}

最后运行

出现了bug  经过两个小时的调试  找出答案   是在driver类中设置mapper类输出kv类型出现差错没有配型成功

现在是运作正确的

到此这篇关于Hadoop之Mapreduce序列化的文章就介绍到这了,更多相关Mapreduce序列化内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 用PHP和Shell写Hadoop的MapReduce程序

    使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper或者 reducer.例如: 复制代码 代码如下: hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc 在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer

  • hadoop之MapReduce框架原理

    目录 MapReduce框架的简单运行机制: Mapper阶段: InputFormat数据输入: 切片与MapTask并行度决定机制: job提交过程源码解析: 切片逻辑: 1)FileInputFormat实现类 进行虚拟存储 (1)虚拟存储过程: Shuffle阶段: 排序: Combiner合并: ReduceTask阶段: Reduce Join: Map Join: MapReduce框架的简单运行机制: MapReduce是分为两个阶段的,MapperTask阶段,和ReduceT

  • Java/Web调用Hadoop进行MapReduce示例代码

    Hadoop环境搭建详见此文章http://www.jb51.net/article/33649.htm. 我们已经知道Hadoop能够通过Hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让Java/Web来调用它?使得用户可以用方便的方式上传文件到Hadoop并进行处理,获得结果.首先,***.jar是一个Hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它.input 和outpu

  • Java大数据开发Hadoop MapReduce

    目录 1 MapRedcue的介绍 1.1 MapReduce定义 1.2 MapReduce的思想 1.3 MapReduce优点 1.4 MapReduce的缺点 1.5 MapReduce进程 1.6 MapReduce-WordCount 2 Hadoop序列化 2.1 序列化的定义 2.2 hadoop序列化和java序列化的区别 3 MapReduce 的原理 3.1 MapReduce 工作的过程 3.2 InputFormat 数据输入 3.2.1 切片 3.2.2 FileIn

  • Hadoop MapReduce多输出详细介绍

    Hadoop MapReduce多输出 FileOutputFormat及其子类产生的文件放在输出目录下.每个reducer一个文件并且文件由分区号命名:part-r-00000,part-r-00001,等等.有时可能要对输出的文件名进行控制或让每个reducer输出多个文件.MapReduce为此提供了MultipleOutputFormat类. MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串.这允许每个reducer(或者只有

  • 深入了解Hadoop如何实现序列化

    目录 前言 为什么要序列化 为什么不使用Java序列化 Hadoop序列化特点 Hadoop序列化业务场景 案例业务描述 编码实现 前言 序列化想必大家都很熟悉了,对象在进行网络传输过程中,需要序列化之后才能传输到客户端,或者客户端的数据序列化之后送达到服务端 序列化的标准解释如下: 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输 对应的反序列化为序列化的逆向过程 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内

  • java 中Spark中将对象序列化存储到hdfs

    java 中Spark中将对象序列化存储到hdfs 摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs. 废话不多说, 直接贴代码了. spark1.4 + hbase0.98 import org.apache.spark.storage.StorageLevel imp

  • Hadoop中的Python框架的使用指南

    最近,我加入了Cloudera,在这之前,我在计算生物学/基因组学上已经工作了差不多10年.我的分析工作主要是利用Python语言和它很棒的科学计算栈来进行的.但Apache Hadoop的生态系统大部分都是用Java来实现的,也是为Java准备的,这让我很恼火.所以,我的头等大事变成了寻找一些Python可以用的Hadoop框架. 在这篇文章里,我会把我个人对这些框架的一些无关科学的看法写下来,这些框架包括: Hadoop流 mrjob dumbo hadoopy pydoop 其它 最终,在

  • Hadoop streaming详细介绍

    Hadoop streaming Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java.这里要介绍的就是Hadoop streaming API.Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口.所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写. st

  • hadoop 详解如何实现数据排序

    目录 前言 MapReduce排序 MapReduce排序分类 1.部分排序 2.全排序 3.辅助排序 4.二次排序 自定义排序案例 1.自定义一个Bean对象,实现WritableComparable接口 2.自定义Mapper 3.自定义Reducer 4.自定义Driver类 分区内排序案例 1.添加自定义分区 2.改造Driver类 前言 在hadoop的MapReduce中,提供了对于客户端的自定义排序的功能相关API MapReduce排序 默认情况下,MapTask 和Reduce

  • 在Hadoop集群环境中为MySQL安装配置Sqoop的教程

    Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中. Sqoop中一大亮点就是可以通过hadoop的mapreduce把数据从关系型数据库中导入数据到HDFS. 一.安装sqoop 1.下载sqoop压缩包,并解压 压缩包分别是:sqoop-1.2.0-CDH3B4.tar.gz,hadoop-0.20.2-C

  • Hadoop组件简介

    安装hbase 首先下载hbase的最新稳定版本 http://www.apache.org/dyn/closer.cgi/hbase/ 安装到本地目录中,我安装的是当前用户的hadoop/hbase中 tar -zxvf hbase-0.90.4.tar.gz 单机模式 修改配置文件 conf/hbase_env.sh 配置JDK的路径 修改conf/hbase-site.xml hbase.rootdir file:///home/${user.name}/hbase-tmp 完成后启动 b

随机推荐