Java/Web调用Hadoop进行MapReduce示例代码

Hadoop环境搭建详见此文章http://www.jb51.net/article/33649.htm。

我们已经知道Hadoop能够通过Hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让Java/Web来调用它?使得用户可以用方便的方式上传文件到Hadoop并进行处理,获得结果。首先,***.jar是一个Hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它。input 和output则将用户上传的文件使用Hadoop的JavaAPI put到Hadoop的文件系统中。然后再通过Hadoop的JavaAPI 从文件系统中取得结果文件。

搭建JavaWeb工程。本文使用Spring、SpringMVC、MyBatis框架, 当然,这不是重点,就算没有使用任何框架也能实现。

项目框架如下:

项目中使用到的jar包如下:

在Spring的配置文件中,加入

<bean id="multipartResolver" class="org.springframework.web.multipart.commons.CommonsMultipartResolver">
   <property name="defaultEncoding" value="utf-8" />
   <property name="maxUploadSize" value="10485760000" />
   <property name="maxInMemorySize" value="40960" />
</bean>

使得项目支持文件上传。

新建一个login.jsp 点击登录后进入user/login

user/login中处理登录,登录成功后,【在Hadoop文件系统中创建用户文件夹】,然后跳转到console.jsp

package com.chenjie.controller; 

import java.io.IOException; 

import javax.annotation.Resource; 

import javax.servlet.http.HttpServletRequest; 

import javax.servlet.http.HttpServletResponse; 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.fs.FileSystem; 

import org.apache.hadoop.fs.Path; 

import org.springframework.stereotype.Controller; 

import org.springframework.web.bind.annotation.RequestMapping; 

import com.chenjie.pojo.JsonResult; 

import com.chenjie.pojo.User; 

import com.chenjie.service.UserService; 

import com.chenjie.util.AppConfig; 

import com.google.gson.Gson;
/** 

 * 用户请求控制器 

 * 

 * @author Chen 

 * 

 */ 

@Controller 

// 声明当前类为控制器 

@RequestMapping("/user") 

// 声明当前类的路径 

public class UserController { 

  @Resource(name = "userService") 

  private UserService userService;// 由Spring容器注入一个UserService实例
  /** 

   * 登录 

   * 

   * @param user 

   *      用户 

   * @param request 

   * @param response 

   * @throws IOException 

   */ 

  @RequestMapping("/login") 

  // 声明当前方法的路径 

  public String login(User user, HttpServletRequest request, 

      HttpServletResponse response) throws IOException { 

    response.setContentType("application/json");// 设置响应内容格式为json 

    User result = userService.login(user);// 调用UserService的登录方法 

    request.getSession().setAttribute("user", result); 

    if (result != null) { 

      createHadoopFSFolder(result); 

      return "console"; 

    } 

    return "login"; 

  } 

  public void createHadoopFSFolder(User user) throws IOException { 

    Configuration conf = new Configuration(); 

    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml")); 

    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); 

    FileSystem fileSystem = FileSystem.get(conf); 

    System.out.println(fileSystem.getUri()); 

    Path file = new Path("/user/" + user.getU_username()); 

    if (fileSystem.exists(file)) { 

      System.out.println("haddop hdfs user foler exists."); 

      fileSystem.delete(file, true); 

      System.out.println("haddop hdfs user foler delete success."); 

    } 

    fileSystem.mkdirs(file); 

    System.out.println("haddop hdfs user foler creat success."); 

  }
}

console.jsp中进行文件上传和任务提交、

文件上传和任务提交:

package com.chenjie.controller; 

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; 

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.multipart.commons.CommonsMultipartResolver; 

import com.chenjie.pojo.User;
import com.chenjie.util.Utils; 

@Controller
// 声明当前类为控制器
@RequestMapping("/hadoop")
// 声明当前类的路径
public class HadoopController { 

