hadoop的wordcount实例代码

可以通过一个简单的例子来说明MapReduce到底是什么:

  我们要统计一个大文件中的各个单词出现的次数。由于文件太大。我们把这个文件切分成如果小文件,然后安排多个人去统计。这个过程就是”Map”。然后把每个人统计的数字合并起来,这个就是“Reduce"。

  上面的例子如果在MapReduce去做呢,就需要创建一个任务job,由job把文件切分成若干独立的数据块,并分布在不同的机器节点中。然后通过分散在不同节点中的Map任务以完全并行的方式进行处理。MapReduce会对Map的输出地行收集,再将结果输出送给Reduce进行下一步的处理。

  对于一个任务的具体执行过程,会有一个名为"JobTracker"的进程负责协调MapReduce执行过程中的所有任务。若干条TaskTracker进程用来运行单独的Map任务,并随时将任务的执行情况汇报给JobTracker。如果一个TaskTracker汇报任务失败或者长时间未对本身任务进行汇报,JobTracker会启动另外一个TaskTracker重新执行单独的Map任务。

下面的具体的代码实现:

1. 编写wordcount的相关job

(1)eclipse下创建相关maven项目,依赖jar包如下(也可参照hadoop源码包下的hadoop-mapreduce-examples项目的pom配置)

  注意:要配置一个maven插件maven-jar-plugin,并指定mainClass

<dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.11</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.5.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.5.2</version>
  </dependency>
 </dependencies>

 <build>
   <plugins>
     <plugin>
  <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-jar-plugin</artifactId>
   <configuration>
    <archive>
     <manifest>
      <mainClass>com.xxx.demo.hadoop.wordcount.WordCount</mainClass>
     </manifest>
    </archive>
   </configuration>
  </plugin>
   </plugins>
 </build>

(2)根据MapReduce的运行机制,一个job至少要编写三个类分别用来完成Map逻辑、Reduce逻辑、作业调度这三件事。

Map的代码可继承org.apache.hadoop.mapreduce.Mapper类

public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable>{

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
   //由于该例子未用到key的参数,所以该处key的类型就简单指定为Object
  public void map(Object key, Text value, Context context
          ) throws IOException, InterruptedException {
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
   }
  }
 }

Reduce的代码可继承org.apache.hadoop.mapreduce.Reducer类

public class IntSumReducer
    extends Reducer<Text,IntWritable,Text,IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values,
            Context context
            ) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   result.set(sum);
   context.write(key, result);
  }
 }

编写main方法进行作业调度

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  job.waitForCompletion(true) ;
  //System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

2. 上传数据文件到hadoop集群环境

执行mvn install把项目打成jar文件然后上传到linux集群环境,使用hdfs dfs -mkdir命令在hdfs文件系统中创建相应的命令,使用hdfs dfs -put 把需要处理的数据文件上传到hdfs系统中,示例:hdfs dfs -put ${linux_path/数据文件} ${hdfs_path}

3. 执行job

在集群环境中执行命令: hadoop jar ${linux_path}/wordcount.jar ${hdfs_input_path} ${hdfs_output_path}

4. 查看统计结果

hdfs dfs -cat ${hdfs_output_path}/输出文件名

以上的方式在未启动hadoop集群环境时,是以Local模式运行,此时HDFS和YARN都不起作用。下面是在伪分布式模式下执行mapreduce job时需要做的工作,先把官网上列的步骤摘录出来:

配置主机名

# vi /etc/sysconfig/network

例如:

NETWORKING=yes
HOSTNAME=master

vi /etc/hosts

填入以下内容

127.0.0.1 localhost

配置ssh免密码互通

ssh-keygen -t rsa
# cat?~/.ssh/id_rsa.pub?>>?~/.ssh/authorized_keys

配置core-site.xml文件(位于${HADOOP_HOME}/etc/hadoop/

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

配置hdfs-site.xml文件

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

下面的命令可以在单机伪分布模式下运行mapreduce的job

1.Format the filesystem:
$ bin/hdfs namenode -format
2.Start NameNode daemon and DataNode daemon:
$ sbin/start-dfs.sh
3.The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).

4.Browse the web interface for the NameNode; by default it is available at:
NameNode - http://localhost:50070/
Make the HDFS directories required to execute MapReduce jobs:
$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/<username>
5.Copy the input files into the distributed filesystem:
$ bin/hdfs dfs -put etc/hadoop input
6.Run some of the examples provided:
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'
7.Examine the output files:
Copy the output files from the distributed filesystem to the local filesystem and examine them:

