Hadoop对文本文件的快速全局排序实现方法及分析

一、背景

Hadoop中实现了用于全局排序的InputSampler类和TotalOrderPartitioner类,调用示例是org.apache.hadoop.examples.Sort。

但是当我们以Text文件作为输入时,结果并非按Text中的string列排序,而且输出结果是SequenceFile。

原因:

1) hadoop在处理Text文件时,key是行号LongWritable类型,InputSampler抽样的是key,TotalOrderPartitioner也是用key去查找分区。这样,抽样得到的partition文件是对行号的抽样,结果自然是根据行号来排序。

2)大数据量时,InputSampler抽样速度会非常慢。比如,RandomSampler需要遍历所有数据,IntervalSampler需要遍历文件数与splits数一样。SplitSampler效率比较高,但它只抽取每个文件前面的记录,不适合应用于文件内有序的情况。

二、功能

1. 实现了一种局部抽样方法PartialSampler,适用于输入数据各文件是独立同分布的情况

2. 使RandomSampler、IntervalSampler、SplitSampler支持对文本的抽样

3. 实现了针对Text文件string列的TotalOrderPartitioner

三、实现

1. PartialSampler

PartialSampler从第一份输入数据中随机抽取第一列文本数据。PartialSampler有两个属性:freq(采样频率),numSamples(采样总数)。

public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
   InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
   ArrayList<K> samples = new ArrayList<K>(numSamples);
   Random r = new Random();
   long seed = r.nextLong();
   r.setSeed(seed);
   LOG.debug("seed: " + seed);
   // 对splits【0】抽样
   for (int i = 0; i < 1; i++) {
    System.out.println("PartialSampler will getSample splits["+i+"]");
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
      Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
     if (r.nextDouble() <= freq) {
      if (samples.size() < numSamples) {
        // 选择value中的第一列抽样
        Text value0 = new Text(value.toString().split("\t")[0]);
        samples.add((K) value0);
      } else {
       // When exceeding the maximum number of samples, replace a
       // random element with this one, then adjust the frequency
       // to reflect the possibility of existing elements being
       // pushed out
       int ind = r.nextInt(numSamples);
       if (ind != numSamples) {
        Text value0 = new Text(value.toString().split("\t")[0]);
        samples.set(ind, (K) value0);
       }
       freq *= (numSamples - 1) / (double) numSamples;
      }
      key = reader.createKey();
     }
    }
    reader.close();
   }
   return (K[])samples.toArray();
  }

首先通过InputFormat的getSplits方法得到所有的输入分区;

然后扫描第一个分区中的记录进行采样。

记录采样的具体过程如下:

从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq

  如果大于则放弃这条记录;

  如果小于,则判断当前的采样数是否小于最大采样数,

    如果小于则这条记录被选中,被放进采样集合中;

    否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。

然后依次遍历分区中的其它记录。

note:

1)PartialSampler只适用于输入数据各文件是独立同分布的情况。

2)自带的三种Sampler通过修改samples.add(key)为samples.add((K) value0); 也可以实现对第一列的抽样。

2. TotalOrderPartitioner

TotalOrderPartitioner主要改进了两点:

1)读partition时指定keyClass为Text.class

因为partition文件中的key类型为Text

在configure函数中,修改:

//Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<K> keyClass = (Class<K>)Text.class;

2)查找分区时,改用value查

public int getPartition(K key, V value, int numPartitions) {
  Text value0 = new Text(value.toString().split("\t")[0]);
  return partitions.findPartition((K) value0);
 }

3. Sort

1)设置InputFormat、OutputFormat、OutputKeyClass、OutputValueClass、MapOutputKeyClass

2)初始化InputSampler对象,抽样

