让python在hadoop上跑起来

本文实例讲解的是一般的hadoop入门程序“WordCount”,就是首先写一个map程序用来将输入的字符串分割成单个的单词,然后reduce这些单个的单词,相同的单词就对其进行计数,不同的单词分别输出,结果输出每一个单词出现的频数。

  注意:关于数据的输入输出是通过sys.stdin(系统标准输入)和sys.stdout(系统标准输出)来控制数据的读入与输出。所有的脚本执行之前都需要修改权限,否则没有执行权限,例如下面的脚本创建之前使用“chmod +x mapper.py”

1.mapper.py

#!/usr/bin/env python
import sys

for line in sys.stdin: # 遍历读入数据的每一行

  line = line.strip() # 将行尾行首的空格去除
  words = line.split() #按空格将句子分割成单个单词
  for word in words:
    print '%s\t%s' %(word, 1)

2.reducer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None # 为当前单词
current_count = 0 # 当前单词频数
word = None

for line in sys.stdin:
  words = line.strip() # 去除字符串首尾的空白字符
  word, count = words.split('\t') # 按照制表符分隔单词和数量

  try:
    count = int(count) # 将字符串类型的‘1'转换为整型1
  except ValueError:
    continue

  if current_word == word: # 如果当前的单词等于读入的单词
    current_count += count # 单词频数加1
  else:
    if current_word: # 如果当前的单词不为空则打印其单词和频数
      print '%s\t%s' %(current_word, current_count)
    current_count = count # 否则将读入的单词赋值给当前单词,且更新频数
    current_word = word

if current_word == word:
  print '%s\t%s' %(current_word, current_count)

在shell中运行以下脚本,查看输出结果:

echo "foo foo quux labs foo bar zoo zoo hying" | /home/wuying/mapper.py | sort -k 1,1 | /home/wuying/reducer.py

# echo是将后面“foo ****”字符串输出,并利用管道符“|”将输出数据作为mapper.py这个脚本的输入数据,并将mapper.py的数据输入到reducer.py中,其中参数sort -k 1,1是将reducer的输出内容按照第一列的第一个字母的ASCII码值进行升序排序

其实,我觉得后面这个reducer.py处理单词频数有点麻烦,将单词存储在字典里面,单词作为‘key',每一个单词出现的频数作为'value',进而进行频数统计感觉会更加高效一点。因此,改进脚本如下:

mapper_1.py

但是,貌似写着写着用了两个循环,反而效率低了。关键是不太明白这里的current_word和current_count的作用,如果从字面上老看是当前存在的单词,那么怎么和遍历读取的word和count相区别?

下面看一些脚本的输出结果:

我们可以看到,上面同样的输入数据,同样的shell换了不同的reducer,结果后者并没有对数据进行排序,实在是费解~

让Python代码在hadoop上跑起来!

一、准备输入数据

接下来,先下载三本书:

$ mkdir -p tmp/gutenberg
$ cd tmp/gutenberg
$ wget http://www.gutenberg.org/ebooks/20417.txt.utf-8
$ wget http://www.gutenberg.org/files/5000/5000-8.txt
$ wget http://www.gutenberg.org/ebooks/4300.txt.utf-8

