hadoop map-reduce中的文件并发操作

这样的操作在map端或者reduce端均可。下面以一个实际业务场景中的例子来简要说明。

问题简要描述:

假如reduce输入的key是Text(String),value是BytesWritable(byte[]),不同key的种类为100万个,value的大小平均为30k左右,每个key大概对应 100个value,要求对每一个key建立两个文件,一个用来不断添加value中的二进制数据,一个用来记录各个value在文件中的位置索引。(大量的小文件会影响HDFS的性能,所以最好对这些小文件进行拼接)

当文件数量较小时,可以考虑使用MultipleOutput来进行key-value的分流,可以按照key的不同,将其输出到不同的文件或者目录中。但是reduce的数量只能为1,不然每个reduce都会生成相同的目录或者文件,不能达到最终的目的。此外最重要的是,操作系统对每个进程打开的文件数量的限制,默认为1024,集群的各个datanode可能会配置更高的值,但最多在几万左右,仍然是一个限制因素。不能满足百万文件的需求。

reduce的主要目的是用来归并key-value并输出到HDFS上,我们当然也可以在reduce中进行其他的操作,比如文件读写。因为默认的partitioner保证同一个key的数据肯定会在同一个reduce中,所以在每个reduce中只用打开两个文件进行读写即可(一个索引文件,一个数据文件)。并发度由reduce数量决定,将reduce数量设为256,那我们就可以同时处理256个key的数据(partioner保证了不同reduce处理的key不同,不会引起文件读写冲突)。这样的并发度的效率是很客观的,可以在较短的时间内完成需求。

思路是这样,但同时由于hdfs的特性以及hadoop的任务调度,在文件读写过程中,仍有可能会出现很多问题,下面简要说些一些常见的会碰到的问题。

1.org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException异常

这可能是最经常碰到的一个问题。可能的原因如下:

(1)文件流冲突。

一般创建文件时都会打开一个供写入的文件流。而我们希望是追加,所以如果使用了错误的API ,就有可能引起上述问题。以FileSystem类为例,如果使用create()方法之后再调用append()方法,就会抛出上述异常。所以最好使用createNewFile方法,只创建文件,不打开流。

(2)mapreduce推测执行机制

mapreduce 为了提高效率,会在一个任务启动之后,同时启动一些相同的任务(attempt),其中有一个attempt成功完成之后,视为整个task完成,其结果 作为最终结果,并且杀掉那些较慢的attempt。集群一般会开启此选项以优化性能(以空间换时间)。但在本问题环境下推测执行却不太合适。因为我们一般希望一个task 用来处理一个文件,但如果启动推测执行,会有几个attempt同时试图操作同一个文件,就会引发异常。所以最好关掉此选项,将 mapred.reduce.max.attempts 设为1,或者将mapred.reduce.tasks.speculative.execution设为false.

但此时仍有可能会出现问题。因为如果一个task的唯一attempt出现问题,在被kill掉之后,task仍会另起一个attempt,此时因为前一个attempt异常终止,仍有可能会影响到新起的attempt的文件操作,引发异常。所以最安全的方法是,借鉴推测执行的机制(每个attempt各自生成自己的结果,最终选择一个作为最终结果),以每个attempt的id号为后缀附加到所操作的文件上,同时捕获所有文件操作的异常并处理,这样可避免文件的读写冲突。Context可以用来获取运行时的一些上下文信息,可以很容易得到attempt的id号。注意,此时如果开启推测执行也可以,但是会生成很多相同的文件(每个attempt一份),仍然不是最好的解决方法。

同时,我们可以利用reduce的输出来记录运行“不正常的” key.这些task大多数是attempt_0被杀掉而重启了一个attempt_1,所以下面的文件一般为两份。可以对这些情况的key输出(文件异常或者attemptID > 0),并进行一些后续处理,比如文件重命名,或者紧对这些key重新写入。因为此种情况的key一般只占极少数,所以并不影响总体的效率。

2.文件异常处理

最好能将mapreduce中的所有文件操作都设置好异常处理。不然一个文件异常就有可能会使整个job失败。所以从效率来讲,最好是在文件发生异常时将其key作为reduce的输出以进行记录。因为同时mapreduce会重启一个task attempts重新进行文件读写,可保证我们得到最终的数据,最后所需的只是对那些异常的key进行一些简单的文件重命名操作即可。

3.多目录以及文件拼接

如果我们将key的种类设为1000万,上述方法会生成太多的小文件从而影响hdfs的性能,另外,因为所有文件都在同一个目录下,会导致同一个目录下文件数目过多而影响访问效率。

在创建文件的同时建立多个子目录,一个有用的方法是以reduce的taskid来建立子目录。这样有多少个reduce就可以建立多少个子目录,不会有文件冲突。同一个reduce处理的key都会在同一个目录下。

