基于MapReduce实现决策树算法

本文实例为大家分享了MapReduce实现决策树算法的具体代码,供大家参考,具体内容如下

首先,基于C45决策树算法实现对应的Mapper算子,相关的代码如下:

public class MapClass extends MapReduceBase implements Mapper {

  private final static IntWritable one = new IntWritable(1);
  private Text attValue = new Text();
  private int i;
  private String token;
  public static int no_Attr;
  public Split split = null;

  public int size_split_1 = 0;

  public void configure(JobConf conf){
   try {
  split = (Split) ObjectSerializable.unSerialize(conf.get("currentsplit"));
 } catch (ClassNotFoundException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 } catch (IOException e) {
  // TODO Auto-generated catch block
  e.printStackTrace();
 }
   size_split_1 = Integer.parseInt(conf.get("current_index"));
  }

  public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter)
      throws IOException {
    String line = value.toString(); // changing input instance value to
                    // string
    StringTokenizer itr = new StringTokenizer(line);
    int index = 0;
    String attr_value = null;
    no_Attr = itr.countTokens() - 1;
    String attr[] = new String[no_Attr];
    boolean match = true;
    for (i = 0; i < no_Attr; i++) {
      attr[i] = itr.nextToken(); // Finding the values of different
                    // attributes
    }

    String classLabel = itr.nextToken();
    int size_split = split.attr_index.size();
    Counter counter = reporter.getCounter("reporter-"+Main.current_index, size_split+" "+size_split_1);
    counter.increment(1l);
    for (int count = 0; count < size_split; count++) {
      index = (Integer) split.attr_index.get(count);
      attr_value = (String) split.attr_value.get(count);
      if (!attr[index].equals(attr_value)) {
        match = false;
        break;
      }
    }

    if (match) {
      for (int l = 0; l < no_Attr; l++) {
        if (!split.attr_index.contains(l)) {
         //表示出某个属性在某个类标签上出现了一次
          token = l + " " + attr[l] + " " + classLabel;
          attValue.set(token);
          output.collect(attValue, one);
        }
        else{

        }
      }
      if (size_split == no_Attr) {
        token = no_Attr + " " + "null" + " " + classLabel;
        attValue.set(token);
        output.collect(attValue, one);
      }
    }
  }

}

然后,基于C45决策树算法实现对应的Reducer算子,相关的代码如下:

public class Reduce extends MapReduceBase implements Reducer {

  static int cnt = 0;
  ArrayList ar = new ArrayList();
  String data = null;
  private static int currentIndex;

  public void configure(JobConf conf) {
    currentIndex = Integer.valueOf(conf.get("currentIndex"));
  }

  public void reduce(Text key, Iterator values, OutputCollector output,
      Reporter reporter) throws IOException {
    int sum = 0;
    //sum表示按照某个属性进行划分的子数据集上的某个类出现的个数
    while (values.hasNext()) {
      sum += values.next().get();
    }
    //最后将这个属性上的取值写入output中;
    output.collect(key, new IntWritable(sum));

    String data = key + " " + sum;
    ar.add(data);
    //将最终结果写入到文件中;
    writeToFile(ar);
    ar.add("\n");
  }

  public static void writeToFile(ArrayList text) {
    try {
      cnt++;
      Path input = new Path("C45/intermediate" + currentIndex + ".txt");
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(input, true)));

      for (String str : text) {
        bw.write(str);
      }
      bw.newLine();
      bw.close();
    } catch (Exception e) {
      System.out.println("File is not creating in reduce");
    }
  }
}

最后,编写Main函数,启动MapReduce作业,需要启动多趟,代码如下:

package com.hackecho.hadoop;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;
import org.dmg.pmml.MiningFunctionType;
import org.dmg.pmml.Node;
import org.dmg.pmml.PMML;
import org.dmg.pmml.TreeModel;