然后把这三本书上传到hdfs文件系统上:

 $ hdfs dfs -mkdir /user/${whoami}/input # 在hdfs上的该用户目录下创建一个输入文件的文件夹
 $ hdfs dfs -put /home/wuying/tmp/gutenberg/*.txt /user/${whoami}/input # 上传文档到hdfs上的输入文件夹中

寻找你的streaming的jar文件存放地址,注意2.6的版本放到share目录下了,可以进入hadoop安装目录寻找该文件:

$ cd $HADOOP_HOME
$ find ./ -name "*streaming*"

然后就会找到我们的share文件夹中的hadoop-straming*.jar文件:

寻找速度可能有点慢,因此你最好是根据自己的版本号到对应的目录下去寻找这个streaming文件,由于这个文件的路径比较长,因此我们可以将它写入到环境变量:

$ vi ~/.bashrc # 打开环境变量配置文件
# 在里面写入streaming路径
export STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar

由于通过streaming接口运行的脚本太长了,因此直接建立一个shell名称为run.sh来运行:

hadoop jar $STREAM \
-files ./mapper.py,./reducer.py \
-mapper ./mapper.py \
-reducer ./reducer.py \
-input /user/$(whoami)/input/*.txt \
-output /user/$(whoami)/output

然后"source run.sh"来执行mapreduce。结果就响当当的出来啦。这里特别要提醒一下:

1、一定要把本地的输入文件转移到hdfs系统上面,否则无法识别你的input内容;

2、一定要有权限,一定要在你的hdfs系统下面建立你的个人文件夹否则就会被denied,是的,就是这两个错误搞得我在服务器上面痛不欲生,四处问人的感觉真心不如自己清醒对待来的好;

3、如果你是第一次在服务器上面玩hadoop,建议在这之前请在自己的虚拟机或者linux系统上面配置好伪分布式然后入门hadoop来的比较不那么头疼,之前我并不知道我在服务器上面运维没有给我运行的权限,后来在自己的虚拟机里面运行一下example实例以及wordcount才找到自己的错误。

好啦,然后不出意外,就会complete啦,你就可以通过如下方式查看计数结果:

以上就是本文的全部内容,希望对大家学习python软件编程有所帮助。

(0)

相关推荐

  • 用python + hadoop streaming 分布式编程(一) -- 原理介绍,样例程序与本地调试

    MapReduce与HDFS简介 什么是Hadoop? Google为自己的业务需要提出了编程模型MapReduce和分布式文件系统Google File System,并发布了相关论文(可在Google Research的网站上获得: GFS . MapReduce). Doug Cutting和Mike Cafarella在开发搜索引擎Nutch时对这两篇论文做了自己的实现,即同名的MapReduce和HDFS,合起来就是Hadoop. MapReduce的Data flow如下图,原始数据

  • Hadoop中的Python框架的使用指南

    最近,我加入了Cloudera,在这之前,我在计算生物学/基因组学上已经工作了差不多10年.我的分析工作主要是利用Python语言和它很棒的科学计算栈来进行的.但Apache Hadoop的生态系统大部分都是用Java来实现的,也是为Java准备的,这让我很恼火.所以,我的头等大事变成了寻找一些Python可以用的Hadoop框架. 在这篇文章里,我会把我个人对这些框架的一些无关科学的看法写下来,这些框架包括: Hadoop流 mrjob dumbo hadoopy pydoop 其它 最终,在

  • 让python在hadoop上跑起来

    本文实例讲解的是一般的hadoop入门程序"WordCount",就是首先写一个map程序用来将输入的字符串分割成单个的单词,然后reduce这些单个的单词,相同的单词就对其进行计数,不同的单词分别输出,结果输出每一个单词出现的频数. 注意:关于数据的输入输出是通过sys.stdin(系统标准输入)和sys.stdout(系统标准输出)来控制数据的读入与输出.所有的脚本执行之前都需要修改权限,否则没有执行权限,例如下面的脚本创建之前使用"chmod +x mapper.py&

  • Python连接Hadoop数据中遇到的各种坑(汇总)

    最近准备使用Python+Hadoop+Pandas进行一些深度的分析与机器学习相关工作.(当然随着学习过程的进展,现在准备使用Python+Spark+Hadoop这样一套体系来搭建后续的工作环境),当然这是后话. 但是这项工作首要条件就是将Python与Hadoop进行打通,本来认为很容易的一项工作,没有想到竟然遇到各种坑,花费了整整半天时间.后来也在网上看到大家在咨询相同的问题,但是真正解决这个问题的帖子又几乎没有,所以现在将Python连接Hadoop数据库过程中遇到的各种坑进行一个汇总

  • 使用tensorflow框架在Colab上跑通猫狗识别代码

    一. 前提: 有Google账号(具体怎么注册账号这里不详述,大家都懂的,自行百度)在你的Google邮箱中关联好colab(怎样在Google邮箱中使用colab在此不详述,自行百度) 二. 现在开始: 因为我们使用的是colab,所以就不必为安装版本对应的anaconda.python以及tensorflow尔苦恼了,经过以下配置就可以直接开始使用了. 在colab中新建代码块,运行以下代码来下载需要的数据集 # In this exercise you will train a CNN o

  • 完美解决python针对hdfs上传和下载的问题

    当我们使用python的hdfs包进行上传和下载文件的时候,总会出现如下问题 requests.packages.urllib3.exceptions.NewConnectionError:<requests.packages.urllib3.connection.HTTPConnection object at 0x7fe87cc37c50>: Failed to establish a new connection: [Errno -2] Name or service not known

  • python读取hdfs上的parquet文件方式

    在使用python做大数据和机器学习处理过程中,首先需要读取hdfs数据,对于常用格式数据一般比较容易读取,parquet略微特殊.从hdfs上使用python获取parquet格式数据的方法(当然也可以先把文件拉到本地再读取也可以): 1.安装anaconda环境. 2.安装hdfs3. conda install hdfs3 3.安装fastparquet. conda install fastparquet 4.安装python-snappy. conda install python-s

  • Python服务器创建虚拟环境跑代码

    目录 一.前言 二.创建虚拟环境用来跑代码 一.前言 最近忙着在服务器上跑代码 学习积累了一些经验技巧 这里用来记录分享给大家 二.创建虚拟环境用来跑代码 下面我会以一个实例为模板,学习完之后,再删掉 不会占用大家的服务器 1.连接上服务器 比如我的连接方式为:ssh -p 你的编号 username-host 2.创建虚拟环境 这里我以一篇代码为例 创建虚拟环境 conda create -n KDDocRE python==3.7.4 3.进入虚拟环境 conda activate KDDo

  • Hadoop上Data Locality的详解

    Hadoop上Data Locality的详解 Hadoop上的Data Locality是指数据与Mapper任务运行时数据的距离接近程度(Data Locality in Hadoop refers to the"proximity" of the data with respect to the Mapper tasks working on the data.) 1. why data locality is imporant? 当数据集存储在HDFS中时,它被划分为块并存储在

  • Python保存MongoDB上的文件到本地的方法

    本文实例讲述了Python保存MongoDB上的文件到本地的方法.分享给大家供大家参考,具体如下: MongoDB上的文档通过GridFS来操作,Python也可以通过pymongo连接MongoDB数据库,使用pymongo模块的gridfs方法操作文档.以下示例是把MongoDB上GridFS存的excel文档保存到本地. from pymongo import MongoClient import gridfs client = MongoClient('mongodb://usernam

  • hadoop上传文件功能实例代码

    hdfs上的文件是手动执行命令从本地linux上传至hdfs的.在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐.那么,我们可以使用hdfs提供的Java api实现文件上传至hdfs,或者直接从ftp上传至hdfs. 然而,需要说明一点,之前笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行.像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR.其实,这个调度平台就是使用了quart

  • Python实现Windows上气泡提醒效果的方法

    本文实例讲述了Python实现Windows上气泡提醒效果的方法.分享给大家供大家参考.具体实现方法如下: # -*- encoding: gbk -*- import sys import os import struct import time import win32con from win32api import * # Try and use XP features, so we get alpha-blending etc. try: from winxpgui import * e

随机推荐