基于Hadoop实现Knn算法

Knn算法的核心思想是如果一个样本在特征空间中的K个最相邻的样本中的大多数属于某一个类别,则该样本也属于这个类别,并具有这个类别上样本的特性。该方法在确定分类决策上只依据最邻近的一个或者几个样本的类别来决定待分样本所属的类别。Knn方法在类别决策时,只与极少量的相邻样本有关。由于Knn方法主要靠周围有限的邻近的样本,而不是靠判别类域的方法来确定所属类别的,因此对于类域的交叉或重叠较多的待分样本集来说,Knn方法较其他方法更为合适。

Knn算法流程如下:

1. 计算当前测试数据与训练数据中的每条数据的距离

2. 圈定距离最近的K个训练对象,作为测试对象的近邻

3. 计算这K个训练对象中出现最多的那个类别,并将这个类别作为当前测试数据的类别

以上流程是Knn的大致流程,按照这个流程实现的MR效率并不高,可以在这之上进行优化。在这里只写,跟着这个流程走的MR实现过程。

Mapper的设计:

由于测试数据相比于训练数据来说,会小很多,因此将测试数据用Java API读取,放到内存中。所以,在setup中需要对测试数据进行初始化。在map中,计算当前测试数据与每条训练数据的距离,Mapper的值类型为:<Object, Text, IntWritable,MyWritable>。map输出键类型为IntWritable,存放当前测试数据的下标,输出值类型为MyWritable,这是自定义值类型,其中存放的是距离以及与测试数据比较的训练数据的类别。

public class KnnMapper extends Mapper<Object, Text, IntWritable,MyWritable> {
 Logger log = LoggerFactory.getLogger(KnnMapper.class);
 private List<float[]> testData;
 @Override
 protected void setup(Context context)
 throws IOException, InterruptedException {
 // TODO Auto-generated method stub
 Configuration conf= context.getConfiguration();
 conf.set("fs.defaultFS", "master:8020");
 String testPath= conf.get("TestFilePath");
 Path testDataPath= new Path(testPath);
 FileSystem fs = FileSystem.get(conf);
 this.testData = readTestData(fs,testDataPath);
 }

 @Override
 protected void map(Object key, Text value, Context context)
 throws IOException, InterruptedException {
 // TODO Auto-generated method stub
 String[] line = value.toString().split(",");
 float[] trainData = new float[line.length-1];
 for(int i=0;i<trainData.length;i++){
 trainData[i] = Float.valueOf(line[i]);
 log.info("训练数据:"+line[i]+"类别:"+line[line.length-1]);
 }
 for(int i=0; i< this.testData.size();i++){
 float[] testI = this.testData.get(i);
 float distance = Outh(testI, trainData);
 log.info("距离:"+distance);
 context.write(new IntWritable(i), new MyWritable(distance, line[line.length-1]));
 }
 }

 private List<float[]> readTestData(FileSystem fs,Path Path) throws IOException {
 //补充代码完整
 FSDataInputStream data = fs.open(Path);
 BufferedReader bf = new BufferedReader(new InputStreamReader(data));
 String line = "";
 List<float[]> list = new ArrayList<>();
 while ((line = bf.readLine()) != null) {
 String[] items = line.split(",");
 float[] item = new float[items.length];
 for(int i=0;i<items.length;i++){
 item[i] = Float.valueOf(items[i]);
 }
 list.add(item);
 }
 return list;
 }
 // 计算欧式距离
 private static float Outh(float[] testData, float[] inData) {
 float distance =0.0f;
 for(int i=0;i<testData.length;i++){
 distance += (testData[i]-inData[i])*(testData[i]-inData[i]);
 }
 distance = (float)Math.sqrt(distance);
 return distance;
 }
}

自定义值类型MyWritable如下:

public class MyWritable implements Writable{
 private float distance;
 private String label;
 public MyWritable() {
 // TODO Auto-generated constructor stub
 }
 public MyWritable(float distance, String label){
 this.distance = distance;
 this.label = label;
 }
 @Override
 public String toString() {
 // TODO Auto-generated method stub
 return this.distance+","+this.label;
 }
 @Override
 public void write(DataOutput out) throws IOException {
 // TODO Auto-generated method stub
 out.writeFloat(distance);
 out.writeUTF(label);
 }
 @Override
 public void readFields(DataInput in) throws IOException {
 // TODO Auto-generated method stub
 this.distance = in.readFloat();
 this.label = in.readUTF();

 }
 public float getDistance() {
 return distance;
 }