文件拼接要考虑的一个索引的问题。为了将文件索引建立的尽量简单,应该尽量保证同一个key的所有数据都在同一个大文件中。这可以利用key的hashCode来实现。如果我们想在每个目录下建立1000个文件,只需将hashCode对1000取余即可。

(0)

相关推荐

  • hadoop中一些常用的命令介绍

    假设Hadoop的安装目录HADOOP_HOME为/home/admin/hadoop.启动与关闭启动Hadoop1.进入HADOOP_HOME目录.2.执行sh bin/start-all.sh 关闭Hadoop1.进入HADOOP_HOME目录.2.执行sh bin/stop-all.sh文件操作Hadoop使用的是HDFS,能够实现的功能和我们使用的磁盘系统类似.并且支持通配符,如*. 查看文件列表查看hdfs中/user/admin/aaron目录下的文件.1.进入HADOOP_HOME

  • 用PHP和Shell写Hadoop的MapReduce程序

    使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper或者 reducer.例如: 复制代码 代码如下: hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc 在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer

  • hadoop是什么语言

    Hadoop是什么?Hadoop是一个开发和运行处理大规模数据的软件平台,是Appach的一个用java语言实现开源软件框架,实现在大量计算机组成的集群中对海量数据进行分布式计算. Hadoop框架中最核心设计就是:HDFS和MapReduce.HDFS提供了海量数据的存储,MapReduce提供了对数据的计算. 数据在Hadoop中处理的流程可以简单的按照下图来理解:数据通过Haddop的集群处理后得到结果. HDFS:Hadoop Distributed File System,Hadoop

  • 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试

    MapReduce与HDFS简介 什么是Hadoop? Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS . MapReduce). Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop. MapReduce的Data flow如下图,原始数据

  • hadoop的hdfs文件操作实现上传文件到hdfs

    hdfs文件操作操作示例,包括上传文件到HDFS上.从HDFS上下载文件和删除HDFS上的文件,大家参考使用吧 复制代码 代码如下: import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*; import java.io.File;import java.io.IOException;public class HadoopFile {    private Configuration conf =null

  • hadoop实现grep示例分享

    hadoop做的一个简单grep程序,可从文档中提取包含某些字符串的行 复制代码 代码如下: /* * 一个简单grep程序,可从文档中提取包含莫些字符串的行 */ public class grep extends Configured  implements Tool{ public static  class grepMap extends Mapper<LongWritable, Text, Text,NullWritable>{ public void map(LongWritabl

  • Hadoop2.X/YARN环境搭建--CentOS7.0系统配置

    一.我缘何选择CentOS7.0 14年7月7日17:39:42发布了CentOS 7.0.1406正式版,我曾使用过多款Linux,对于Hadoop2.X/YARN的环境配置缘何选择CentOS7.0,其原因有: 1.界面采用RHEL7.0新的GNOME界面风,这可不是CentOS6.5/RHEL6.5所能比的!(当然,Fedora早就采用这种风格的了,但是现在的Fedora缺包已然不成样子了) 2.曾经,我也用了RHEL7.0,它最大的问题就是YUM没法用,而且总会有Warning提示注册购

  • Hadoop单机版和全分布式(集群)安装

    Hadoop,分布式的大数据存储和计算, 免费开源!有Linux基础的同学安装起来比较顺风顺水,写几个配置文件就可以启动了,本人菜鸟,所以写的比较详细.为了方便,本人使用三台的虚拟机系统是Ubuntu-12.设置虚拟机的网络连接使用桥接方式,这样在一个局域网方便调试.单机和集群安装相差不多,先说单机然后补充集群的几点配置. 第一步,先安装工具软件编辑器:vim 复制代码 代码如下: sudo apt-get install vim ssh服务器: openssh,先安装ssh是为了使用远程终端工

  • Hadoop1.2中配置伪分布式的实例

    1.设置ssh 安装ssh相关软件包: 复制代码 代码如下: sudo apt-get install openssh-client openssh-server 然后使用下面两个命令之一启动/关闭sshd: 复制代码 代码如下: sudo /etc/init.d/ssh start|stopsudo service ssh start|stop 若成功启动sshd,我们能看到如下类似结果: 复制代码 代码如下: $ ps -e | grep ssh 2766 ?        00:00:00

  • hadoop map-reduce中的文件并发操作

    这样的操作在map端或者reduce端均可.下面以一个实际业务场景中的例子来简要说明. 问题简要描述: 假如reduce输入的key是Text(String),value是BytesWritable(byte[]),不同key的种类为100万个,value的大小平均为30k左右,每个key大概对应 100个value,要求对每一个key建立两个文件,一个用来不断添加value中的二进制数据,一个用来记录各个value在文件中的位置索引.(大量的小文件会影响HDFS的性能,所以最好对这些小文件进行

  • Python自动化测试中yaml文件读取操作

    什么是yaml 一种标记语言.yaml 是专门用来写配置文件的语言,非常简洁和强大 更直观,更方便,有点类似于json格式 yaml文件格式:test.yaml 安装yaml pip install pyyaml yaml基本语法规则 大小写敏感 使用缩进表示层级关系 缩进时不允许使用Tab键,只允许使用空格. 缩进的空格数目不重要,只要相同层级的元素左侧对齐即可 #表示注释,从这个字符一直到行尾,都会被解析器忽略,这个和python的注释一样 键值对(dict) yaml文件 user: ad

  • 如何在Python中对文件进行操作

    目录 前言 1.open()函数 2.读文件 3.写文件 4.通过 with 来读写文件 前言 在Python中,我们可以通过一些内置函数来操作电脑上的文件,并对文件进行读写,这种读写操作是很常见的 I/O 操作,我们今天就来简单学习下. 1.open()函数 我们可以使用Python中内置的 open() 函数来打开文件,返回文件对象,并对文件进行处理. open() 函数的常见格式如下: open(file, mode='r') 第一个参数表示要打开文件的路径,第二个参数表示文件打开的模式,

  • Python中xlsx文件转置操作详解(行转列和列转行)

    目录 1.原始数据是这样的 2.脚本如下: 3.运行脚本后生成的xlsx文件,如下: 附:pivot方法即可完成行转列哦 总结 1.原始数据是这样的 2.脚本如下: import pandas as pd df = pd.read_excel(r'E:\untitled1\带宽测试\temp.xlsx') # 读取需要转置的文件 df = df.T # 转置 df.to_excel(r'E:\untitled1\带宽测试\TestResult.xlsx') # 另存为xlsx文件 3.运行脚本后

  • 一文带你掌握Go语言中的文件读取操作

    目录 os 包 和 bufio 包 os.Open 与 os.OpenFile 以及 File.Read 读取文件操作 bufio.NewReader 和 Reader.ReadString 读取文件操作 小结 os 包 和 bufio 包 Go 标准库的 os 包,为我们提供很多操作文件的函数,如 Open(name) 打开文件.Create(name) 创建文件等函数,与之对应的是 bufio 包,os 包是直接对磁盘进行操作的,而 bufio 包则是带有缓冲的操作,不用每次都去操作磁盘.

  • node.js中fs文件系统目录操作与文件信息操作

    目录操作 如果存在该目录,就创建失败 同步创建目录fs.mkdirSync(path, [mode]) const fs = require('fs'); let mkdir = './mkdir'; fs.mkdir(mkdir, (err) => { if (err) { console.log(`mkdir ${mkdir} file failed~`); } else { console.log(`mkdir ${mkdir} file success~`); } }); 读取目录 如果

  • 一文带你搞懂Python中的文件操作

    目录 一.文件的编码 二.文件的读取 2.1 open()打开函数 2.2 mode常用的三种基础访问模式 2.3 读操作相关方法 三.文件的写入 写操作快速入门 四.文件的追加 追加写入操作快速入门 五.文件操作综合案例 一.文件的编码 计算机中有许多可用编码: UTF-8 GBK Big5 等 UTF-8是目前全球通用的编码格式 除非有特殊需求,否则,一律以UTF-8格式进行文件编码即可. 二.文件的读取 2.1 open()打开函数 注意:此时的f是open函数的文件对象,对象是Pytho

  • Ruby中的类Google Map/Reduce框架Skynet介绍

    Skynet是一个很响亮的名字,因为它是阿诺施瓦辛格主演的经典系列电影<终结者>里面的统治人类的超级计算机网络.不过本文的Skynet没这么恐怖,它是一个ruby版本的Google Map/Reduce框架的名字而已. Google的Map/Reduce框架实在太有名气了,他可以把一个任务切分为很多份,交给n台计算机并行执行,返回的结果再并行的归并,最后得到运算的结果.据说Google一个搜索结果会Map到7000台服务器并行执行,这么多么可怕的分布式运算能力阿!有了Map/Reduce,程序

  • CI框架中redis缓存相关操作文件示例代码

    本文实例讲述了CI框架中redis缓存相关操作文件.分享给大家供大家参考,具体如下: redis缓存类文件位置: 'ci\system\libraries\Cache\drivers\Cache_redis.php' <?php /** * CodeIgniter * * An open source application development framework for PHP 5.2.4 or newer * * NOTICE OF LICENSE * * Licensed under

  • Go使用sync.Map来解决map的并发操作问题

    目录 前言 map 并发操作出现问题 sync.Map 解决并发操作问题 计算 map 长度 计算 sync.Map 长度 前言 在 Golang 中 map 不是并发安全的,自 1.9 才引入了 sync.Map ,sync.Map 的引入确实解决了 map 的并发安全问题,不过 sync.Map 却没有实现 len() 函数,如果想要计算 sync.Map 的长度,稍微有点麻烦,需要使用 Range 函数. map 并发操作出现问题 func main() { demo := make(ma

随机推荐