深入了解Hadoop如何实现序列化

目录
  • 前言
  • 为什么要序列化
  • 为什么不使用Java序列化
  • Hadoop序列化特点
  • Hadoop序列化业务场景
  • 案例业务描述
  • 编码实现

前言

序列化想必大家都很熟悉了,对象在进行网络传输过程中,需要序列化之后才能传输到客户端,或者客户端的数据序列化之后送达到服务端

序列化的标准解释如下:

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输

对应的反序列化为序列化的逆向过程

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象

为什么要序列化

一般来说,程序动态创建出来的“活的” 对象只生存在内存里,一旦服务停机或断电就没了。而且“活”对象只能存活于本地进程,不能发送到网络上其他的服务器或者进程中使用。 然而通过序列化之后,则可以存储“活的”对象,从而进行网络传输,提供给其他进程或机器使用。

为什么不使用Java序列化

在Java中,创建一个对象如果希望这个对象是序列化的对象,只需要实现Serializable接口即可,但Java的序列化在Hadoop看来,是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),从而不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制,只需要对象实现Writable接口,重写里面的两个方法。

Hadoop序列化特点

  • 紧凑 :高效使用存储空间
  • 快速:读写数据的额外开销小
  • 互操作:支持多语言的交互

Hadoop序列化业务场景

在真实的业务场景中,类似于wordcount那样的单个字符串的场景很少,而且无法应对各种复杂的大数据场景和海量数据的处理业务,因此在传输过程中,为了更加灵活的进行数据在Map、Reduce中的传输,将解析到的数据以序列化对象的方式传输,是非常便捷的

在Hadoop中,具体实现bean对象序列化步骤如下7步:

  1. 实现Writable接口
  2. 反序列化时,需要反射调用空参构造函数,即类对象中必须有空参构造
  3. 重写序列化write的方法
  4. 重写反序列化的readFields方法
  5. 注意反序列化的顺序和序列化的顺序完全一致
  6. 若想把结果显示在文件中,需重写toString(),可用"\t"分开,方便后续用
  7. 如果需将自定义的bean放在key中传输,还需要实现Comparable接口,因为MapReduce框中Shuffle过程要求对key必须能排序

案例业务描述

业务需求描述,如下数据为从某个地方导出来的一批统计手机号峰值流量和低谷流量的文本文件,现在的业务需求是,通过程序,最终输出各个手机号对应的峰值流量、低谷流量以及总流量的统计分析文件

那么最终的效果可按如下格式输出

了解了上面的业务后,下面开始按照前面描述的几个步骤进行编码实现

编码实现

1、定义一个封装手机流量各个属性的对象

从wordcount的案例中我们了解了使用mapreduce编码的基本编码套路,即map逻辑中读取原始数据文件,然后传递到reduce中

同样,在这里的map逻辑中,需要读取上面的原始的流量文本文件,但是既然在reduce中要能实现最终的统计输出,那么从map中出来的数据格式,必然是已经处理好的bean对象,key为手机号,而value值则为封装了当前手机号对应的峰值流量、低谷流量以及计算的总流量信息

了解了这一点,就大概知道这个bean对象该如何定义了

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneBean implements Writable {

    //峰值流量
    private long upFlow;
    //低谷流量
    private long downFlow;
    //总流量
    private long sumFlow;

    //提供无参构造
    public PhoneBean() {
    }

    //提供三个参数的getter和setter方法
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //实现序列化和反序列化方法,注意顺序一定要保持一致
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    //重写ToString方法
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

}

2、自定义Mapper类

该类读取和解析文本文件,将各个手机号的属性封装到PhoneBean对象中,并输出到Reduce使用

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {

    private Text outK = new Text();

    private PhoneBean outV = new PhoneBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        //分割数据
        String[] split = line.split("\t");
        //抓取需要的数据:手机号,上行流量,下行流量
        String phone = split[1];
        String max = split[3];
        String mine = split[4];
        //封装outK outV
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(max));
        outV.setDownFlow(Long.parseLong(mine));
        outV.setSumFlow();
        //写出outK outV
        context.write(outK, outV);
    }
}

