用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试

MapReduce与HDFS简介
什么是Hadoop?

Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS 、 MapReduce)。 Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop。

MapReduce的Data flow如下图,原始数据经过mapper处理,再进行partition和sort,到达reducer,输出最后结果。

图片来自Hadoop: The Definitive Guide

Hadoop Streaming原理
Hadoop本身是用Java开发的,程序也需要用Java编写,但是通过Hadoop Streaming,我们可以使用任意语言来编写程序,让Hadoop运行。

Hadoop Streaming的相关源代码可以在Hadoop的Github repo 查看。简单来说,就是通过将用其他语言编写的mapper和reducer通过参数传给一个事先写好的Java程序(Hadoop自带的*-streaming.jar),这个Java程序会负责创建MR作业,另开一个进程来运行mapper,将得到的输入通过stdin传给它,再将mapper处理后输出到stdout的数据交给Hadoop,partition和sort之后,再另开进程运行reducer,同样地通过stdin/stdout得到最终结果。因此,我们只需要在其他语言编写的程序里,通过stdin接收数据,再将处理过的数据输出到stdout,Hadoop streaming就能通过这个Java的wrapper帮我们解决中间繁琐的步骤,运行分布式程序。

图片来自Hadoop: The Definitive Guide

原理上只要是能够处理stdio的语言都能用来写mapper和reducer,也可以指定mapper或reducer为Linux下的程序(如awk、grep、cat)或者按照一定格式写好的java class。因此,mapper和reducer也不必是同一类的程序。

Hadoop Streaming的优缺点

优点

可以使用自己喜欢的语言来编写MapReduce程序(换句话说,不必写Java XD)
不需要像写Java的MR程序那样import一大堆库,在代码里做一大堆配置,很多东西都抽象到了stdio上,代码量显著减少
因为没有库的依赖,调试方便,并且可以脱离Hadoop先在本地用管道模拟调试

缺点

只能通过命令行参数来控制MapReduce框架,不像Java的程序那样可以在代码里使用API,控制力比较弱,有些东西鞭长莫及
因为中间隔着一层处理,效率会比较慢
所以Hadoop Streaming比较适合做一些简单的任务,比如用python写只有一两百行的脚本。如果项目比较复杂,或者需要进行比较细致的优化,使用Streaming就容易出现一些束手束脚的地方。

用python编写简单的Hadoop Streaming程序

这里提供两个例子:

Michael Noll的word count程序
Hadoop: The Definitive Guide里的例程
使用python编写Hadoop Streaming程序有几点需要注意:

在能使用iterator的情况下,尽量使用iterator,避免将stdin的输入大量储存在内存里,否则会严重降低性能

streaming不会帮你分割key和value传进来,传进来的只是一个个字符串而已,需要你自己在代码里手动调用split()

从stdin得到的每一行数据末尾似乎会有\n,保险起见一般都需要使用rstrip()来去掉

在想获得K-V list而不是一个个处理key-value pair时,可以使用groupby配合itemgetter将key相同的k-v pair组成一个个group,得到类似Java编写的reduce可以直接获取一个Text类型的key和一个iterable作为value的效果。注意itemgetter的效率比lambda表达式要高,所以如果需求不是很复杂的话,尽量用itemgetter比较好。