3)partitionFile通过CacheFile传给TotalOrderPartitioner,执行MapReduce任务

 Class<? extends InputFormat> inputFormatClass = TextInputFormat.class;
  Class<? extends OutputFormat> outputFormatClass = TextOutputFormat.class;
  Class<? extends WritableComparable> outputKeyClass = Text.class;
  Class<? extends Writable> outputValueClass = Text.class;
  jobConf.setMapOutputKeyClass(LongWritable.class);
  // Set user-supplied (possibly default) job configs
  jobConf.setNumReduceTasks(num_reduces);
  jobConf.setInputFormat(inputFormatClass);
  jobConf.setOutputFormat(outputFormatClass);
  jobConf.setOutputKeyClass(outputKeyClass);
  jobConf.setOutputValueClass(outputValueClass);
  if (sampler != null) {
   System.out.println("Sampling input to effect total-order sort...");
   jobConf.setPartitionerClass(TotalOrderPartitioner.class);
   Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
   inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
   //Path partitionFile = new Path(inputDir, "_sortPartitioning");
   TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
   InputSampler.<K,V>writePartitionFile(jobConf, sampler);
   URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");
   DistributedCache.addCacheFile(partitionUri, jobConf);
   DistributedCache.createSymlink(jobConf);
  }
  FileSystem hdfs = FileSystem.get(jobConf);
  hdfs.delete(outputpath);
  hdfs.close();
  System.out.println("Running on " +
    cluster.getTaskTrackers() +
    " nodes to sort from " +
    FileInputFormat.getInputPaths(jobConf)[0] + " into " +
    FileOutputFormat.getOutputPath(jobConf) +
    " with " + num_reduces + " reduces.");
  Date startTime = new Date();
  System.out.println("Job started: " + startTime);
  jobResult = JobClient.runJob(jobConf);

四、执行

usage:

hadoop jar yitengfei.jar com.yitengfei.Sort [-m <maps>] [-r <reduces>]
[-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)
-splitSample <numSamples> <maxsplits> | // Sample from first records in splits (random data)
-splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)
-splitPartial <double pcnt> <numSamples> <maxsplits> | // Sample from partial splits at random (general) ]
<input> <output> <partitionfile>

Example:

hadoop jar yitengfei.jar com.yitengfei.Sort -r 10 -splitPartial 0.1 10000 10 /user/rp-rd/yitengfei/sample/input /user/rp-rd/yitengfei/sample/output /user/rp-rd/yitengfei/sample/partition

五、性能

200G输入数据,15亿条url,1000个分区,排序时间只用了6分钟

总结

以上就是本文关于Hadoop对文本文件的快速全局排序实现方法及分析的全部内容,希望对大家有所帮助 ,感兴趣的朋友可以继续参阅本站:hadoop重新格式化HDFS步骤解析、浅谈七种常见的Hadoop和Spark项目案例。如有不足之处,欢迎留言指出,感谢朋友们对本站的支持!

(0)