  @RequestMapping("/upload")
  // 声明当前方法的路径
  //文件上传
  public String upload(HttpServletRequest request,
      HttpServletResponse response) throws IOException {
    List<String> fileList = (List<String>) request.getSession()
        .getAttribute("fileList");//得到用户已上传文件列表
    if (fileList == null)
      fileList = new ArrayList<String>();//如果文件列表为空,则新建
    User user = (User) request.getSession().getAttribute("user");
    if (user == null)
      return "login";//如果用户未登录,则跳转登录页面
    CommonsMultipartResolver multipartResolver = new CommonsMultipartResolver(
        request.getSession().getServletContext());//得到在Spring配置文件中注入的文件上传组件
    if (multipartResolver.isMultipart(request)) {//如果请求是文件请求
      MultipartHttpServletRequest multiRequest = (MultipartHttpServletRequest) request; 

      Iterator<String> iter = multiRequest.getFileNames();//得到文件名迭代器
      while (iter.hasNext()) {
        MultipartFile file = multiRequest.getFile((String) iter.next());
        if (file != null) {
          String fileName = file.getOriginalFilename();
          File folder = new File("/home/chenjie/CJHadoopOnline/"
              + user.getU_username());
          if (!folder.exists()) {
            folder.mkdir();//如果文件不目录存在,则在服务器本地创建
          }
          String path = "/home/chenjie/CJHadoopOnline/"
              + user.getU_username() + "/" + fileName; 

          File localFile = new File(path); 

          file.transferTo(localFile);//将上传文件拷贝到服务器本地目录
          // fileList.add(path);
        }
        handleUploadFiles(user, fileList);//处理上传文件
      } 

    }
    request.getSession().setAttribute("fileList", fileList);//将上传文件列表保存在Session中
    return "console";//返回console.jsp继续上传文件
  } 

  @RequestMapping("/wordcount")
  //调用Hadoop进行mapreduce
  public void wordcount(HttpServletRequest request,
      HttpServletResponse response) {
    System.out.println("进入controller wordcount ");
    User user = (User) request.getSession().getAttribute("user");
    System.out.println(user);
    // if(user == null)
    // return "login";
    WordCount c = new WordCount();//新建单词统计任务
    String username = user.getU_username();
    String input = "hdfs://chenjie-virtual-machine:9000/user/" + username
        + "/wordcountinput";//指定Hadoop文件系统的输入文件夹
    String output = "hdfs://chenjie-virtual-machine:9000/user/" + username
        + "/wordcountoutput";//指定Hadoop文件系统的输出文件夹
    String reslt = output + "/part-r-00000";//默认输出文件
    try {
      Thread.sleep(3*1000);
      c.main(new String[] { input, output });//调用单词统计任务
      Configuration conf = new Configuration();//新建Hadoop配置
      conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加Hadoop配置,找到Hadoop部署信息
      conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//Hadoop配置,找到文件系统 

      FileSystem fileSystem = FileSystem.get(conf);//得打文件系统
      Path file = new Path(reslt);//找到输出结果文件
      FSDataInputStream inStream = fileSystem.open(file);//打开
      URI uri = file.toUri();//得到输出文件路径
      System.out.println(uri);
      String data = null;
      while ((data = inStream.readLine()) != null) {
        //System.out.println(data);
        response.getOutputStream().println(data);//讲结果文件写回用户网页
      }
//     InputStream in = fileSystem.open(file);
//     OutputStream out = new FileOutputStream("result.txt");
//     IOUtils.copyBytes(in, out, 4096, true);
      inStream.close();
    } catch (Exception e) {
      System.err.println(e.getMessage());
    }
  } 

  @RequestMapping("/MapReduceStates")
  //得到MapReduce的状态
  public void mapreduce(HttpServletRequest request,
      HttpServletResponse response) {
    float[] progress=new float[2];
    try {
      Configuration conf1=new Configuration();
      conf1.set("mapred.job.tracker", Utils.JOBTRACKER); 

      JobStatus jobStatus = Utils.getJobStatus(conf1);
//     while(!jobStatus.isJobComplete()){
//       progress = Utils.getMapReduceProgess(jobStatus);
//       response.getOutputStream().println("map:" + progress[0] + "reduce:" + progress[1]);
//       Thread.sleep(1000);
//     }
      JobConf jc = new JobConf(conf1); 

      JobClient jobClient = new JobClient(jc);
      JobStatus[] jobsStatus = jobClient.getAllJobs();
      //这样就得到了一个JobStatus数组,随便取出一个元素取名叫jobStatus
      jobStatus = jobsStatus[0];
      JobID jobID = jobStatus.getJobID(); //通过JobStatus获取JobID
      RunningJob runningJob = jobClient.getJob(jobID); //通过JobID得到RunningJob对象
      runningJob.getJobState();//可以获取作业状态,状态有五种,为JobStatus.Failed 、JobStatus.KILLED、JobStatus.PREP、JobStatus.RUNNING、JobStatus.SUCCEEDED
      jobStatus.getUsername();//可以获取运行作业的用户名。
      runningJob.getJobName();//可以获取作业名。
      jobStatus.getStartTime();//可以获取作业的开始时间,为UTC毫秒数。
      float map = runningJob.mapProgress();//可以获取Map阶段完成的比例,0~1,
      System.out.println("map=" + map);
      float reduce = runningJob.reduceProgress();//可以获取Reduce阶段完成的比例。
      System.out.println("reduce="+reduce);
      runningJob.getFailureInfo();//可以获取失败信息。
      runningJob.getCounters();//可以获取作业相关的计数器,计数器的内容和作业监控页面上看到的计数器的值一样。  

    } catch (IOException e) {
      progress[0] = 0;
      progress[1] = 0;
    } 

    request.getSession().setAttribute("map", progress[0]);
    request.getSession().setAttribute("reduce", progress[1]);
  } 

