用PHP和Shell写Hadoop的MapReduce程序

使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper或者 reducer。例如:


代码如下:

hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc

在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer,是不是很神奇?

如果你习惯了使用一些动态语言,用动态语言来写mapreduce吧,跟之前的编程没有任何不同,hadoop只是运行它的一个框架,下面我演示一下用PHP来实现Word Counter的mapreduce。

一、找到Streaming jar

Hadoop根目录下是没有hadoop-streaming.jar的,因为streaming是一个contrib,所以要去contrib下面找,以hadoop-0.20.2为例,它在这里:

代码如下:

$HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar

二、写Mapper

新建一个wc_mapper.php,写入如下代码:

代码如下:

#!/usr/bin/php
<?php
$in = fopen(“php://stdin”, “r”);
$results = array();
while ( $line = fgets($in, 4096) )
{
$words = preg_split(‘/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
foreach ($words as $word)
$results[] = $word;
}
fclose($in);
foreach ($results as $key => $value)
{
print “$value\t1\n”;
}

这段代码的大致意思是:把输入的每行文本中的单词找出来,并以”
hello 1
world 1″
这样的形式输出出来。

和之前写的PHP基本没有什么不同,对吧,可能稍微让你感到陌生有两个地方:

PHP作为可执行程序

第一行的“#!/usr/bin/php”告诉linux,要用/usr/bin/php这个程序作为以下代码的解释器。写过linux shell的人应该很熟悉这种写法了,每个shell脚本的第一行都是这样: #!/bin/bash, #!/usr/bin/python

有了这一行,保存好这个文件以后,就可以像这样直接把wc_mapper.php当作cat, grep一样的命令执行了:./wc_mapper.php

使用stdin接收输入

PHP支持多种参数传入的方法,大家最熟悉的应该是从$_GET, $_POST超全局变量里面取通过Web传递的参数,次之是从$_SERVER['argv']里取通过命令行传入的参数,这里,采用的是标准输入stdin

它的使用效果是:

在linux控制台输入 ./wc_mapper.php

wc_mapper.php运行,控制台进入等候用户键盘输入状态

用户通过键盘输入文本

用户按下Ctrl + D终止输入,wc_mapper.php开始执行真正的业务逻辑,并将执行结果输出

那么stdout在哪呢?print本身已经就是stdout啦,跟我们以前写web程序和CLI脚本没有任何不同。

三、写Reducer

新建一个wc_reducer.php,写入如下代码:


代码如下:

#!/usr/bin/php
<?php
$in = fopen(“php://stdin”, “r”);
$results = array();
while ( $line = fgets($in, 4096) )
{
list($key, $value) = preg_split(“/\t/”, trim($line), 2);
$results[$key] += $value;
}
fclose($in);
ksort($results);
foreach ($results as $key => $value)
{
print “$key\t$value\n”;
}

这段代码的大意是统计每个单词出现了多少次,并以”
hello 2
world 1″
这样的形式输出。

四、用Hadoop来运行

上传要统计的示例文本


代码如下:

hadoop fs -put *.TXT /tmp/input

以Streaming方式执行PHP mapreduce程序

代码如下:

hadoop jar hadoop-0.20.2-streaming.jar -input /tmp/input -output /tmp/output -mapper wc_mapper.php的绝对路径 -reducer wc_reducer.php的绝对路径

注意:

input和output目录是在hdfs上的路径

mapper和reducer是在本地机器的路径,一定要写绝对路径,不要写相对路径,以免到时候hadoop报错说找不到mapreduce程序。

查看结果


代码如下:

hadoop fs -cat /tmp/output/part-00000

五、shell版的Hadoop MapReduce程序

代码如下:

#!/bin/bash -

# 加载配置文件
source './config.sh'

# 处理命令行参数
while getopts "d:" arg
do
 case $arg in
  d)
   date=$OPTARG

?)
            echo "unkonw argument"
   exit 1

esac
done

# 默认处理日期为昨天
default_date=`date -v-1d +%Y-%m-%d`

# 最终处理日期. 如果日期格式不对, 则退出执行
date=${date:-${default_date}}
if ! [[ "$date" =~ [12][0-9]{3}-(0[1-9]|1[12])-(0[1-9]|[12][0-9]|3[01]) ]]
then
 echo "invalid date(yyyy-mm-dd): $date"
 exit 1
fi

# 待处理文件
log_files=$(${hadoop_home}bin/hadoop fs -ls ${log_file_dir_in_hdfs} | awk '{print $8}' | grep $date)

# 如果待处理文件数目为零, 则退出执行
log_files_amount=$(($(echo $log_files | wc -l) + 0))
if [ $log_files_amount -lt 1 ]
then
 echo "no log files found"
 exit 0
fi

# 输入文件列表
for f in $log_files
do
 input_files_list="${input_files_list} $f"
done

function map_reduce () {
 if ${hadoop_home}bin/hadoop jar ${streaming_jar_path} -input${input_files_list} -output ${mapreduce_output_dir}${date}/${1}/ -mapper "${mapper} ${1}" -reducer "${reducer}" -file "${mapper}"
 then
  echo "streaming job done!"
 else
  exit 1
 fi
}

# 循环处理每一个bucket
for bucket in ${bucket_list[@]}
do
 map_reduce $bucket
done

(0)

相关推荐

  • hadoop是什么语言

    Hadoop是什么?Hadoop是一个开发和运行处理大规模数据的软件平台,是Appach的一个用java语言实现开源软件框架,实现在大量计算机组成的集群中对海量数据进行分布式计算. Hadoop框架中最核心设计就是:HDFS和MapReduce.HDFS提供了海量数据的存储,MapReduce提供了对数据的计算. 数据在Hadoop中处理的流程可以简单的按照下图来理解:数据通过Haddop的集群处理后得到结果. HDFS:Hadoop Distributed File System,Hadoop

  • Hadoop2.X/YARN环境搭建--CentOS7.0系统配置

    一.我缘何选择CentOS7.0 14年7月7日17:39:42发布了CentOS 7.0.1406正式版,我曾使用过多款Linux,对于Hadoop2.X/YARN的环境配置缘何选择CentOS7.0,其原因有: 1.界面采用RHEL7.0新的GNOME界面风,这可不是CentOS6.5/RHEL6.5所能比的!(当然,Fedora早就采用这种风格的了,但是现在的Fedora缺包已然不成样子了) 2.曾经,我也用了RHEL7.0,它最大的问题就是YUM没法用,而且总会有Warning提示注册购

  • Hadoop单机版和全分布式(集群)安装

    Hadoop,分布式的大数据存储和计算, 免费开源!有Linux基础的同学安装起来比较顺风顺水,写几个配置文件就可以启动了,本人菜鸟,所以写的比较详细.为了方便,本人使用三台的虚拟机系统是Ubuntu-12.设置虚拟机的网络连接使用桥接方式,这样在一个局域网方便调试.单机和集群安装相差不多,先说单机然后补充集群的几点配置. 第一步,先安装工具软件编辑器:vim 复制代码 代码如下: sudo apt-get install vim ssh服务器: openssh,先安装ssh是为了使用远程终端工

  • 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试

    MapReduce与HDFS简介 什么是Hadoop? Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS . MapReduce). Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop. MapReduce的Data flow如下图,原始数据

  • hadoop实现grep示例分享

    hadoop做的一个简单grep程序,可从文档中提取包含某些字符串的行 复制代码 代码如下: /* * 一个简单grep程序,可从文档中提取包含莫些字符串的行 */ public class grep extends Configured  implements Tool{ public static  class grepMap extends Mapper<LongWritable, Text, Text,NullWritable>{ public void map(LongWritabl

  • hadoop map-reduce中的文件并发操作

    这样的操作在map端或者reduce端均可.下面以一个实际业务场景中的例子来简要说明. 问题简要描述: 假如reduce输入的key是Text(String),value是BytesWritable(byte[]),不同key的种类为100万个,value的大小平均为30k左右,每个key大概对应 100个value,要求对每一个key建立两个文件,一个用来不断添加value中的二进制数据,一个用来记录各个value在文件中的位置索引.(大量的小文件会影响HDFS的性能,所以最好对这些小文件进行

  • Hadoop1.2中配置伪分布式的实例

    1.设置ssh 安装ssh相关软件包: 复制代码 代码如下: sudo apt-get install openssh-client openssh-server 然后使用下面两个命令之一启动/关闭sshd: 复制代码 代码如下: sudo /etc/init.d/ssh start|stopsudo service ssh start|stop 若成功启动sshd,我们能看到如下类似结果: 复制代码 代码如下: $ ps -e | grep ssh 2766 ?        00:00:00

  • hadoop中一些常用的命令介绍

    假设Hadoop的安装目录HADOOP_HOME为/home/admin/hadoop.启动与关闭启动Hadoop1.进入HADOOP_HOME目录.2.执行sh bin/start-all.sh 关闭Hadoop1.进入HADOOP_HOME目录.2.执行sh bin/stop-all.sh文件操作Hadoop使用的是HDFS,能够实现的功能和我们使用的磁盘系统类似.并且支持通配符,如*. 查看文件列表查看hdfs中/user/admin/aaron目录下的文件.1.进入HADOOP_HOME

  • hadoop的hdfs文件操作实现上传文件到hdfs

    hdfs文件操作操作示例,包括上传文件到HDFS上.从HDFS上下载文件和删除HDFS上的文件,大家参考使用吧 复制代码 代码如下: import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*; import java.io.File;import java.io.IOException;public class HadoopFile {    private Configuration conf =null

  • 用PHP和Shell写Hadoop的MapReduce程序

    使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper或者 reducer.例如: 复制代码 代码如下: hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc 在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer

  • hadoop之MapReduce框架原理

    目录 MapReduce框架的简单运行机制: Mapper阶段: InputFormat数据输入: 切片与MapTask并行度决定机制: job提交过程源码解析: 切片逻辑: 1)FileInputFormat实现类 进行虚拟存储 (1)虚拟存储过程: Shuffle阶段: 排序: Combiner合并: ReduceTask阶段: Reduce Join: Map Join: MapReduce框架的简单运行机制: MapReduce是分为两个阶段的,MapperTask阶段,和ReduceT

  • Java/Web调用Hadoop进行MapReduce示例代码

    Hadoop环境搭建详见此文章http://www.jb51.net/article/33649.htm. 我们已经知道Hadoop能够通过Hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让Java/Web来调用它?使得用户可以用方便的方式上传文件到Hadoop并进行处理,获得结果.首先,***.jar是一个Hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它.input 和outpu

  • shell linux中如何用shell写一个占用CPU的脚本

    使用场景: 向公司申请的虚机资源自己工作用的比较方便,因占用较小basis要求回收掉,现写一个脚本,让CPU跑满一些. 首先看下共有几颗逻辑CPU cat /proc/cpuinfo |grep "processor"|wc -l 上图可以看到是4颗,我现在跑满2颗 脚本如下 #! /bin/bash # filename killcpu.sh endless_loop() { echo -ne "i=0; while true do i=i+100; i=100 done&

  • Shell执行/调用Java/Jar程序例子的实例详解

    Shell执行/调用Java/Jar程序例子的实例详解 前言: 最近要写一个独立的Java程序去监控Hadoop和Oozie,通过Shell去调用.写代码到现在也4年多了,貌似就从来没在生产环境中写过一个独立的Java程序,不是部署到Tomcat就是直接丢给Hadoop.于是参考Hadoop等开源环境,自己写了一个demo,并且可以通过Ant打包生成可运行的程序.所以这里有三步:Java程序,Shell,Ant      1.首先建立Java程序,由于是例子,所以这里很简单,只是输出传入参数的个

  • 通用MapReduce程序复制HBase表数据

    编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据.其中包括可以设置版本数.可以设置输入表的列导入设置(选取其中某几列).可以设置输出表的列导出设置(选取其中某几列). 原始表test1数据如下: 每个row key都有两个版本的数据,这里只显示了row key为1的数据 在hbase shell 中创建数据表: create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存无版本.无列导入设置.无列导出设置的数据 create '

  • java 矩阵乘法的mapreduce程序实现

    java 矩阵乘法的mapreduce程序实现 map函数:对于矩阵M中的每个元素m(ij),产生一系列的key-value对<(i,k),(M,j,m(ij))> 其中k=1,2.....知道矩阵N的总列数;对于矩阵N中的每个元素n(jk),产生一系列的key-value对<(i , k) , (N , j ,n(jk)>, 其中i=1,2.......直到i=1,2.......直到矩阵M的总列数. map package com.cb.matrix; import stati

  • hadoop运行java程序(jar包)并运行时动态指定参数

    1)首先启动hadoop2个进程,进入hadoop/sbin目录下,依次启动如下命令 [root@node02 sbin]# pwd /usr/server/hadoop/hadoop-2.7.0/sbin sh start-dfs.sh sh start-yarn.sh jps 2)通过jps查看是否正确启动,确保启动如下6个程序 [root@node02 sbin]# jps 10096 DataNode 6952 NodeManager 9962 NameNode 10269 Second

  • 手把手教你写一个微信小程序(推荐)

    需求 小程序语音识别,全景图片观看,登录授权,获取个人基本信息 一:基础框架 官方开发文档:https://developers.weixin.qq.com/miniprogram/dev/ (其实官方文档写的很清楚了) 1.跟着官方文档一步一步来,新建一个小程序项目就好 2.然后呢,毕竟默认的只是基本骨架,肌肉线条还是要自己填的 app.json 是当前小程序的全局配置 小程序的所有页面路径.界面表现.网络超时时间.底部 tab 需求一:底部tab,我们要像原生APP那样要有是三个常驻的按钮,

  • 用python写一个定时提醒程序的实现代码

    身体是革命的本钱,身体健康了我们才有更多精力做自己想做的事情,追求女神,追求梦想.然而程序员是一个苦比的职业,大部分时间都对着电脑,我现在颈椎就不好了,有时候眼睛还疼,我还没20阿,伤心...于是乎写了一个小程序,指定时间会打开浏览器播放一段音乐,提醒我们休息一会儿,防止我们猝死,说多了都是泪. 较基础,适合python新手及对python感兴趣的同学阅读. 我们来理一遍这个程序,大概功能是:我们设置一个时间,时间到了以后会打开浏览器播放一段音频. 1.等待 2.打开浏览器,播放音频. 3.重复

随机推荐