3.、自定义Reduce类

关于Reduce中的入参类型和出参类型,到这里想必都已经了解,就不再过多解释了

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.LinkedList;

public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {

    private Text outK = new Text();

    private PhoneBean outV = new PhoneBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        //分割数据
        String[] splits = line.split("\t");
        LinkedList<String> linkedList = new LinkedList<>();
        for(String str:splits){
            if(StringUtils.isNotEmpty(str)){
                linkedList.add(str.trim());
            }
        }
        //抓取需要的数据:手机号,上行流量,下行流量
        String phone = linkedList.get(1);
        String max =  linkedList.get(3);
        String mine = linkedList.get(4);
        //封装outK outV
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(max));
        outV.setDownFlow(Long.parseLong(mine));
        outV.setSumFlow();
        //写出outK outV
        context.write(outK, outV);
    }
}

4、job类

依照wordcount案例中的模板做即可

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneJob {

    public static void main(String[] args) throws Exception {

        //1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 关联本Driver类
        job.setJarByClass(PhoneJob.class);

        //3 关联Mapper和Reducer
        job.setMapperClass(PhoneMapper.class);
        job.setReducerClass(PhoneReducer.class);

        //4 设置Map端输出KV类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneBean.class);

        //5 设置程序最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneBean.class);

        //6 设置程序的输入输出路径
        String inPath = "F:\\网盘\\csv\\phone_data.txt";
        String outPath = "F:\\网盘\\csv\\out.txt";
        FileInputFormat.setInputPaths(job, new Path(inPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        //7 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }

}

}

运行这段程序,观察是否在输出的目标路径下,生成了统计结果

打开最后那个文件,然后对比下原始的文件,正好满足预期的业务需求

以上就是深入了解Hadoop如何实现序列化的详细内容,更多关于Hadoop序列化的资料请关注我们其它相关文章!

(0)

相关推荐

  • Spark简介以及与Hadoop对比分析

    目录 1. Spark 与 Hadoop 比较 1.1 Haoop 的缺点 1.2 相较于Hadoop MR的优点 2. Spark 生态系统 2.1 大数据处理的三种类型 1. 复杂的批量数据处理 2. 基于历史数据的交互式查询 3. 基于实时数据流的数据处理 2.2 BDAS架构 2.3 Spark 生态系统 3. 基本概念与架构设计 3.1 基本概念 3.2 运行架构 3.3 各种概念之间的相互关系 4. Spark运行基本流程 4.1 运行流程 4.2 运行架构特点 5. Spark的部

  • Hadoop中的压缩与解压缩案例详解

    目录 一:压缩的作用 1.1:压缩的原则: 1.2:MR支持的压缩编码 1.3:压缩性能的比较 1.4:压缩方式的选择 压缩可以在MapReduce作用的任意阶段启用.  二:MapReduce数据压缩 三:压缩的参数配置 3.1:设置reduce输出端的压缩格式 3.2:设置map输入的压缩方式 四:文件的压缩与解压缩案例 压缩主要关注点:压缩率,压缩速度,解压速度,是否可切片 一:压缩的作用 压缩技术能够减少底层HDFS读写字节数,减少磁盘IO,提升网络传输效率,因为磁盘IO和网络带宽是Ha

  • java实现对Hadoop的操作

    基本操作 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.junit.Test; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.runner.RunWith; import org.junit.runners.JUnit

  • 深入浅析Java Object Serialization与 Hadoop 序列化

    一,Java Object Serialization 1,什么是序列化(Serialization) 序列化是指将结构化对象转化为字节流以便在网络上传输或者写到磁盘永久存储的过程.反序列化指将字节流转回结构化对象的逆过程.简单的理解就是对象转换为字节流用来传输和保存,字节流转换为对象将对象恢复成原来的状态. 2,序列化(Serialization)的作用 (1)一种持久化机制,把的内存中的对象状态保存到一个文件中或者数据库. (2)一种通信机制,用套接字在网络上传送对象. (3)Java远程方

  • 深入了解Hadoop如何实现序列化

    目录 前言 为什么要序列化 为什么不使用Java序列化 Hadoop序列化特点 Hadoop序列化业务场景 案例业务描述 编码实现 前言 序列化想必大家都很熟悉了,对象在进行网络传输过程中,需要序列化之后才能传输到客户端,或者客户端的数据序列化之后送达到服务端 序列化的标准解释如下: 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输 对应的反序列化为序列化的逆向过程 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内

  • java 中Spark中将对象序列化存储到hdfs

    java 中Spark中将对象序列化存储到hdfs 摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs. 废话不多说, 直接贴代码了. spark1.4 + hbase0.98 import org.apache.spark.storage.StorageLevel imp

  • 浅谈序列化之protobuf与avro对比(Java)

    最近在做socket通信中用到了关于序列化工具选型的问题,在调研过程中开始趋向于用protobuf,可以省去了编解码的过程.能够实现快速开发,且只需要维护一份协议文件即可. 但是调研过程中发现了protobuf的一些弊端,比如需要生成相应的文件类,和业务绑定太紧密,所以在看了AVRO之后发现它完美解决了这个问题. 下面记录下对这两种序列化工具的入门与测评. 一.protobuf基本操作 protobuf简介: Protocol Buffers (a.k.a., protobuf) are Goo

  • Hadoop中的Python框架的使用指南

    最近,我加入了Cloudera,在这之前,我在计算生物学/基因组学上已经工作了差不多10年.我的分析工作主要是利用Python语言和它很棒的科学计算栈来进行的.但Apache Hadoop的生态系统大部分都是用Java来实现的,也是为Java准备的,这让我很恼火.所以,我的头等大事变成了寻找一些Python可以用的Hadoop框架. 在这篇文章里,我会把我个人对这些框架的一些无关科学的看法写下来,这些框架包括: Hadoop流 mrjob dumbo hadoopy pydoop 其它 最终,在

  • hadoop client与datanode的通信协议分析

    本文主要分析了hadoop客户端read和write block的流程. 以及client和datanode通信的协议, 数据流格式等. hadoop客户端与namenode通信通过RPC协议, 但是client 与datanode通信并没有使用RPC, 而是直接使用socket, 其中读写时的协议也不同, 本文分析了hadoop 0.20.2版本的(0.19版本也是一样的)client与datanode通信的原理与通信协议.  另外需要强调的是0.23及以后的版本中client与datanod

  • Hadoop streaming详细介绍

    Hadoop streaming Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java.这里要介绍的就是Hadoop streaming API.Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口.所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写. st

  • hadoop二次排序的原理和实现方法

    默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用到二次排序了.下面我们来说说二次排序 1.二次排序原理 我们把二次排序分为以下几个阶段 Map起始阶段 在Map阶段,使用job.setInputFormatClass()定义的InputFormat,将输入的数据集分割成小数据块split,同时InputFormat提供一个RecordReader的实现.在这里我们使用的是TextInputFormat,它提供的Reco

  • Hadoop源码分析五hdfs架构原理剖析

    目录 1. hdfs架构 如果在hadoop配置时写的配置文件不同,启动的服务也有所区别 namenode的下方是三台datanode. namenode左右两边的是两个zkfc. namenode的上方是三台journalnode集群. 2. namenode介绍 namenode作为hdfs的核心,它主要的作用是管理文件的元数据 文件与块的对应关系中的块 namenode负责管理hdfs的元数据 namenode的数据持久化,采用了一种日志加快照的方式 最后还会有一个程序读取这个快照文件和日

随机推荐