 public void setDistance(float distance) {
 this.distance = distance;
 }

 public String getLabel() {
 return label;
 }

 public void setLabel(String label) {
 this.label = label;
 }

}

在Reducer端中,需要初始化参数K,也就是圈定距离最近的K个对象的K值。在reduce中需要对距离按照从小到大的距离排序,然后选取前K条数据,再计算这K条数据中,出现次数最多的那个类别并将这个类别与测试数据的下标相对应并以K,V的形式输出到HDFS上。

public class KnnReducer extends Reducer<IntWritable, MyWritable, IntWritable, Text> {
 private int K;
 @Override
 protected void setup(Context context)
 throws IOException, InterruptedException {
 // TODO Auto-generated method stub
 this.K = context.getConfiguration().getInt("K", 5);
 }
 @Override
 /***
 * key => 0
 * values =>([1,lable1],[2,lable2],[3,label2],[2.5,lable2])
 */
 protected void reduce(IntWritable key, Iterable<MyWritable> values,
 Context context) throws IOException, InterruptedException {
 // TODO Auto-generated method stub
 MyWritable[] mywrit = new MyWritable[K];
 for(int i=0;i<K;i++){
 mywrit[i] = new MyWritable(Float.MAX_VALUE, "-1");
 }
 // 找出距离最小的前k个
 for (MyWritable m : values) {
 float distance = m.getDistance();
 String label = m.getLabel();
 for(MyWritable m1: mywrit){
 if (distance < m1.getDistance()){
  m1.setDistance(distance);
  m1.setLabel(label);
 }
 }
 }
 // 找出前k个中,出现次数最多的类别
 String[] testClass = new String[K];
 for(int i=0;i<K;i++){
 testClass[i] = mywrit[i].getLabel();
 }
 String countMost = mostEle(testClass);
 context.write(key, new Text(countMost));
 }
 public static String mostEle(String[] strArray) {
  HashMap<String, Integer> map = new HashMap<>();
  for (int i = 0; i < strArray.length; i++) {
 String str = strArray[i];
   if (map.containsKey(str)) {
 int tmp = map.get(str);
 map.put(str, tmp+1);
 }else{
 map.put(str, 1);
 }
 }
  // 得到hashmap中值最大的键,也就是出现次数最多的类别
  Collection<Integer> count = map.values();
  int maxCount = Collections.max(count);
  String maxString = "";
  for(Map.Entry<String, Integer> entry: map.entrySet()){
   if (maxCount == entry.getValue()) {
 maxString = entry.getKey();
 }
  }
  return maxString;
 }
}