//在这里MapReduce的作用就是根据各个属性的特征来划分子数据集
public class Main extends Configured implements Tool {

 //当前分裂
  public static Split currentsplit = new Split();
  //已经分裂完成的集合
  public static List splitted = new ArrayList();
  //current_index 表示目前进行分裂的位置
  public static int current_index = 0;

  public static ArrayList ar = new ArrayList();

  public static List leafSplits = new ArrayList();

  public static final String PROJECT_HOME = System.getProperty("user.dir");

  public static void main(String[] args) throws Exception {
   //在splitted中已经放入了一个currentsplit了,所以此时的splitted的size大小为1
   PropertyConfigurator.configure(PROJECT_HOME + "/conf/log/log4j.properties");
    splitted.add(currentsplit);

    Path c45 = new Path("C45");
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(c45)) {
      fs.delete(c45, true);
    }
    fs.mkdirs(c45);
    int res = 0;
    int split_index = 0;
    //增益率
    double gainratio = 0;
    //最佳增益
    double best_gainratio = 0;
    //熵值
    double entropy = 0;
    //分类标签
    String classLabel = null;
    //属性个数
    int total_attributes = MapClass.no_Attr;
    total_attributes = 4;
    //分裂的个数
    int split_size = splitted.size();
    //增益率
    GainRatio gainObj;
    //产生分裂的新节点
    Split newnode;