$ bin/hdfs dfs -get output output
$ cat output/*
or

View the output files on the distributed filesystem:

$ bin/hdfs dfs -cat output/*
8.When you're done, stop the daemons with:
$ sbin/stop-dfs.sh

总结

以上就是本文关于hadoop的wordcount实例代码的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!

(0)

相关推荐

  • Hadoop 2.x与3.x 22点比较,Hadoop 3.x比2.x的改进

    问题导读 1.Hadoop3.x通过什么方式来容错? 2.Hadoop3.x存储开销减少了多少? 3.Hadoop3.x MR API是否兼容hadoop1.x? 一.目的 在这篇文章中,我们将讨论Hadoop 2.x与Hadoop 3.x之间的比较. Hadoop3版本中添加了哪些新功能,Hadoop3中兼容的Hadoop 2程序,Hadoop 2和Hadoop 3有什么区别? 二.Hadoop 2.x与Hadoop 3.x比较 本节将讲述Hadoop 2.x与Hadoop 3.x之间的22个

  • Hadoop之NameNode Federation图文详解

    一. 前言 1.NameNode架构的局限性 (1)Namespace(命名空间)的限制 由于NameNode在内存中存储所有的元数据(metadata),因此单个NameNode所能存储的对象(文件+块)数目受到NameNode所在JVM的heap size的限制.50G的heap能够存储20亿(200million)个对象,这20亿个对象支持4000个DataNode,12PB的存储(假设文件平均大小为40MB).随着数据的飞速增长,存储的需求也随之增长.单个DataNode从4T增长到36

  • Hadoop 2.X新特性回收站功能的讲解

    开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除.备份等作用. 1.回收站参数设置及工作机制 2.启用回收站 修改core-site.xml,配置垃圾回收时间为1分钟. <property> <name>fs.trash.interval</name> <value>1</value> </property> 3.查看回收站 回收站在集群中的路径:/user/用户名/.Trash/-. 4.修改访问垃圾回收

  • ubuntu docker搭建Hadoop集群环境的方法

    spark要配合Hadoop的hdfs使用,然而Hadoop的特点就是分布式,在一台主机上搭建集群有点困难,百度后发现可以使用docker构建搭建,于是开搞: github项目:https://github.com/kiwenlau/hadoop-cluster-docker 参考文章://www.jb51.net/article/109698.htm docker安装 文章中安装的是docker.io 但是我推荐安装docker-ce,docker.io版本太老了,步骤如下: 1.国际惯例更新

  • Hadoop计数器的应用以及数据清洗

    数据清洗(ETL) 在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据.清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序. 1.需求 去除日志中字段长度小于等于11的日志. (1)输入数据 web.log (2)期望输出数据 每行字段长度都大于11 2.需求分析 需要在Map阶段对输入的数据根据规则进行过滤清洗. 3.实现代码 (1)编写LogMapper类 package com.atguigu.mapreduce.weblog;

  • 在CentOS中搭建Hadoop的详细步骤

    搭建说明:第一次搭建 Hadoop 的小伙伴,请严格按照文章中的软件环境和步骤搭建,不一样的版本都可能会导致问题. 软件环境: 虚拟机:VMware Pro14 Linux:CentOS-6.4(下载地址,下载DVD版本即可) JDK:OpenJDK1.8.0 (强力建议不要使用 Oracle 公司的 Linux 版本的 JDK) Hadoop:2.6.5(下载地址) 虚拟机的安装和Linux系统的安装这里就省略了,可以参照网上的教程安装,一般没什么大问题,需要注意的是记住这里你输入的用户密码,

  • Hadoop中namenode和secondarynamenode工作机制讲解

    1)流程 2)FSImage和Edits nodenode是HDFS的大脑,它维护着整个文件系统的目录树,以及目录树里所有的文件和目录,这些信息以俩种文件存储在文件系统:一种是命名空间镜像(也称为文件系统镜像,File System Image,FSImage),即HDFS元数据的完整快照,每次NameNode启动的时候,默认会加载最新的命名空间镜像,另一种是命令空间镜像的编辑日志(Edit log). FSImage文件其实是文件系统元数据的一个永久性检查点,但并非每一个写操作都会更新这个文件

  • Java/Web调用Hadoop进行MapReduce示例代码

    Hadoop环境搭建详见此文章http://www.jb51.net/article/33649.htm. 我们已经知道Hadoop能够通过Hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让Java/Web来调用它?使得用户可以用方便的方式上传文件到Hadoop并进行处理,获得结果.首先,***.jar是一个Hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它.input 和outpu

  • ubantu 16.4下Hadoop完全分布式搭建实战教程

    前言 本文主要介绍了关于ubantu 16.4 Hadoop完全分布式搭建的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧 一个虚拟机 1.以  NAT网卡模式   装载虚拟机 2.最好将几个用到的虚拟机修改主机名,静态IP     /etc/network/interface,这里 是 s101 s102  s103 三台主机 ubantu,改/etc/hostname文件 3.安装ssh 在第一台主机那里s101 创建公私密匙   ssh-keygen -t rsa

  • hadoop的wordcount实例代码

    可以通过一个简单的例子来说明MapReduce到底是什么: 我们要统计一个大文件中的各个单词出现的次数.由于文件太大.我们把这个文件切分成如果小文件,然后安排多个人去统计.这个过程就是"Map".然后把每个人统计的数字合并起来,这个就是"Reduce". 上面的例子如果在MapReduce去做呢,就需要创建一个任务job,由job把文件切分成若干独立的数据块,并分布在不同的机器节点中.然后通过分散在不同节点中的Map任务以完全并行的方式进行处理.MapReduce会

  • Java执行hadoop的基本操作实例代码

    Java执行hadoop的基本操作实例代码 向HDFS上传本地文件 public static void uploadInputFile(String localFile) throws IOException{ Configuration conf = new Configuration(); String hdfsPath = "hdfs://localhost:9000/"; String hdfsInput = "hdfs://localhost:9000/user/

  • hadoop上传文件功能实例代码

    hdfs上的文件是手动执行命令从本地linux上传至hdfs的.在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐.那么,我们可以使用hdfs提供的Java api实现文件上传至hdfs,或者直接从ftp上传至hdfs. 然而,需要说明一点,之前笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行.像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR.其实,这个调度平台就是使用了quart

  • python导出hive数据表的schema实例代码

    本文研究的主要问题是python语言导出hive数据表的schema,分享了实现代码,具体如下. 为了避免运营提出无穷无尽的查询需求,我们决定将有查询价值的数据从mysql导入hive中,让他们使用HUE这个开源工具进行查询.想必他们对表结构不甚了解,还需要为之提供一个表结构说明,于是编写了一个脚本,从hive数据库中将每张表的字段即类型查询出来,代码如下: #coding=utf-8 import pyhs2 from xlwt import * hiveconn = pyhs2.connec

  • HDFS的Java API的访问方式实例代码

    本文研究的主要是HDFS的Java API的访问方式,具体代码如下所示,有详细注释. 最近的节奏有点儿快,等有空的时候把这个封装一下 实现代码 要导入的包: import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation

  • java网络爬虫连接超时解决实例代码

    本文研究的主要是java网络爬虫连接超时的问题,具体如下. 在网络爬虫中,经常会遇到如下报错.即连接超时.针对此问题,一般解决思路为:将连接时间.请求时间设置长一下.如果出现连接超时的情况,则在重新请求[设置重新请求次数]. Exception in thread "main" java.net.ConnectException: Connection timed out: connect 下面的代码便是使用httpclient解决连接超时的样例程序.直接上程序. package da

  • Python统计文本词汇出现次数的实例代码

    问题描述 有时在遇到一个文本需要统计文本内词汇的次数 的时候 ,可以用一个简单的python程序来实现. 解决方案 首先需要的是一个文本文件(.txt)格式(文本内词汇以空格分隔),因为需要的是一个程序,所以要考虑如何将文件打开而不是采用复制粘贴的方式.这时就要用到open()的方式来打开文档,然后通过read()读取其中内容,再将词汇作为key,出现次数作为values存入字典. 图 1 txt文件内容 再通过open和read函数来读取文件: open_file=open("text.txt

  • Angularjs2不同组件间的通信实例代码

    AngualrJs2官方方法是以@Input,@Output来实现组件间的相互传值,而且组件之间必须父子关系,下面给大家提供一个简单的方法,实现组件间的传值,不仅仅是父子组件,跨模块的组件也可以实现传值 /** *1.定义一个服务,作为传递参数的媒介 */ @Injectable() export class PrepService{ //定义一个属性,作为组件之间的传递参数,也可以是一个对象或方法 profileInfo: any; } /** *2.传递参数的组件,我这边简单演示,直接就在构

  • Angular动态添加、删除输入框并计算值实例代码

    Angular动态添加.删除输入框并计算值实例代码 摘要: 在学习群中交流时,有人分享了一个动态添加输入框的方法,我在其基础上进行了一些改进 这个功能本身并不复杂,但还是要注意,每个ng-model的对象必须是不同的,这样才能把它们分隔开. 下面是完整代码: JS: angular.module("myApp",[]) .controller("inputController",function($scope){ $scope.items=[]; //初始化数组,以

  • 正则表达式匹配(URL、电话、手机、邮箱)的实例代码

    正则表达式,又称规则表达式.(英语:Regular Expression,在代码中常简写为regex.regexp或RE),计算机科学的一个概念.正则表通常被用来检索.替换那些符合某个模式(规则)的文本.下面通过实例代码给大家介绍正则表达式匹配(URL.电话.手机.邮箱)的实例代码,一起看看吧! 废话不多说了,直接给大家贴代码了,具体代码如下所示: <!DOCTYPE html> <html lang="en"> <head> <meta ch

随机推荐