相关推荐

  • Hadoop 中 HBase Shell命令的详解

    Hadoop 中 HBase Shell命令的详解 HBase包含可以与HBase进行通信的Shell. HBase使用Hadoop文件系统来存储数据.所有这些任务发生在HDFS.下面给出的是一些由 常用的HBase Shell命令. 数据操纵语言 命令 说明 命令表达式 create 创建一个表 create '表名称', '列名称1','列名称2','列名称N' put  添加记录 put '表名称', '行名称', '列名称:', '值' get  查看记录 get '表名称', '行名称

  • 浅谈七种常见的Hadoop和Spark项目案例

    有一句古老的格言是这样说的,如果你向某人提供你的全部支持和金融支持去做一些不同的和创新的事情,他们最终却会做别人正在做的事情.如比较火爆的Hadoop.Spark和Storm,每个人都认为他们正在做一些与这些新的大数据技术相关的事情,但它不需要很长的时间遇到相同的模式.具体的实施可能有所不同,但根据我的经验,它们是最常见的七种项目. 项目一:数据整合 称之为"企业级数据中心"或"数据湖",这个想法是你有不同的数据源,你想对它们进行数据分析.这类项目包括从所有来源获得

  • Hadoop上Data Locality的详解

    Hadoop上Data Locality的详解 Hadoop上的Data Locality是指数据与Mapper任务运行时数据的距离接近程度(Data Locality in Hadoop refers to the"proximity" of the data with respect to the Mapper tasks working on the data.) 1. why data locality is imporant? 当数据集存储在HDFS中时,它被划分为块并存储在

  • ASP.NET实现Hadoop增删改查的示例代码

    本文介绍了ASP.NET实现Hadoop增删改查的示例代码,分享给大家,具体如下: packages.config <?xml version="1.0" encoding="utf-8"?> <packages> <package id="Microsoft.AspNet.WebApi.Client" version="4.0.20505.0" targetFramework="net

  • Hadoop编程基于MR程序实现倒排索引示例

    相信接触过搜索引擎开发的同学对倒排索引并不陌生,谷歌.百度等搜索引擎都是用的倒排索引,关于倒排索引的有关知识,这里就不再深入讲解,有兴趣的同学到网上了解一下.这篇博文就带着大家一起学习下如何利用Hadoop的MR程序来实现倒排索引的功能. 一.数据准备 1.输入文件数据 这里我们准备三个输入文件,分别如下所示 a.txt hello tom hello jerry hello tom b.txt hello jerry hello jerry tom jerry c.txt hello jerr

  • docker 搭建hadoop以及hbase集群详解

    要用docker搭建集群,首先需要构造集群所需的docker镜像.构建镜像的一种方式是,利用一个已有的镜像比如简单的linux系统,运行一个容器,在容器中手动的安装集群所需要的软件并进行配置,然后commit容器到新的镜像.另一种方式是,使用Dockerfile来自动化的构造镜像. 下面采用第二种. 1. 创建带ssh服务的ubuntu14.04系统镜像 使用ubuntu14系统来安装hadoop和hbase,由于hadoop集群机器之间通过ssh通信,所以需要在ubuntu14系统中安装ssh

  • hadoop动态增加和删除节点方法介绍

    上一篇文章中我们介绍了Hadoop编程基于MR程序实现倒排索引示例的有关内容,这里我们看看如何在Hadoop中动态地增加和删除节点(DataNode). 假设集群操作系统均为:CentOS 6.7 x64 Hadoop版本为:2.6.3 一.动态增加DataNode 1.准备新的DataNode节点机器,配置SSH互信,可以直接复制已有DataNode中.ssh目录中的authorized_keys和id_rsa 2.复制Hadoop运行目录.hdfs目录及tmp目录至新的DataNode 3.

  • hadoop格式化HDFS出现错误解决办法

    hadoop格式化HDFS出现错误解决办法 报错信息: host:java.net.UnknownHostException: centos-wang: centos-wang: unknown error 在执行hadoop namenode -format命令时,出现未知的主机名. 问题原因: 出现这种问题的原因是Hadoop在格式化HDFS的时候,通过hostname命令获取到的主机名与/etc/hosts文件中进行映射的时候,没有找到. 解决方案: 1.修改/etc/hosts内容 2.

  • VMware虚拟机下hadoop1.x的安装方法

    这是Hadoop学习全程记录第1篇,在这篇里我将介绍一下如何在Linux下安装Hadoop1.x. 先说明一下我的开发环境: 虚拟机:VMware8.0: 操作系统:CentOS6.4: 版本:jdk1.8:hadoop1.2.1 ①下载hadoop1.2.1,网盘:链接: https://pan.baidu.com/s/1sl5DMIp 密码: 5p67 下载jdk1.8,网盘:链接: https://pan.baidu.com/s/1boN1gh5 密码: t36h 将 jdk-8u144-

  • Hadoop对文本文件的快速全局排序实现方法及分析

    一.背景 Hadoop中实现了用于全局排序的InputSampler类和TotalOrderPartitioner类,调用示例是org.apache.hadoop.examples.Sort. 但是当我们以Text文件作为输入时,结果并非按Text中的string列排序,而且输出结果是SequenceFile. 原因: 1) hadoop在处理Text文件时,key是行号LongWritable类型,InputSampler抽样的是key,TotalOrderPartitioner也是用key去

  • Python对列表排序的方法实例分析

    本文实例讲述了Python对列表排序的方法.分享给大家供大家参考.具体分析如下: 1.sort()函数 sort()函数使用固定的排序算法对列表排序.sort()函数对列表排序时改变了原来的列表,从而让其中的元素能按一定的顺序排列,而不是简单的返回一个已排序的列表副本. 注意sort()函数改变原来的列表,函数返回值是空值即None.因此,如果需要一个已排好序的列表副本,同时又要保留原有列表不变的时候,就不能直接简单的使用sort()函数.为了实现上述功能使用sort()的方法是:先获取列表X的

  • 用vbs对文本文件的内容进行排序

    问: 您好,脚本专家!我有一个包含计算机名称列表的文本文件.如何按照字母顺序对该文件进行排序? -- LR 答: 您好,LR.如果想偷懒的话,我们就会告诉您:"对不起,您不能这样做."我们也可以就此脱身,因为 Microsoft 的脚本编写技术中没有一个方法能在打开文本文件后对文件进行排序.不过,嗨,"脚本专家"什么时候偷过懒呢? 噢,对了,我们是偷过几次懒,不过希望所有的人都已经忘了.不过,这次我们将向您提供一个解决办法. 虽然没有能直接对文本文件进行排序的方法,

  • PHP基于ICU扩展intl快速实现汉字转拼音及按拼音首字母分组排序的方法

    本文实例讲述了PHP基于ICU扩展intl快速实现汉字转拼音及按拼音首字母分组排序的方法.分享给大家供大家参考,具体如下: ICU(International Components for Unicode)里提供了transliterator(直译器), 可以很方便把其他语言(比如简体中文)转为拉丁文表示: http://cn2.php.net/manual/zh/transliterator.transliterate.php Transliterator: allows getting la

  • hadoop 详解如何实现数据排序

    目录 前言 MapReduce排序 MapReduce排序分类 1.部分排序 2.全排序 3.辅助排序 4.二次排序 自定义排序案例 1.自定义一个Bean对象,实现WritableComparable接口 2.自定义Mapper 3.自定义Reducer 4.自定义Driver类 分区内排序案例 1.添加自定义分区 2.改造Driver类 前言 在hadoop的MapReduce中,提供了对于客户端的自定义排序的功能相关API MapReduce排序 默认情况下,MapTask 和Reduce

  • MongoDB快速翻页的方法

    翻阅数据是MongoDB最常见的操作之一.一个典型的场景是需要在你的用户界面中显示你的结果.如果你是批量处理的数据,同样重要的是要让你的分页策略正确,以便你的数据处理可以规模化. 接下来,让我们通过一个例子来看在MongoDB中翻阅数据的不同方式.在这个例子中,我们有一个CRM数据库的用户数据,我们需要通过翻阅浏览和在同一时间显示10个用户.所以实际上,我们的页面大小是10.下方是我们的用户文档的结构: { _id, name, company, state } 方法一:Using skip()

  • JS实现中文汉字按拼音排序的方法

    本文实例讲述了JS实现中文汉字按拼音排序的方法.分享给大家供大家参考,具体如下: 代码1,拼音排序: var array = ['武汉', '北京', '上海', '天津']; var resultArray = array.sort( function compareFunction(param1, param2) { return param1.localeCompare(param2,"zh"); } ); console.log(resultArray); 火狐浏览器 resu

  • Redis高级玩法之利用SortedSet实现多维度排序的方法

    说明:本次实践基于Redis版本3.2.11. 关于SortedSet 首先,我们都知道Redis的SortedSet是可以根据score进行排序的,以手机应用商店的热门榜单排序为例,根据下载量倒序排列,其简单用法如下: 127.0.0.1:6379> zadd TopApp 12000000 wechat (integer) 1 127.0.0.1:6379> zadd TopApp 8000000 taobao 10000000 alipay (integer) 2 127.0.0.1:6

  • Python3实现对列表按元组指定列进行排序的方法分析

    本文实例讲述了Python3实现对列表按元组指定列进行排序的方法.分享给大家供大家参考,具体如下: Python版本: python3.+ 运行环境: Mac OS IDE: pycharm Python内建的排序方法 1 排序方法介绍 Python中有2个排序函数,一个是list内置的sort()方法,另一个是全局的sorted()方法 sorted(iterable,key=None,reverse=False) #返回排好序的新列表,不改变对象本身,默认升序;reverse:-True降序

  • 在命令行用 sort 进行排序的方法

    Linux sort命令用于将文本文件内容加以排序. sort可针对文本文件的内容,以行为单位来排序. 在 Linux.BSD 或 Mac 的终端中使用 sort 命令,按自己的需求重新整理数据. 如果你曾经用过数据表应用程序,你就会知道可以按列的内容对行进行排序.例如,如果你有一个费用列表,你可能希望对它们进行按日期或价格升序抑或按类别进行排序.如果你熟悉终端的使用,你不会仅为了排序文本数据就去使用庞大的办公软件.这正是 sort 命令的用处. 安装 你不必安装 sort ,因为它向来都包含在

随机推荐