Hadoop-MapReduce

基础

MapReduce 是一种用于数据处理的编程模型,其本质是并行运行,因此可以将大规模的数据分析任务分发给任何一个拥有足够多机器的数据中心,当然其优势也是处理大规模数据集

MapReduce 任务过程分为两个处理阶段: map 阶段reduce 阶段。每个阶段都是以键值对作为输入和输出,其类型由开发者决定,当然 map 函数和 reduce 函数也是由开发者实现。

MapReduce 原理

MapReduce 作业(job是客户端执行工作的单元,包含:输入数据MapReduce 程序配置信息Hadoop 将作业分成若干个任务(task来执行,其中包含两类任务: map 任务reduce 任务,这些任务运行在集群的节点上,并通过 YARN 进行调度,其中如果有一个任务失败,则将在另一个不同的节点上重新调度运行。

MapReduce 的输入数据会被划分为等长的小数据块,称为输入分片(input split或者简称分片,而每一个分片会构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录。
有许多分片就意味着每个分片处理所需时间少于整个输入数据所需时间;另外如果分片切分得太小,那管理分片得事件和构建 map 任务的事件将组成作业的整体运行事件。
如果 map 任务运行在存储有输入数据(HDFS 中的数据)的节点上,无需使用带宽资源即可获得最佳性能,也就是数据本地化优化(data locality optimization

map 任务将其输出写入本地磁盘(此输出为中间结果),该中间结果由 reduce 任务处理后才会产生最终输出结果,随着作业完成,map 的结果也会被随之删除。

reduce 任务的输入通常来自于所有 map 任务的输出,其并不具备数据本地化的优势,且 reduce 任务的数量是独立指定的不由输入数据的大小决定。如果存在多个 reduce 任务,那每个 map 任务就会针对输出进行分区,即为每个 reduce 任务创建一个分区,每个分区都有许多键及其对应的值,但是每个键对应的键值对记录都在同一分区中。
分区可以由用户定义的分区函数控制,但通常采用默认的 partitioner 通过哈希函数来区分。

map 任务和 reduce 任务之间的排序和分组,该部分也被称为 shuffle (混洗) ,每个 reduce 任务的输入都来自于所有 map 任务的输出,另外 shuffle 一般比图中的更加复杂,而且调整混洗参数对作业总执行时间的影响非常大。
当数据完全并行时(即无需混洗)可能会出现无 reduce 任务的情况。

总结 map 阶段的输入是 NCDC 原始数据,键是某一行起始位置相对于文件起始位置的偏移量,而值是文本文件的每一行;reduce 阶段的输入则是将 map 阶段的输出结果经由 shuffle 排序和分组之后的数据。

combiner 函数

集群上可用的带宽限制了 MapReduce 作业的数量,因此尽量避免 map 任务和 reduce 任务之间的数据传输是有利的;为此 Hadoop 允许用户指对 map 任务的输出指定一个 combiner,该 combiner 函数的输出将作为 reduce 任务的输入。
由于 combiner 属于优化方案,因此无法确定要对一个指定的 map 任务输出记录调用多少次 combiner


经典示例

WordCount 单词计数是最简单也是最能体现 MapReduce 思想的示例程序之一,该程序完整的代码可以在 Hadoop 安装包的 src/examples 目录下找到。其主要功能是:统计一系列文本文件中每个单词出现的次数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

/**
* 继承 Mapper 类,实现 map 功能
*
* key 表示每一行的起始位置(偏移量offset)
* value 表示每一行的文本内容
* context.key 表示每一行中的每个单词
* context.value 表示每一行中的每个单词的出现次数,固定值为1
*/
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

// map 功能必须实现的函数
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

/**
* 继承 Reducer 类,实现 reduce 功能
*
* key 表示每一行中的每个单词
* values 表示每一行中的每个单词的出现次数,固定值为1
* context.key 表示每一行中的每个单词
* context.value 表示每一行中的每个单词的出现次数之和
*/
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
// 初始化 Configuration,读取 mapreduce 系统配置信息
Configuration conf = new Configuration();

// 构建 Job 并且加载计算程序 WordCount.class
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);

//指定 Mapper、Combiner、Reducer,也就是继承实现的类
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

// 设置输入输出数据
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
  1. Job 初始化
    main 函数构建 Configuration 对象设置系统配置信息,接着 Job 自定义实例并设置启动类。

  2. 设置 Jobmap(拆分)、 combiner(中间结果处理)、 reduce(合并)
    设置 Job 的相关 map 类、reduce 类、combiner 类。

    • mapMapper 类共有四个泛型,分别是 KEYINVALUEINKEYOUTVALUEOUT,前面两个 KEYINVALUEIN 指的是 map 函数输入参数的键值对的类型;后面两个KEYOUTVALUEOUT 指的是 map 函数输出的键值对的类型。而这里的 map 函数中通过空格符号来分割文本内容,并对其进行记录。
    • reduceReducer 类也有四个泛型,分别指的是 reduce 函数输入的键值对类型(这里输入的键值对类型通常和 map 的输出键值对类型保持一致)和输出的键值对类型。而这里的 reduce 函数主要是将传入的键值对进行最后的合并统计,形成最后的统计结果。
  3. 设置 Job 的键值对类型
    设置 Job 输出结果 <key,value> 的中键值对数据类型,因为结果是<单词,个数>,所以 key 设置为 Text 类型相当于 String 类型。Value 设置为 IntWritable 相当于 int 类型。

  4. 设置 Job 的输入输出
    通过 setInputPathsetOutputPath 设置 Job 的输入输出路径。


工作机制

作业运行机制

在说明作业运行机制之前,可以通过 Job 对象的 submit() 方法来运行 MapReduce 作业,且其内部封装了大量的处理细节;也可以通过调用 waitForComplete() 方法用来提交之前没有提交过的任务,并等待它完成。

整体机制流程如下:

  • 客户端提交 MapReduce 作业。
  • YARN 资源管理器负责协调集群上计算机资源的分配。
  • YARN 节点管理器负责启动和监视集群中机器上的计算容器(container)。
  • MapReduceapplication master 负责协调运行 MapReduce 作业的任务。它和 MapReduce 任务在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理。
  • 分布式文件系统用来与其他实体间共享作业文件。

hadoop-2-1.jpg

提交作业

Jobsubmit() 方法创建一个内部的 JobSummiter 实例,并且调用其 submitJoobbInternal() 方法。提交作业后,waitForComplete() 每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功就显示作业计数器;如果失败则导致作业失败的错误被记录到控制台。
JobSummiter 所实现的作业提交过程如下:

  • 向资源管理器请求一个新应用 ID,用于 MapReduce 作业 ID
  • 检查作业的输出说明。
  • 计算作业的输入分片。
  • 将运行作业所需要的资源(包括作业 JAR 文件、配置文件和计算所得的输入分片)复制到一个以作业 ID 命名的目录下的共享文件系统中。作业 JAR 的副本较多(由 mapreduce.client.submit.file.replication 属性控制,默认值为 10),因此在运行作业的任务时,集群中有很多副本可供节点管理器访问。
  • 通过调用资源管理器的 submitApplication() 方法提交作业。
作业初始化

资源管理器收到调用它的 submitApplication() 消息后,便将请求传递给 YARN 调度器(scheduler 。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动 application master 的进程。

MapReduce 作业的 application master 是一个 Java 应用程序,它的主类是 MRAppMaster 。由于将接受来自任务的进度和完成报告,因此 application master 对作业的初始化是通过创建多个薄记对象以保持对作业进度的跟踪来完成的。接下来它接受来自共享文件系统、在客户端计算的输入分片,接着对每一个分片创建一个 map 任务对象以及由 mapreduce.job.reduces 属性(通过作业的 setNumReduceTasks() 方法设置)确定的多个 reduce 任务对象。任务 ID 也会在此时分配。

application master 必须决定如何运行构成 MapReduce 作业的各个任务。如果作业很小,并且 application master 判断在新的容器中分配和运行任务的开销大于并行运行的开销时,就选择和自己在同一个 JVM 上运行任务。这样的作业被称为 uberized,或者作为 uber 任务运行

如何判断作何很小呢?默认情况下,小作业就是少于 10mapper 且只有 1reduce 且输入大小小于一个 HDFS 块的作业(设置 mapreduce.job.ubertask.maxmapsmapreduce.job.ubertask.maxreducesmapreduce.job.ubertask.maxbytes 参数)。必须明确启用 Uber 任务(对于单个作业或者整个集群),具体方法是将 mapreduce.job.ubertask.enable 设置为 true

最后在运行任务前,application master 调用 setupJob() 方法设置 OutputCommitter。默认值为 FileOutputCommiter,表示将建立作业的最终输出目录及任务输出的临时工作空间。

分配任务

如果作业不适合作为 uber 任务运行,那么 application master 就会为该作业中的所有 map 任务和 reduce 任务向资源管理器请求容器。首先为 map 任务发出请求,该请求优先级要高于 reduce 任务的请求,这是因为所有的 map 任务必须在 reduce 的排序阶段能够启动前完成,直到有 5%map 任务已经完成时为 reduce 任务的请求才会发出。

reduce 任务能够在集群中任意位置运行,但是 map 任务的请求有着数据本地化局限,在理想的情况下,任务是数据本地化(data local)的,意味着任务在分片驻留的同一节点上运行。可选的情况下,任务可能时机架本地化(rack local)的,即和分片在同一机架而非同一节点上运行。同时也有一些任务既不是数据本地化,也不是机架本地化,它们会从别的机架上获取自己的数据。

请求也为任务指定了内存需求和 CPU 数。在默认情况下,每个 map 任务和 reduce 任务都分配到 1024MB 的内存和一个虚拟内核,这些值可以在每个作业的基础上进行配置,分别通过 4 个属性来设置 mapreduce.map.memory.mbmapreduce.map.cpu.vcoresmapreduce.reduce.memory.mbmapreduce.reduce.cpu.vcoresp.memory.mb

执行任务

一旦资源管理器的调度器为任务分配了一个特定节点上的容器,application master 就通过与节点管理器通信来启动容器。该任务由主类为 YarnChild 的一个 Java 应用程序执行,在它运行任务之前,需要首先将任务需要的资源本地化,包括作业的配置、JAR 文件和所有来自分布式缓存的文件。最后运行 map 任务或 reduce 任务。

YarnChild 在指定的 JVM 中运行,因此用户定义的 mapreduce 函数中的任何缺陷不会影响到节点管理器。

每个任务都能够执行搭建(setup)和提交(commit)动作,它们和任务本身在同一个 JVM 中运行,并由作业的 OutputCommitter 确定。对于基于文件的作业,提交动作将任务输出由临时位置迁移到最终位置。提交协议确保当推测执行(speculative execution)被启用时,只有一个任务副本被提交,其他的都被取消。

更新作业状态和进度

MapReduce 作业是长时间运行的批量作业,运行时间范围从数秒到数小时。一个作业和它的每个任务都有一个状态(status),包括:作业或任务的状态、mapreduce 的进度、作业计数器的值、状态消息或描述(可以由用户来设置)。
上述的状态信息在作业期间不断改变,那又是如何与客户端通信呢?任务在运行时,对其进度(progress,即任务完成百分比)保持跟踪。对 map 任务,任务进度是已处理输入所占比例。对 reduce 任务,情况有点复杂,整个过程分为三个步骤,与 shuffle 的三个阶段相对应,同时也会估计已处理 reduce 输入的比例。

MapReduce 中进度的组成如下:

  • 读入一条输入记录
  • 写入一条输出记录
  • 设置状态描述
  • 增加计数器的值
  • 调用 ReporterTaskAttemptContextprogress() 方法

任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么置于框架中,要么由用户自己定义。

map 任务或 reduce 任务运行时,子进程和自己的父 application master 通过 umbilical 接口通信。每个三秒钟,任务通过这个 umbilical 接口向自己的 application master 报告进度和状态(包括计数器),application master 会形成一个作业的汇聚视图(aggregate view)。

在作业期间,客户端每秒钟轮询一次 application master 以接收最新状态(轮询间隔通过 mapreduce.client.progressmonitor.pollinterval 参数设置)。客户端也可以使用 JobgetStatus() 方法得到一个 JobStatus 对象的实例,后者会包含作业的所有状态信息。

hadoop-2-2.jpg

作业完成

application master 收到作业最后一个任务已完成的通知后,便将作业的状态设置为“成功”。在 Job 轮询状态时,便知道任务已成功完成,于是 Job 打印一条消息告知用户,然后从 waitForComplete() 方法返回;Job 的统计信息和计数值在这个时候也会输出到控制台。

如果 application master 有相应的设置,也会发送一条 HTTP 作业通知,可以通过 mapreduce.job.end-notification.url 属性来设置在收到回调指令后通知客户端。
最后在作业完成时,application master 和任务容器清理其工作状态(中间状态会被删除),接着 OutputCommittercommitJob() 方法会被调用。作业信息由作业历史服务器存档,以便日后用户需要时可以查询。

shuffle

MapReduce 确保每个 reduce 的输入都是按键排序的,系统执行排序将 map 输出作为输入传给 reducer 的过程称为 shuffleshuffle 属于不断被优化和改进的部分,也是 MapReduce 的“心脏”,是奇迹发生的地方。

map

map 函数开始产生输出时,并不是简单地将数据写入磁盘,而是利用缓冲的方式写到内存并处于效率的考虑进行预排序。

hadoop-2-3.jpg

每个 map 任务都有一个环形内存缓冲区用于存储任务输出,一旦缓冲内容达到阈值(默认为 80%),此时一个后台线程便开始把内容溢出到磁盘。在溢出写到磁盘的过程中,map 输出继续写到缓冲区,但如果在此期间缓冲区被填满,map 会被阻塞直到写磁盘过程完成。溢出写过程按轮询方式将缓冲区内容写到 mapreduce.cluster.local.dir 属性在作业特定子目录下指定的目录中。
默认情况下,缓冲区大小为 100MB,此值可以通过 mapreduce.task.io.sort.mb 属性调整

如果 map 输出相当小,则输出会被直接复制到 reduce 任务的 JVM 内存中。指定用于此用途的堆空间的百分比大小可以由 mapreduce.reduce.shuffle.input.buffer.percent 属性控制,。

在写磁盘之前后台线程会根据数据最终要传递给的 reducer 将数据划分为相应的分区。在每个分区中后台线程会根据键进行排序,如果此时有一个 combiner 函数,在排序后运行此函数,此后会减少写到磁盘上的数据和传递给 reducer 的数据。

每次内存缓冲区达到溢出阈值,就会创建一个溢出文件(spill file),因此在 map 任务写完其最后一个输出记录之后就会有几个溢出文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。
属性 mapreduce.task.io.sort.factor 可以控制一次最多合并多少溢出文件,默认值为 10。如果最少存在 3 个溢出文件时,则 combiner 函数会在输出文件写到磁盘之前再次运行

在将压缩 map 输出写到磁盘的过程中是否能使用压缩呢?在默认情况下,输出是不压缩的,但可以通过属性 mapreduce.map.output.compress 设置为 true,即可启用此功能。而使用的压缩库则由 mapreduce.map.output.compress.codec 参数指定。

reducer 通过 HTTP 得到输出文件的分区。而用于文件分区的工作线程的数量由任务的 mapreduce.shuffle.max.threads 属性控制,此设置针对的是每一个节点管理器,而不是每个 map 任务。

reduce

map 输出文件位于运行 map 任务的 tasktracker 的本地磁盘,之后 tasktracker 会为分区文件运行 reduce 任务。那 reducer 如何知道从那台机器上获取 map 输出?map 任务成功后,会通过心跳机制通知他们的 application masterreducer 中的一个线程定期询问 master 以便获取 map 输出文件主机的位置,直到获取所有输出位置。

每个 map 任务的完成不尽相同,因此每个任务完成时,reduce 任务就开始复制其输出(此为 reduce 任务的复制阶段)。reduce 任务有少量复制线程,可以并行获取 map 输出,默认为 5 个线程,此默认值可以通过 mapreduce.reduce.shuffle.parallelcopies 属性修改。

复制完成所有的 map 输出之后,reduce 任务进行合并阶段,此阶段会合并 map 输出,维持其顺序排序。
默认值为 10,通过 mapreduce.task.io.sort.factor 属性控制。

在最后阶段,即直接将数据输入 reduce 函数,从而省略一次磁盘的往返行程,并没有将输出文件合并为一个已排序的文件作为最后一次。

reduce 阶段,对已排序输出中的每个键调用 reduce 函数,此阶段的输出直接写到输出文件系统,一般为 HDFS。如果采用 HDFS,由于节点管理器也运行着数据节点,则第一块数据副本将被写到本地磁盘。

配置调优
  1. map 端的调优属性

    属性名称 类型 默认值 说明
    mapreduce.task.io.sort.mb int 100 排序 map 输出时所使用的内存缓冲区的大小,以 MB 为单位
    mapreduce.map.sort.spill.percent float 0.80 map 输出内存缓存和用来开始磁盘溢出写过程的记录边界索引,使用比例的阈值
    mapreduce.task.io.sort.factor int 10 排序文件时,一次最多合并的流数。在 reduce 中使用。
    mapreduce.map.combine.minspills int 3 运行 combiner 所需的最少溢出文件数
    mapreduce.map.output.compress Boolean false 是否压缩 map 输出
    mapreduce.map.output.compress.codec Class name org.apache. hadoop.io.compress. DefaultCodec 用于 map 输出的压缩编解码器
    mapreduce.shuffle.max.threads int 0 每个节点管理器的工作线程数,用于将 map 输出到 reducer0 表示使用 Netty 默认值,两倍于可用的处理器数。

    总的原则是给 shuffle 过程尽可能多提供内存空间。但是 map 函数和 reduce 函数不能无限制使用内存,需要尽可能少用内存。
    运行 map 任务和 reduce 任务的 JVMmapred.child.java.opts 属性设置内存大小。
    map 端通过避免多次溢出写磁盘来获得最佳性能,一次最好;而在 reduce 端,中间数据全部驻留在内存时就能获得最佳性能。

  2. reduce 端的调优属性

    属性名称 类型 默认值 描述
    mapreduce.reduce.shuffle.parallelcopies int 5 用于把 map 输出复制到 reducer 的线程数
    mapreduce.reduce.shuffle.maxfetchfailures int 10 在声明失败之前 reducer 获取一个 map 输出所花的最大时间
    mapreduce.task.io.sort.factor int 10 排序文件时一次最多合并的流的数量,
    mapreduce.reduce.shuffle.input.buffer.percent float 0.70 shuffle 复制阶段分配给 map 输出的缓冲区占堆空间的百分比
    mapreduce.reduce.shuffle.merge.percent float 0.66 map 输出缓冲区的阈值使用比例,用于启动合并输出和磁盘溢出写的过程
    mapreduce.reduce.merge.inmen.threshold int 1000 启动合并输出和磁盘溢出写过程的 map 输出的阈值数,0 或者更小的数意味者没有阈值限制
    mapreduce.reduce.input.buffer.percent float 0.0 reduce 过程中在内存中保存 map 输出的空间占整个堆空间的比例。

特性

计数器

计数器是收集作业统计信息的有效手段之一,用于质量控制和应用级统计,另外计数器还可以辅助诊断系统故障。

内置计数器

Hadoop 为每个作业维护若干个内置计数器,以描述多项指标。

组别 名称/类别
MapReduce 计数器 org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器 org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat 计数器 org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat 计数器 org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器 org.apache.hadoop.mapreduce.JobCounter

这些计数器被划分为若干组,各组要么包含任务计数器(在任务处理过程中不断更新),要么包含作业计数器(在作业处理过程中不断更新)。

Java 自定义计数器

MapReduce 允许用户编写程序来定义计数器,计数器的值可以在 mapperreducer 中增加,计数器由一个 Java 枚举类型来定义,以便对有关的计数器分组。
一个作业可以定义的枚举类型数量不限,各个枚举类型所包含的字段数量也不限。枚举类型的名称即为组的名称,枚举类型的字段就是计数器名称,计数器是全局的。换言之,MapReduce 框架将跨所有 mapreduce 聚集这些计数器,并在作业结束时产生一个最终结果。

1
2
3
4
5
6
7
// 定义
enum Tem {
MISSING,
MALFORMED
}
// 使用
context.getCounter(Tem.MISSING).increment(1);

排序

排序是 MapReduce 的核心,MapReduce 使用排序来组织数据,而排序也分为不同的数据集排序方式。

部分排序

默认情况下,MapReduce 会根据输入记录的键对数据集排序。

全排序

如何生成一个全局排序的文件呢?
最简单的方法就是,首先创建一系列排序好的文件;其次串联这些文件;最后生成一个全局排序的文件。其主要思路就是使用一个 patitioner 来描述输出的全局排序。

该方法的关键点在于如何划分各个分区。在理想情况下,各分区所包含的记录数应大致相等。使作业的总体执行时间不会受制于个别 reducer

辅助排序

通过特定的方法对键进行排序和分组以实现对值的排序。
通过设置一个按照键进行分区的 patitioner,这样可以确保相同 patitioner 的记录会被发送到同一个 reducer 中,而在同一个分区中,仍然可以通过键进行分组。

按值排序的方法总结:

  • 定义包括自然键和自然值的组合键。
  • 根据组合键对记录进行排序,即同时用自然键和自然值进行排序。
  • 针对组合键进行分区和分组时均只考虑自然键。

边数据

边数据(side data)是作业所需的额外的只读数据,以辅助处理主数据集 。其所面临的挑战在于如何使所有 mapreduce 任务(散布在集群内不)都能够方便而高效地使用边数据。

Hadoop 的分布式缓存机制能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用,不过为了节约网络带宽,在每一个作业中,各个文件通常只需要复制到一个节点一次。
对于使用 GenericOptionsParser 的工具来说,用户可以使用 -files 选项指定待分发的文件,文件内包含以逗号隔开的 URI 列表,如果没有指定文件系统,则这些文件会被默认为本地文件。-archives 选项可以向自己的任务中复制存档文件(JAR 文件、ZIP 文件、tar 文件和 gzipped tar 文件),这些文件会被解档到任务节点。-libjars 选项会把 JAR 文件添加到 mapperreducer 任务的类路径中。


引用


个人备注

此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!