我在编写Hadoop Streaming程序时的基本模版是

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Some description here...
"""

import sys
from operator import itemgetter
from itertools import groupby

def read_input(file):
 """Read input and split."""
 for line in file:
  yield line.rstrip().split('\t')

def main():
 data = read_input(sys.stdin)
 for key, kviter in groupby(data, itemgetter(0)):
  # some code here..

if __name__ == "__main__":
 main()

如果对输入输出格式有不同于默认的控制,主要会在read_input()里调整。

本地调试

本地调试用于Hadoop Streaming的python程序的基本模式是:

$ cat <input path> | python <path to mapper script> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>

或者如果不想用多余的cat,也可以用<定向

$ python <path to mapper script> < <input path> | sort -t $'\t' -k1,1 | python <path to reducer script> > <output path>

这里有几点需要注意:

Hadoop默认按照tab来分割key和value,以第一个分割出的部分为key,按key进行排序,因此这里使用

sort -t $'\t' -k1,1
来模拟。如果你有其他需求,在交给Hadoop Streaming执行时可以通过命令行参数调,本地调试也可以进行相应的调整,主要是调整sort的参数。因此为了能够熟练进行本地调试,建议先掌握sort命令的用法。

如果你在python脚本里加上了shebang,并且为它们添加了执行权限,也可以用类似于

./mapper.py

来代替

python mapper.py
(0)

相关推荐

  • Hadoop1.2中配置伪分布式的实例

    1.设置ssh 安装ssh相关软件包: 复制代码 代码如下: sudo apt-get install openssh-client openssh-server 然后使用下面两个命令之一启动/关闭sshd: 复制代码 代码如下: sudo /etc/init.d/ssh start|stopsudo service ssh start|stop 若成功启动sshd,我们能看到如下类似结果: 复制代码 代码如下: $ ps -e | grep ssh 2766 ?        00:00:00

  • hadoop中一些常用的命令介绍

    假设Hadoop的安装目录HADOOP_HOME为/home/admin/hadoop.启动与关闭启动Hadoop1.进入HADOOP_HOME目录.2.执行sh bin/start-all.sh 关闭Hadoop1.进入HADOOP_HOME目录.2.执行sh bin/stop-all.sh文件操作Hadoop使用的是HDFS,能够实现的功能和我们使用的磁盘系统类似.并且支持通配符,如*. 查看文件列表查看hdfs中/user/admin/aaron目录下的文件.1.进入HADOOP_HOME

  • Hadoop2.X/YARN环境搭建--CentOS7.0系统配置

    一.我缘何选择CentOS7.0 14年7月7日17:39:42发布了CentOS 7.0.1406正式版,我曾使用过多款Linux,对于Hadoop2.X/YARN的环境配置缘何选择CentOS7.0,其原因有: 1.界面采用RHEL7.0新的GNOME界面风,这可不是CentOS6.5/RHEL6.5所能比的!(当然,Fedora早就采用这种风格的了,但是现在的Fedora缺包已然不成样子了) 2.曾经,我也用了RHEL7.0,它最大的问题就是YUM没法用,而且总会有Warning提示注册购

  • hadoop map-reduce中的文件并发操作

    这样的操作在map端或者reduce端均可.下面以一个实际业务场景中的例子来简要说明. 问题简要描述: 假如reduce输入的key是Text(String),value是BytesWritable(byte[]),不同key的种类为100万个,value的大小平均为30k左右,每个key大概对应 100个value,要求对每一个key建立两个文件,一个用来不断添加value中的二进制数据,一个用来记录各个value在文件中的位置索引.(大量的小文件会影响HDFS的性能,所以最好对这些小文件进行

  • hadoop是什么语言

    Hadoop是什么?Hadoop是一个开发和运行处理大规模数据的软件平台,是Appach的一个用java语言实现开源软件框架,实现在大量计算机组成的集群中对海量数据进行分布式计算. Hadoop框架中最核心设计就是:HDFS和MapReduce.HDFS提供了海量数据的存储,MapReduce提供了对数据的计算. 数据在Hadoop中处理的流程可以简单的按照下图来理解:数据通过Haddop的集群处理后得到结果. HDFS:Hadoop Distributed File System,Hadoop

  • hadoop实现grep示例分享

    hadoop做的一个简单grep程序,可从文档中提取包含某些字符串的行 复制代码 代码如下: /* * 一个简单grep程序,可从文档中提取包含莫些字符串的行 */ public class grep extends Configured  implements Tool{ public static  class grepMap extends Mapper<LongWritable, Text, Text,NullWritable>{ public void map(LongWritabl

  • hadoop的hdfs文件操作实现上传文件到hdfs

    hdfs文件操作操作示例,包括上传文件到HDFS上.从HDFS上下载文件和删除HDFS上的文件,大家参考使用吧 复制代码 代码如下: import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*; import java.io.File;import java.io.IOException;public class HadoopFile {    private Configuration conf =null

  • Hadoop单机版和全分布式(集群)安装

    Hadoop,分布式的大数据存储和计算, 免费开源!有Linux基础的同学安装起来比较顺风顺水,写几个配置文件就可以启动了,本人菜鸟,所以写的比较详细.为了方便,本人使用三台的虚拟机系统是Ubuntu-12.设置虚拟机的网络连接使用桥接方式,这样在一个局域网方便调试.单机和集群安装相差不多,先说单机然后补充集群的几点配置. 第一步,先安装工具软件编辑器:vim 复制代码 代码如下: sudo apt-get install vim ssh服务器: openssh,先安装ssh是为了使用远程终端工

  • 用PHP和Shell写Hadoop的MapReduce程序

    使得任何支持标准IO (stdin, stdout)的可执行程序都能成为hadoop的mapper或者 reducer.例如: 复制代码 代码如下: hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc 在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer

  • 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试

    MapReduce与HDFS简介 什么是Hadoop? Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS . MapReduce). Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop. MapReduce的Data flow如下图,原始数据

  • python pow函数的底层实现原理介绍

    一.最朴素的方法和pow比较 python中求两个a的b次方,常见的方法有:pow(a,b),a**b.那么这两个是否有区别,而且他们底层是怎么实现的呢? 最容易想到的方法就是:循环b次,每次都乘以a.但是究竟底层是不是这样实现的呢? 下面先从时间上来判断他们之间的关系. 首先来看看,pow和**有没有区别: import time start = time.time() print(2 ** 1000000) end0 = time.time() print('**:', end0 - sta

  • Python实现DBSCAN聚类算法并样例测试

    什么是聚类算法?聚类是一种机器学习技术,它涉及到数据点的分组.给定一组数据点,我们可以使用聚类算法将每个数据点划分为一个特定的组.理论上,同一组中的数据点应该具有相似的属性和/或特征,而不同组中的数据点应该具有高度不同的属性和/或特征.聚类是一种无监督学习的方法,是许多领域中常用的统计数据分析技术. 常用的算法包括K-MEANS.高斯混合模型(Gaussian Mixed Model,GMM).自组织映射神经网络(Self-Organizing Map,SOM) 重点给大家介绍Python实现D

  • python分布式编程实现过程解析

    分布式编程的难点在于: 1.服务器之间的通信,主节点如何了解从节点的执行进度,并在从节点之间进行负载均衡和任务调度: 2.如何让多个服务器上的进程访问同一资源的不同部分进行执行 第一部分涉及到网络编程的底层细节 第二个问题让我联想到hdfs的一些功能. 首先分布式进程还是解决的是单机单进程无法处理的大数据量大计算量的问题,希望能加通过一份代码(最多主+从两份)来并行执行一个大任务. 这就面临两个问题,首先将程序分布到多台服务器,其次将输入数据分配给多台服务器. 第一个问题相对比较简单,毕竟程序一

  • Hadoop streaming详细介绍

    Hadoop streaming Hadoop为MapReduce提供了不同的API,可以方便我们使用不同的编程语言来使用MapReduce框架,而不是只局限于Java.这里要介绍的就是Hadoop streaming API.Hadoop streaming 使用Unix的standard streams作为我们mapreduce程序和MapReduce框架之间的接口.所以你可以用任何语言来编写MapReduce程序,只要该语言可以往standard input/output上进行读写. st

  • Python自定义主从分布式架构实例分析

    本文实例讲述了Python自定义主从分布式架构.分享给大家供大家参考,具体如下: 环境:Win7 x64,Python 2.7,APScheduler 2.1.2. 原理图如下: 代码部分: (1).中心节点: #encoding=utf-8 #author: walker #date: 2014-12-03 #function: 中心节点(主要功能是分配任务) import SocketServer, socket, Queue CenterIP = '127.0.0.1' #中心节点IP C

  • Python设计模式之职责链模式原理与用法实例分析

    本文实例讲述了Python设计模式之职责链模式原理与用法.分享给大家供大家参考,具体如下: 职责链模式(Chain Of Responsibility):使多个对象都有机会处理请求,从而避免发送者和接收者的耦合关系.将对象连成链并沿着这条链传递请求直到被处理 下面是一个设计模式的demo: #!/usr/bin/env python # -*- coding:utf-8 -*- __author__ = 'Andy' """ 大话设计模式 设计模式--职责链模式 职责链模式(

  • Python设计模式之享元模式原理与用法实例分析

    本文实例讲述了Python设计模式之享元模式原理与用法.分享给大家供大家参考,具体如下: 享元模式(Flyweight Pattern):运用共享技术有效地支持大量细粒度的对象. 下面是一个享元模式的demo: #!/usr/bin/env python # -*- coding:utf-8 -*- __author__ = 'Andy' """ 大话设计模式 设计模式--享元模式 享元模式(Flyweight Pattern):运用共享技术有效地支持大量细粒度的对象 对一个

  • Python+OpenCV实现图像融合的原理及代码

    根据导师作业安排,在学习数字图像处理(刚萨雷斯版)第六章 彩色图像处理 中的彩色模型后,导师安排了一个比较有趣的作业: 融合原理为: 1 注意:遥感原RGB图image和灰度图Grayimage为测试用的输入图像: 2 步骤:(1)将RGB转换为HSV空间(H:色调,S:饱和度,V:明度): (2)用Gray图像诶换掉HSV中的V: (3)替换后的HSV转换回RGB空间即可得到结果. 书上只介绍了HSI彩色模型,并没有说到HSV,所以需要网上查找资料. Python代码如下: import cv

  • Python设计模式之抽象工厂模式原理与用法详解

    本文实例讲述了Python设计模式之抽象工厂模式原理与用法.分享给大家供大家参考,具体如下: 抽象工厂模式(Abstract Factory Pattern):提供一个创建一系列相关或相互依赖对象的接口,而无需指定它们的类 下面是一个抽象工厂的demo: #!/usr/bin/env python # -*- coding:utf-8 -*- __author__ = 'Andy' """ 大话设计模式 设计模式--抽象工厂模式 抽象工厂模式(Abstract Factory

随机推荐