    while (split_size > current_index) {
     currentsplit = splitted.get(current_index);
      gainObj = new GainRatio();
      res = ToolRunner.run(new Configuration(), new Main(), args);
      System.out.println("Current NODE INDEX . ::" + current_index);
      int j = 0;
      int temp_size;
      gainObj.getcount();
      //计算当前节点的信息熵
      entropy = gainObj.currNodeEntophy();
      //获取在当前节点的分类
      classLabel = gainObj.majorityLabel();
      currentsplit.classLabel = classLabel;

      if (entropy != 0.0 && currentsplit.attr_index.size() != total_attributes) {
        System.out.println("");
        System.out.println("Entropy NOTT zero  SPLIT INDEX::  " + entropy);
        best_gainratio = 0;
        //计算各个属性的信息增益值
        for (j = 0; j < total_attributes; j++) // Finding the gain of
                            // each attribute
        {
          if (!currentsplit.attr_index.contains(j)) {
           //按照每一个属性的序号,也就是索引j来计算各个属性的信息增益
            gainratio = gainObj.gainratio(j, entropy);
            //找出最佳的信息增益
            if (gainratio >= best_gainratio) {
              split_index = j;
              best_gainratio = gainratio;
            }
          }
        }

        //split_index表示在第几个属性上完成了分裂,也就是分裂的索引值;
        //attr_values_split表示分裂的属性所取的值的拼接成的字符串;
        String attr_values_split = gainObj.getvalues(split_index);
        StringTokenizer attrs = new StringTokenizer(attr_values_split);
        int number_splits = attrs.countTokens(); // number of splits
                             // possible with
                             // attribute selected
        String red = "";
        System.out.println(" INDEX :: " + split_index);
        System.out.println(" SPLITTING VALUES " + attr_values_split);

        //根据分裂形成的属性值的集合将在某个节点上按照属性值将数据集分成若干类
        for (int splitnumber = 1; splitnumber <= number_splits; splitnumber++) {
          temp_size = currentsplit.attr_index.size();
          newnode = new Split();
          for (int y = 0; y < temp_size; y++) {
            newnode.attr_index.add(currentsplit.attr_index.get(y));
            newnode.attr_value.add(currentsplit.attr_value.get(y));
          }
          red = attrs.nextToken();

          newnode.attr_index.add(split_index);
          newnode.attr_value.add(red);
          //按照当前的属性值将数据集将若干分类,同时将数据集按照这个属性划分位若干个新的分裂;
          splitted.add(newnode);
        }
      } else if(entropy==0.0 && currentsplit.attr_index.size()!=total_attributes){
       //每次计算到叶子节点的时候,就将其持久化到模型文件中
       /**
        String rule = "";
        temp_size = currentsplit.attr_index.size();
        for (int val = 0; val < temp_size; val++) {
          rule = rule + " " + currentsplit.attr_index.get(val) + " " + currentsplit.attr_value.get(val);
        }
        rule = rule + " " + currentsplit.classLabel;
        ar.add(rule);
        writeRuleToFile(ar);
        ar.add("\n");
        if (entropy != 0.0) {
          System.out.println("Enter rule in file:: " + rule);
        } else {
          System.out.println("Enter rule in file Entropy zero ::  " + rule);
        }
        System.out.println("persistence model@!!!!");
        */
       leafSplits.add(currentsplit);
      }
      else{
       TreeModel tree = PmmlDecisionTree.buildTreeModel(leafSplits);
       PMML pmml = new PMML();
       pmml.addModels(tree);
       PmmlModelFactory.pmmlPersistence("C45/DecisionTree.pmml", pmml);
      }
      split_size = splitted.size();
      System.out.println("TOTAL NODES::  " + split_size);
      current_index++;
    }
    System.out.println("Done!");
    System.exit(res);
  }

  public static void writeRuleToFile(ArrayList text) throws IOException {
   Path rule = new Path("C45/rule.txt");
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
      BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(rule, true)));
      for (String str : text) {
        bw.write(str);
      }
      bw.newLine();
      bw.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public int run(String[] args) throws Exception {
    System.out.println("In main ---- run");
    JobConf conf = new JobConf(getConf(), Main.class);
    conf.setJobName("C45");
    conf.set("currentsplit",ObjectSerializable.serialize(currentsplit));
    conf.set("current_index",String.valueOf(currentsplit.attr_index.size()));
    conf.set("currentIndex", String.valueOf(current_index));

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setReducerClass(Reduce.class);
    System.out.println("back to run");

    FileSystem fs = FileSystem.get(conf);

    Path out = new Path(args[1] + current_index);
    if (fs.exists(out)) {
      fs.delete(out, true);
    }
    FileInputFormat.setInputPaths(conf, args[0]);
    FileOutputFormat.setOutputPath(conf, out);

    JobClient.runJob(conf);
    return 0;
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java实现的决策树算法完整实例

    本文实例讲述了Java实现的决策树算法.分享给大家供大家参考,具体如下: 决策树算法是一种逼近离散函数值的方法.它是一种典型的分类方法,首先对数据进行处理,利用归纳算法生成可读的规则和决策树,然后使用决策对新数据进行分析.本质上决策树是通过一系列规则对数据进行分类的过程. 决策树构造可以分两步进行.第一步,决策树的生成:由训练样本集生成决策树的过程.一般情况下,训练样本数据集是根据实际需要有历史的.有一定综合程度的,用于数据分析处理的数据集.第二步,决策树的剪枝:决策树的剪枝是对上一阶段生成的决

  • 基于MapReduce实现决策树算法

    本文实例为大家分享了MapReduce实现决策树算法的具体代码,供大家参考,具体内容如下 首先,基于C45决策树算法实现对应的Mapper算子,相关的代码如下: public class MapClass extends MapReduceBase implements Mapper { private final static IntWritable one = new IntWritable(1); private Text attValue = new Text(); private in

  • Python机器学习应用之基于决策树算法的分类预测篇

    目录 一.决策树的特点 1.优点 2.缺点 二.决策树的适用场景 三.demo 一.决策树的特点 1.优点 具有很好的解释性,模型可以生成可以理解的规则. 可以发现特征的重要程度. 模型的计算复杂度较低. 2.缺点 模型容易过拟合,需要采用减枝技术处理. 不能很好利用连续型特征. 预测能力有限,无法达到其他强监督模型效果. 方差较高,数据分布的轻微改变很容易造成树结构完全不同. 二.决策树的适用场景 决策树模型多用于处理自变量与因变量是非线性的关系. 梯度提升树(GBDT),XGBoost以及L

  • Python机器学习之决策树算法实例详解

    本文实例讲述了Python机器学习之决策树算法.分享给大家供大家参考,具体如下: 决策树学习是应用最广泛的归纳推理算法之一,是一种逼近离散值目标函数的方法,在这种方法中学习到的函数被表示为一棵决策树.决策树可以使用不熟悉的数据集合,并从中提取出一系列规则,机器学习算法最终将使用这些从数据集中创造的规则.决策树的优点为:计算复杂度不高,输出结果易于理解,对中间值的缺失不敏感,可以处理不相关特征数据.缺点为:可能产生过度匹配的问题.决策树适于处理离散型和连续型的数据. 在决策树中最重要的就是如何选取

  • python代码实现ID3决策树算法

    本文实例为大家分享了python实现ID3决策树算法的具体代码,供大家参考,具体内容如下 ''''' Created on Jan 30, 2015 @author: 史帅 ''' from math import log import operator import re def fileToDataSet(fileName): ''''' 此方法功能是:从文件中读取样本集数据,样本数据的格式为:数据以空白字符分割,最后一列为类标签 参数: fileName:存放样本集数据的文件路径 返回值:

  • Python机器学习之决策树算法

    一.决策树原理 决策树是用样本的属性作为结点,用属性的取值作为分支的树结构. 决策树的根结点是所有样本中信息量最大的属性.树的中间结点是该结点为根的子树所包含的样本子集中信息量最大的属性.决策树的叶结点是样本的类别值.决策树是一种知识表示形式,它是对所有样本数据的高度概括决策树能准确地识别所有样本的类别,也能有效地识别新样本的类别. 决策树算法ID3的基本思想: 首先找出最有判别力的属性,把样例分成多个子集,每个子集又选择最有判别力的属性进行划分,一直进行到所有子集仅包含同一类型的数据为止.最后

  • python实现ID3决策树算法

    ID3决策树是以信息增益作为决策标准的一种贪心决策树算法 # -*- coding: utf-8 -*- from numpy import * import math import copy import cPickle as pickle class ID3DTree(object): def __init__(self): # 构造方法 self.tree = {} # 生成树 self.dataSet = [] # 数据集 self.labels = [] # 标签集 # 数据导入函数

  • Python3.0 实现决策树算法的流程

    决策树的一般流程 检测数据集中的每个子项是否属于同一个分类 if so return 类标签 Else 寻找划分数据集的最好特征 划分数据集 创建分支 节点 from math import log import operator #生成样本数据集 def createDataSet(): dataSet = [[1,1,'yes'], [1,1,'yes'], [1,0,'no'], [0,1,'no'], [0,1,'no']] labels = ['no surfacing','flipp

  • Python实现CART决策树算法及详细注释

    目录 一.CART决策树算法简介 二.基尼系数 三.CART决策树生成算法 四.CART算法的Python实现 五.运行结果 一.CART决策树算法简介 CART(Classification And Regression Trees 分类回归树)算法是一种树构建算法,既可以用于分类任务,又可以用于回归.相比于 ID3 和 C4.5 只能用于离散型数据且只能用于分类任务,CART 算法的适用面要广得多,既可用于离散型数据,又可以处理连续型数据,并且分类和回归任务都能处理. 本文仅讨论基本的CAR

  • python实现C4.5决策树算法

    C4.5算法使用信息增益率来代替ID3的信息增益进行特征的选择,克服了信息增益选择特征时偏向于特征值个数较多的不足.信息增益率的定义如下: # -*- coding: utf-8 -*- from numpy import * import math import copy import cPickle as pickle class C45DTree(object): def __init__(self): # 构造方法 self.tree = {} # 生成树 self.dataSet =

随机推荐