  //处理文件上传
  public void handleUploadFiles(User user, List<String> fileList) {
    File folder = new File("/home/chenjie/CJHadoopOnline/"
        + user.getU_username());
    if (!folder.exists())
      return;
    if (folder.isDirectory()) {
      File[] files = folder.listFiles();
      for (File file : files) {
        System.out.println(file.getName());
        try {
          putFileToHadoopFSFolder(user, file, fileList);//将单个文件上传到Hadoop文件系统
        } catch (IOException e) {
          System.err.println(e.getMessage());
        }
      }
    }
  } 

  //将单个文件上传到Hadoop文件系统
  private void putFileToHadoopFSFolder(User user, File file,
      List<String> fileList) throws IOException {
    Configuration conf = new Configuration();
    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/core-site.xml"));
    conf.addResource(new Path("/opt/hadoop-1.2.1/conf/hdfs-site.xml")); 

    FileSystem fileSystem = FileSystem.get(conf);
    System.out.println(fileSystem.getUri()); 

    Path localFile = new Path(file.getAbsolutePath());
    Path foler = new Path("/user/" + user.getU_username()
        + "/wordcountinput");
    if (!fileSystem.exists(foler)) {
      fileSystem.mkdirs(foler);
    } 

    Path hadoopFile = new Path("/user/" + user.getU_username()
        + "/wordcountinput/" + file.getName());
//   if (fileSystem.exists(hadoopFile)) {
//     System.out.println("File exists.");
//   } else {
//     fileSystem.mkdirs(hadoopFile);
//   }
    fileSystem.copyFromLocalFile(true, true, localFile, hadoopFile);
    fileList.add(hadoopFile.toUri().toString()); 

  } 

}

启动Hadoop:

运行结果:

可以在任意平台下,登录该项目地址,上传文件,得到结果。

运行成功。