最后输出结果如下:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Hadoop计数器的应用以及数据清洗

    数据清洗(ETL) 在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据.清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序. 1.需求 去除日志中字段长度小于等于11的日志. (1)输入数据 web.log (2)期望输出数据 每行字段长度都大于11 2.需求分析 需要在Map阶段对输入的数据根据规则进行过滤清洗. 3.实现代码 (1)编写LogMapper类 package com.atguigu.mapreduce.weblog;

  • Hadoop 2.x与3.x 22点比较,Hadoop 3.x比2.x的改进

    问题导读 1.Hadoop3.x通过什么方式来容错? 2.Hadoop3.x存储开销减少了多少? 3.Hadoop3.x MR API是否兼容hadoop1.x? 一.目的 在这篇文章中,我们将讨论Hadoop 2.x与Hadoop 3.x之间的比较. Hadoop3版本中添加了哪些新功能,Hadoop3中兼容的Hadoop 2程序,Hadoop 2和Hadoop 3有什么区别? 二.Hadoop 2.x与Hadoop 3.x比较 本节将讲述Hadoop 2.x与Hadoop 3.x之间的22个

  • 在CentOS中搭建Hadoop的详细步骤

    搭建说明:第一次搭建 Hadoop 的小伙伴,请严格按照文章中的软件环境和步骤搭建,不一样的版本都可能会导致问题. 软件环境: 虚拟机:VMware Pro14 Linux:CentOS-6.4(下载地址,下载DVD版本即可) JDK:OpenJDK1.8.0 (强力建议不要使用 Oracle 公司的 Linux 版本的 JDK) Hadoop:2.6.5(下载地址) 虚拟机的安装和Linux系统的安装这里就省略了,可以参照网上的教程安装,一般没什么大问题,需要注意的是记住这里你输入的用户密码,

  • Hadoop之NameNode Federation图文详解

    一. 前言 1.NameNode架构的局限性 (1)Namespace(命名空间)的限制 由于NameNode在内存中存储所有的元数据(metadata),因此单个NameNode所能存储的对象(文件+块)数目受到NameNode所在JVM的heap size的限制.50G的heap能够存储20亿(200million)个对象,这20亿个对象支持4000个DataNode,12PB的存储(假设文件平均大小为40MB).随着数据的飞速增长,存储的需求也随之增长.单个DataNode从4T增长到36

  • Java源码解析HashMap的keySet()方法

    HashMap的keySet()方法比较简单,作用是获取HashMap中的key的集合.虽然这个方法十分简单,似乎没有什么可供分析的,但真正看了源码,发现自己还是有很多不懂的地方.下面是keySet的代码. public Set<K> keySet() { Set<K> ks = keySet; if (ks == null) { ks = new KeySet(); keySet = ks; } return ks; } 从代码中了解到,第一次调用keySet方法时,keySet

  • Java线程公平锁和非公平锁的差异讲解

    公平锁,顾名思义,它是公平的,可以保证获取锁的线程按照先来后到的顺序,获取到锁. 非公平锁,顾名思义,各个线程获取到锁的顺序,不一定和它们申请的先后顺序一致,有可能后来的线程,反而先获取到了锁. 在实现上,公平锁在进行lock时,首先会进行tryAcquire()操作.在tryAcquire中,会判断等待队列中是否已经有别的线程在等待了.如果队列中已经有别的线程了,则tryAcquire失败,则将自己加入队列.如果队列中没有别的线程,则进行获取锁的操作. /** * Fair version o

  • Java反射机制的精髓讲解

    1,什么是反射? java的反射,允许程序在运行时,创建一个对象,获取一个类的所有相关信息等. 2,Class类 要了解反射,就绕不开Class类. 我们平时开发的类,例如ClassA,一般会有一些属性,会有几个构造方法,也会有一些普通方法,我们还可以使用ClassA来创建对象,例如ClassA classA = new ClassA(). java程序在运行时,其实是很多类的很多个对象之间的协作.jvm如何管理这些类呢?它如何知道各个类的名称,每个类都有哪些属性和哪些方法呢? jvm会给每个类

  • Java可重入锁的实现原理与应用场景

    可重入锁,从字面来理解,就是可以重复进入的锁. 可重入锁,也叫做递归锁,指的是同一线程外层函数获得锁之后,内层递归函数仍然有获取该锁的代码,但不受影响. 在JAVA环境下ReentrantLock和synchronized都是可重入锁. synchronized是一个可重入锁.在一个类中,如果synchronized方法1调用了synchronized方法2,方法2是可以正常执行的,这说明synchronized是可重入锁.否则,在执行方法2想获取锁的时候,该锁已经在执行方法1时获取了,那么方法

  • ubantu 16.4下Hadoop完全分布式搭建实战教程

    前言 本文主要介绍了关于ubantu 16.4 Hadoop完全分布式搭建的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧 一个虚拟机 1.以  NAT网卡模式   装载虚拟机 2.最好将几个用到的虚拟机修改主机名,静态IP     /etc/network/interface,这里 是 s101 s102  s103 三台主机 ubantu,改/etc/hostname文件 3.安装ssh 在第一台主机那里s101 创建公私密匙   ssh-keygen -t rsa

  • Java分布式锁的概念与实现方式详解

    什么是分布式锁?在回答这个问题之前,我们先回答一下什么是锁. 普通的锁,即在单机多线程环境下,当多个线程需要访问同一个变量或代码片段时,被访问的变量或代码片段叫做临界区域,我们需要控制线程一个一个的顺序执行,否则会出现并发问题. 如何控制呢?就是设置一个各个线程都能看的见的标志.然后,每个线程想访问临界区域时,都要先查看标志,如果标志没有被占用,则说明目前没有线程在访问临界区域.如果标志被占用了,则说明目前有线程正在访问临界区域,则当前线程需要等待. 这个标志,就是锁. 在单机多线程的java程

随机推荐