MapTask阶段shuffle源码分析

1. 收集阶段

Mapper中,调用context.write(key,value)实际是调用代理NewOutPutCollectorwirte方法

public void write(KEYOUT key, VALUEOUT value
          ) throws IOException, InterruptedException {
  output.write(key, value);
 }

实际调用的是MapOutPutBuffercollect(),在进行收集前,调用partitioner来计算每个key-value的分区号

@Override
  public void write(K key, V value) throws IOException, InterruptedException {
   collector.collect(key, value,
            partitioner.getPartition(key, value, partitions));
  }

2. NewOutPutCollector对象的创建

@SuppressWarnings("unchecked")
  NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
            JobConf job,
            TaskUmbilicalProtocol umbilical,
            TaskReporter reporter
            ) throws IOException, ClassNotFoundException {
  // 创建实际用来收集key-value的缓存区对象
   collector = createSortingCollector(job, reporter);
  // 获取总的分区个数
   partitions = jobContext.getNumReduceTasks();
   if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
     ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
   } else {
    // 默认情况,直接创建一个匿名内部类,所有的key-value都分配到0号分区
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
     @Override
     public int getPartition(K key, V value, int numPartitions) {
      return partitions - 1;
     }
    };
   }
  }

3. 创建环形缓冲区对象

@SuppressWarnings("unchecked")
 private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
     createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
   new MapOutputCollector.Context(this, job, reporter);
  // 从当前Job的配置中,获取mapreduce.job.map.output.collector.class,如果没有设置,使用MapOutputBuffer.class
  Class<?>[] collectorClasses = job.getClasses(
   JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
   try {
    if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
     throw new IOException("Invalid output collector class: " + clazz.getName() +
      " (does not implement MapOutputCollector)");
    }
    Class<? extends MapOutputCollector> subclazz =
     clazz.asSubclass(MapOutputCollector.class);
    LOG.debug("Trying map output collector class: " + subclazz.getName());
   // 创建缓冲区对象
    MapOutputCollector<KEY, VALUE> collector =
     ReflectionUtils.newInstance(subclazz, job);
   // 创建完缓冲区对象后,执行初始化
    collector.init(context);
    LOG.info("Map output collector class = " + collector.getClass().getName());
    return collector;
   } catch (Exception e) {
    String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
    if (--remainingCollectors > 0) {
     msg += " (" + remainingCollectors + " more collector(s) to try)";
    }
    lastException = e;
    LOG.warn(msg, e);
   }
  }
  throw new IOException("Initialization of all the collectors failed. " +
   "Error in last collector was :" + lastException.getMessage(), lastException);
 }

3. MapOutPutBuffer的初始化   环形缓冲区对象

@SuppressWarnings("unchecked")
  public void init(MapOutputCollector.Context context
          ) throws IOException, ClassNotFoundException {
   job = context.getJobConf();
   reporter = context.getReporter();
   mapTask = context.getMapTask();
   mapOutputFile = mapTask.getMapOutputFile();
   sortPhase = mapTask.getSortPhase();
   spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
   // 获取分区总个数,取决于ReduceTask的数量
   partitions = job.getNumReduceTasks();
   rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
   //sanity checks
   // 从当前配置中,获取mapreduce.map.sort.spill.percent,如果没有设置,就是0.8
   final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
   // 获取mapreduce.task.io.sort.mb,如果没设置,就是100MB
   final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
   indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
   if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
      "\": " + spillper);
   }
   if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
      "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
   }