源代码:https://github.com/tudoupaisimalingshu/CJHadoopOnline

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • java连接hdfs ha和调用mapreduce jar示例

    Java API 连接 HDFS HA 复制代码 代码如下: public static void main(String[] args) {  Configuration conf = new Configuration();  conf.set("fs.defaultFS", "hdfs://hadoop2cluster");  conf.set("dfs.nameservices", "hadoop2cluster");

  • Java函数式编程(七):MapReduce

    译注:map(映射)和reduce(归约,化简)是数学上两个很基础的概念,它们很早就出现在各类的函数编程语言里了,直到2003年Google将其发扬光大,运用到分布式系统中进行并行计算后,这个组合的名字才开始在计算机界大放异彩(那些函数式粉可能并不这么认为).本文我们会看到Java 8在摇身一变支持函数式编程后,map和reduce组合的首次亮相(这里只是初步介绍,后续还会有针对它们的专题). 对集合进行归约 现在为止我们已经介绍了几个操作集合的新技巧了:查找匹配元素,查找单个元素,集合转化.这

  • java 矩阵乘法的mapreduce程序实现

    java 矩阵乘法的mapreduce程序实现 map函数:对于矩阵M中的每个元素m(ij),产生一系列的key-value对<(i,k),(M,j,m(ij))> 其中k=1,2.....知道矩阵N的总列数;对于矩阵N中的每个元素n(jk),产生一系列的key-value对<(i , k) , (N , j ,n(jk)>, 其中i=1,2.......直到i=1,2.......直到矩阵M的总列数. map package com.cb.matrix; import stati

  • Java/Web调用Hadoop进行MapReduce示例代码

    Hadoop环境搭建详见此文章http://www.jb51.net/article/33649.htm. 我们已经知道Hadoop能够通过Hadoop jar ***.jar input output的形式通过命令行来调用,那么如何将其封装成一个服务,让Java/Web来调用它?使得用户可以用方便的方式上传文件到Hadoop并进行处理,获得结果.首先,***.jar是一个Hadoop任务类的封装,我们可以在没有jar的情况下运行该类的main方法,将必要的参数传递给它.input 和outpu

  • Java定时调用.ktr文件的示例代码(解决方案)

    1.Maven依赖 <!-- Kettle --> <dependency> <groupId>pentaho-kettle</groupId> <artifactId>kettle-core</artifactId> <version>7.1.0.0-12</version> </dependency> <dependency> <groupId>pentaho-kettl

  • 3分钟纯 Java 注解搭个管理系统的示例代码

    最近接触到个新项目,发现它用了一个比较有意思的框架,可以说实现了我刚入行时候的梦想,所以这里马不停蹄的和大家分享下. 在我刚开始工作接触的项目都还没做前后端分离,经常需要后端来维护页面,有时候觉得自己好像天生不适合干前端,你要是让我研究研究后端的技术,看个中间件源码啊,分析分析什么框架底层原理啊,这都问题不大,偶尔搞一下JS也可以.你要是让我写个css样式,那简直要命了,一点也提不起兴趣,不知道有没有跟我一样的. 今天要介绍的框架直接不用写页面了,话不多说,下边咱们直奔主题 Erupt一个通用后

  • Java Web 简单的分页显示实例代码

    本文通过两个方法:(1)计算总的页数. (2)查询指定页数据,实现简单的分页效果. 思路:首先得在 DAO 对象中提供分页查询的方法,在控制层调用该方法查到指定页的数据,在表示层通过 EL 表达式和 JSTL 将该页数据显示出来. 先给大家展示下效果图: 题外话:该分页显示是用 "表示层-控制层-DAO层-数据库"的设计思想实现的,有什么需要改进的地方大家提出来,共同学习进步.废话不多说了,开始进入主题,详细步骤如下所示: 1.DAO层-数据库 JDBCUtils 类用于打开和关闭数据

  • Java多线程编程实现socket通信示例代码

    流传于网络上有关Java多线程通信的编程实例有很多,这一篇还算比较不错,代码可用.下面看看具体内容. TCP是Tranfer Control Protocol的 简称,是一种面向连接的保证可靠传输的协议.通过TCP协议传输,得到的是一个顺序的无差错的数据流.发送方和接收方的成对的两个socket之间必须建 立连接,以便在TCP协议的基础上进行通信,当一个socket(通常都是server socket)等待建立连接时,另一个socket可以要求进行连接,一旦这两个socket连接起来,它们就可以

  • Java异常退出条件的判断示例代码

    无论是功能性代码还是算法性代码,程序都是一系列流程的合集 既然是流程就分为:一般流程和异常流程: 一般流程保证了基本功能: 异常流程则是对程序稳定性的保证,不能因为一些非法输入,项目就挂了: 注意,布尔表达式的先后顺序,有时不可以交换 if (null == instance || instance.isEmpty()) 0. 常见异常退出条件 参数为空: 表示长度,表示索引的整型为负数,或者超出待索引数组或容器的范围: 1. String 的 startsWith 函数 首先来看 String

  • Java 使用 FFmpeg 处理视频文件示例代码详解

    目前在公司做一个小东西,里面用到了 FFmpeg 简单处理音视频,感觉功能特别强大,在做之前我写了一个小例子,现在记录一下分享给大家,希望大家遇到这个问题知道解决方案. FFmpeg是一套可以用来记录.转换数字音频.视频,并能将其转化为流的开源计算机程序.采用LGPL或GPL许可证.它提供了录制.转换以及流化音视频的完整解决方案.它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移植性和编解码质量,libavcodec里很多code都是从头开发的. FFmpeg在Linux平

  • 使用Java对Hbase操作总结及示例代码

    前面已经给大家讲解过如何使用Hbase建表,以及基本的操作和一些常用shell命令,今天就给大家介绍下如何使用java对Hbase进行各种操作. 没印象的话可以再去浏览下: Hbase入门教程,shell命令大全讲解 Java操作Hbase主要方法: 1.Configuration 在使用Java API时,Client端需要知道HBase的配置环境,如存储地址,zookeeper等信息. 这些信息通过Configuration对象来封装,可通过如下代码构建该对象: Configuration

  • Java实现NIO聊天室的示例代码(群聊+私聊)

    功能介绍 功能:群聊+私发+上线提醒+下线提醒+查询在线用户 文件 Utils 需要用maven导入下面两个包 <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> </dependency> <dependency> <group

  • java异常级别与捕获的示例代码

    这个是jdk 1.8文档中异常的种族描述 可以看出Java异常体系的根类是 Throwable, 当在java代码中写throw抛出异常时,后面跟的对象必然是Throwable或其子类的对象. 然后往下又分为 Exception(异常) 和 Error 其中Exception异常是指一些可以恢复的异常, 例如常见的NullPointerException空指针异常. Error指的是一些致命的错误,无法通过程序代码手段恢复的异常,例如OutOfMemoryError内存溢出错误. 如果能善用异常

随机推荐