// 在溢写前,对key-value排序,采用的排序器,使用快速排序,只排索引
   sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
      QuickSort.class, IndexedSorter.class), job);
   // buffers and accounting
   int maxMemUsage = sortmb << 20;
   maxMemUsage -= maxMemUsage % METASIZE;
   // 存放key-value
   kvbuffer = new byte[maxMemUsage];
   bufvoid = kvbuffer.length;
  // 存储key-value的属性信息,分区号,索引等
   kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
   setEquator(0);
   bufstart = bufend = bufindex = equator;
   kvstart = kvend = kvindex;
   maxRec = kvmeta.capacity() / NMETA;
   softLimit = (int)(kvbuffer.length * spillper);
   bufferRemaining = softLimit;
   LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
   LOG.info("soft limit at " + softLimit);
   LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
   LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
   // k/v serialization
    // 获取快速排序的Key的比较器,排序只按照key进行排序!
   comparator = job.getOutputKeyComparator();
  // 获取key-value的序列化器
   keyClass = (Class<K>)job.getMapOutputKeyClass();
   valClass = (Class<V>)job.getMapOutputValueClass();
   serializationFactory = new SerializationFactory(job);
   keySerializer = serializationFactory.getSerializer(keyClass);
   keySerializer.open(bb);
   valSerializer = serializationFactory.getSerializer(valClass);
   valSerializer.open(bb);
   // output counters
   mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
   mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
   fileOutputByteCounter = reporter
     .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
   // 溢写到磁盘,可以使用一个压缩格式! 获取指定的压缩编解码器
   // compression
   if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
     job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
   } else {
    codec = null;
   }
   // 获取Combiner组件
   // combiner
   final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
   combinerRunner = CombinerRunner.create(job, getTaskID(),
                       combineInputCounter,
                       reporter, null);
   if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
     reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
   } else {
    combineCollector = null;
   }
   spillInProgress = false;
   minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
   // 设置溢写线程在后台运行,溢写是在后台运行另外一个溢写线程!和收集是两个线程!
   spillThread.setDaemon(true);
   spillThread.setName("SpillThread");
   spillLock.lock();
   try {
   // 启动线程
    spillThread.start();
    while (!spillThreadRunning) {
     spillDone.await();
    }
   } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
   } finally {
    spillLock.unlock();
   }
   if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
      sortSpillException);
   }
  }

4. Paritionner的获取

从配置中读取mapreduce.job.partitioner.class,如果没有指定,采用HashPartitioner.class

如果reduceTask > 1, 还没有设置分区组件,使用HashPartitioner

@SuppressWarnings("unchecked")
 public Class<? extends Partitioner<?,?>> getPartitionerClass()
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
   conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
 }
public class HashPartitioner<K, V> extends Partitioner<K, V> {
 /** Use {@link Object#hashCode()} to partition. **/
 public int getPartition(K key, V value,
             int numReduceTasks) {
  return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
 }
}

分区号的限制:0 <= 分区号 < 总的分区数(reduceTask的个数)

if (partition < 0 || partition >= partitions) {
    throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
   }

5.MapTask shuffle的流程

①在map()调用context.write()

②调用MapoutPutBuffer的collect()

  • 调用分区组件Partitionner计算当前这组key-value的分区号

③将当前key-value收集到MapOutPutBuffer中

  • 如果超过溢写的阀值,在后台启动溢写线程,来进行溢写!

④溢写前,先根据分区号,将相同分区号的key-value,采用快速排序算法,进行排序!

  • 排序并不在内存中移动key-value,而是记录排序后key-value的有序索引!

⑤ 开始溢写,按照排序后有序的索引,将文件写入到一个临时的溢写文件中

  • 如果没有定义Combiner,直接溢写!
  • 如果定义了Combiner,使用CombinerRunner.conbine()对key-value处理后再次溢写!

⑥多次溢写后,每次溢写都会产生一个临时文件

⑦最后,执行一次flush(),将剩余的key-value进行溢写

⑧MergeParts: 将多次溢写的结果,保存为一个总的文件!

  • 在合并为一个总的文件前,会执行归并排序,保证合并后的文件,各个分区也是有序的!
  • 如果定义了Conbiner,Conbiner会再次运行(前提是溢写的文件个数大于3)!
  • 否则,就直接溢写!

⑨最终保证生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据

6. Combiner

combiner其实就是Reducer类型:

Class<? extends Reducer<K,V,K,V>> cls =
    (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

Combiner的运行时机:

MapTask:

  • ①每次溢写前,如果指定了Combiner,会运行
  • ②将多个溢写片段,进行合并为一个最终的文件时,也会运行Combiner,前提是片段数>=3

ReduceTask:

③reduceTask在运行时,需要启动shuffle进程拷贝MapTask产生的数据!

  • 数据在copy后,进入shuffle工作的内存,在内存中进行merge和sort!
  • 数据过多,内部不够,将部分数据溢写在磁盘!
  • 如果有溢写的过程,那么combiner会再次运行!

①一定会运行,②,③需要条件!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。如果你想了解更多相关内容请查看下面相关链接

(0)

相关推荐

  • MapReduce核心思想图文详解

    MapReduce核心编程思想,如图1-1所示. 图1-1 MapReduce核心编程思想 1)分布式的运算程序往往需要分成至少2个阶段. 2)第一个阶段的MapTask并发实例,完全并行运行,互不相干. 3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出. 4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行. 小结:分析WordC

  • Java源码解析HashMap简介

    本文基于jdk1.8进行分析 HashMap是java开发中可以说必然会用到的一个集合.本文就HashMap的源码实现进行分析. 首先看一下源码中类的javadoc注释对HashMap的解释.如下图.HashMap是对Map接口的基于hash表的实现.这个实现提供了map的所有可选操作,并且允许null值(可以多个)和一个null的key(仅限一个).HashMap和HashTable十分相似,除了HashMap是非同步的且允许null元素.这个类不保证map里的顺序,更进一步,随着时间的推移,

  • Java源码解析HashMap的resize函数

    HashMap的resize函数,用于对HashMap初始化或者扩容. 首先看一下该函数的注释,如下图.从注释中可以看到,该函数的作用是初始化或者使table的size翻倍.如果table是null,那么就申请空间进行初始化.否则,因为我们在使用2的指数的扩张,在原来table的每个位置的元素,在新的table中,他们要么待在原来的位置,要么移动2的指数的偏移.从这里可以看出,扩容前table每个位置上如果有多个元素,元素之间组成链表时,在扩容后,该链表中的元素,有一部分会待在原地,剩下的元素会

  • MapTask工作机制图文详解

    MapTask工作机制如图所示.  (1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value. (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value. (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果.在该函数内部,它会将生成的key/value分区(调用

  • shuffle的关键阶段sort(Map端和Reduce端)源码分析

    源码中有这样一段代码 1. Map端排序获取的比较器 public RawComparator getOutputKeyComparator() { // 获取mapreduce.job.output.key.comparator.class,必须是RawComparator类型,如果没设置,是null Class<? extends RawComparator> theClass = getClass( JobContext.KEY_COMPARATOR, null, RawComparat

  • Java源码解析ConcurrentHashMap的初始化

    首先看一下代码 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 第一次检查 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt

  • Java源码解析HashMap成员变量

    本文基于jdk1.8进行分析 关于HashMap的简介,可以参考这篇文章https://www.jb51.net/article/154177.htm. 首先看一下HashMap的一些静态常量.第一个是DEFAULT_INITIAL_CAPACITY,默认初始大小,16.从注释中可以了解到,大小必须为2的指数.这里的16,采用的1左移4位实现.而"aka",是as known as的缩写. /** * The default initial capacity - MUST be a p

  • Java源码解析TreeMap简介

    TreeMap是常用的排序树,本文主要介绍TreeMap中,类的注释中对TreeMap的介绍.代码如下. /** * A Red-Black tree based {@link NavigableMap} implementation. * The map is sorted according to the {@linkplain Comparable natural * ordering} of its keys, or by a {@link Comparator} provided at

  • Java源码解析HashMap的keySet()方法

    HashMap的keySet()方法比较简单,作用是获取HashMap中的key的集合.虽然这个方法十分简单,似乎没有什么可供分析的,但真正看了源码,发现自己还是有很多不懂的地方.下面是keySet的代码. public Set<K> keySet() { Set<K> ks = keySet; if (ks == null) { ks = new KeySet(); keySet = ks; } return ks; } 从代码中了解到,第一次调用keySet方法时,keySet

  • Java源码解析HashMap的tableSizeFor函数

    aka,HashMap的容量大小必须为2的指数,即16,32,64,128这样的值.那么,在构造函数中,如果调用者指定了HashMap的初始大小不是2的指数,那么,HashMap的tableSizeFor函数,会计算一个大于或等于给定参数的2的指数的值.先来看一下tableSizeFor函数的源码,如下 /** * Returns a power of two size for the given target capacity. **/ static final int tableSizeFo

随机推荐