MapReduce-hadoop的计算框架
MapReduce 是一个分布式运算程序的编程框架,是用户开发 “基于hadoop 的数据分析应用” 的核心框架。
1. 概述
MapReduce 核心功能是将 用户编写的业务逻辑代码 和 自带默认组件 整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
1.1 优缺点
在项目技术选型阶段,我们需要根据各个技术的优缺点选择最匹配项目需求的技术实现路线。因此,这里总结 MapReduce 的优缺点如下:
- 优点:
- 易于编程,它简单的实现了一些接口,就可以完成一个分布式程序;
- 良好的扩展性;
- 高容错性,可以将运行失败的任务自动转移到另外结点上运行;
- 适合 PB 级以上海量数据的 离线处理。
- 缺点:
- 不擅长实时计算,无法像MySQL一样在毫秒或者秒级内返回结果;
- 不擅长流式计算,流式计算的输入数据是动态的,而 MapReduce 输入数据集必须是静态的,不能动态变化。
- 不擅长 DAG(有向图) 计算,DAG即多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。因为 MapReduce 作业的结果都会写入磁盘,因此面对这种操作会造成大量的磁盘 IO,从而导致性能低下。
1.2 构成
MapReduce 可以分为两个阶段:Map 阶段和 reduce 阶段,这两个阶段的程序任务分别称为 MapTask 和 ReduceTask。每一个阶段的task 都以并行方式运行,但两个 task 阶段为串行执行,即 reduce 阶段的task 需要等待 map 阶段的task 执行完毕后才能执行。
一个 MapReduce 程序由下面3个部分组成:
- MrAppMaster: 负责整个任务程序的过程调度即状态协调,相当于是整个任务的老大;
- MapTask: 负责 map 阶段的整个数据处理流程。
- ReduceTask:负责 reduce 阶段整个数据处理流程。
MapReduce编程规范:编写的程序分为 mapper, reducer, driver 三个部分。分别是对 MapTask, ReduceTask 的实现,以及对整个MapReduce程序的配置。
在集群上运行:MapReduce 程序可以在本地运行,也可以打包成 jar 包,上传至集群,然后运行。打包工作可以利用 maven 来实现,具体的,是在 pom.xml
中进行配置,添加 <build>
部分的 相关的 plugin,并写入相关的编译设置(具体参考 github 中的 POM文件)。括号中链接的例子是以 wordCount 程序为例,导出 jar 文件后,将其命名为 wc.jar
并上传至服务器中,最后可以在服务器中使用下面的指令运行程序:hadoop jar wc.jar WordCountDriver /user/lab1/input /user/lab1/output
。
1.3 序列化
序列化是指将内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。类似的,磁盘的内容读取为内存中对象,称为反序列化。
需要注意的是,Hadoop 单独提供了一套序列化方案,而不是直接使用 Java 的序列化 (Serializable
,所有的包装类都实现了 该接口)。原因是,Java 的序列化框架过重,一个对象被序列化后,会附带很多额外的信息(校验信息,header,继承体系等),不利于网络传输。所以 hadoop提供了一套自己的序列化机制(Writable
)。
类似于 Java 中的 Serializable
接口,Writable
接口定义了 Hadoop 中的序列化方式。对于基本数据类型,Hadoop 提供一套自己的封装类,全部实现 Writable
接口,包括 LongWritable(Long), IntWritable(Integer), Text(String)...
(Note: 除 String
外,各类型对象名就是在原数据类型后添加 Writable
)。
除了使用框架中提供的可序列化对象,通过实现 Writable
接口并重写 write()
和 readFields()
方法,我们可以实现自定义的序列化对象。例子参考。
2. MapReduce 框架原理
整个 MapReduce 程序的流程如下面两张图所示。
下面我们就根据整个 MapReduce 程序执行的流程对各个部分进行更加细节的分析。
2.1 Job 提交与切片计算
在程序代码中,Driver 部分获取 Job 对象,进行一系列的设置之后,会 submit
该 job。提交的信息主要包括三个部分:切片信息、jar包、xml 文件。
xml文件 主要是程序执行的相关设置,jar包 则为map 程序和 reduce 程序的具体计算逻辑以及程序所用到的相关依赖组件。切片信息 相对来说最为关键,在 MapReduce 计算框架中,分片的数量,决定了并行的 MapTask 的数量,即每个 split 切片会分配一个 MapTask 来处理。
在这里,需要区分 切片(split)和数据块(block) 两个概念。Block 是实际的物理存储块,在 hdfs-site.xml
中可以对该值进行设定,通常默认为 128 M。而 split 则是 MapReduce
计算框架中的一个虚拟的切分量,它 与物理的存储无关。
当然,在 Hadoop 的默认设置中,split 的大小等于 block 的大小,这是由于同一个文件的不同 block 可能位于不同的服务器上,如果设置 split 大小与 block 不相等,可能会导致同一个 MapTask 需要从不同的服务器上获取数据,会导致系统效率降低。
具体地,切片过程中,默认切片方法为:
- 每个单独的文件做一个切片;
- 如果文件大于 block size,则 默认以块大小为切片大小进行切片(本地模式默认切片大小为32M);
- 如果一个文件多次切片后剩余大小小于 1.1 倍 (SPLIT_SLOP) 的块大小,则不再切片(减少小文件)。
小文件处理:由于默认每个单独的文件至少形成一个切片,对于有大量小文件的情况,会产生过多的小切片,从而产生大量的 MapTask,使得系统的处理效率降低。一种处理方案是使用 CombineTextInputFormat
作为 InputFormat
(参考)。
2.2 FileInputFormat
MapReduce 框架在提交 job 之后,根据 split 数启动对应数量的 MapTask, 并开始对对应分片的数据进行处理。这一处理阶段主要是进行数据读取,将原始数据读取为特定的 Java 对象,并进行响应的数据处理。
如何从文件数据中读取内容,MapReduce 框架通过实现了 FileInputFormat
接口的类进行规定(在 Driver 中进行设置)。这一接口规定了 数据的读取格式:即将文件中的数据以何种 键值对 的形式读取到程序内存中。例举 Hadoop 中提供的实现类如:
TextInputFormat
:它是默认的FileInputFormat
的实现类。它按行读取文件中的数据。键值 为当前读取的行在文件中的偏移量(字节为单位,LongWritable
类型),值 则是一行数据的内容(不包括任何终止符,即换行符和回车符),为Text
类型;keyValueTextInputFormat
: 与TextInputFormat
相同,将每一行数据作为一条记录。但 键、值 均为这一行的内容,具体的,是通过分隔符进行分割,分隔符前面的为 key,后面的为 value。 默认分隔符为 tab (\t
), 可以通过conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
来改变分隔符。
如果系统提供的 FileInputFormat
实现类不能满足我们的需求,我们还可以通过自己实现该接口,自定义从文件到 MapTask 输入值的映射关系,具体参考 例子代码。
Note:InputFormat 实际上可以规定切片策略 和 读取时映射 key-value 方式。读取时映射 key-value 的方式主要通过 CreateRecordReader()
方法中返回的 RecordReader
对象决定(参考)。
2.3 Shuffle 阶段
MapTask 处理完毕的数据会经过 OutputCollector 进入到 shuffle 阶段。具体地,数据会进入一个环形缓冲区(equator 的一侧放置 meta 信息,另一侧放置 kv 数据),当数据达到一定量后,会溢写到 container 或者磁盘(参考)。
在 Shuffle 阶段,最核心的几项任务包括 分区(partition),排序(Sorting),合并(Combiner)。下面对这几项任务重点介绍。
a. 分区
在 Hadoop 中,我们可以通过自定义 分区方式,将同一类型的数据放置到同一个分区中,再分别交由不同的 ReduceTask 进行并行处理。
系统默认分区在 HashPartitioner
类 (继承了 Partitioner
类) 中通过 getPartition()
函数进行规定,计算方法为 (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
。默认情况下,numReduceTasks
的值为1,因此,默认所有的 MapTask 输出的结果会放置到同一个分区下,最终交给一个 ReduceTask 来处理。
自定义 partitioner 的方法可以看参考例子。
需要注意的是,在自定义分区方式的同时,一定要在 driver 类中将 job 的 ReduceTask 数量设置为与 分区数量相匹配的值。如果不匹配,可能会发生下列问题:
- 如果 reducetask 数量大于 getPartition 的结果数,则会产生几个空的输出文件;
- 如果 reducetask 数量小于 getPartition 的结果数,则有一部分数据无处放置,报 exception;
- 如果 reducetask 数量为1,所有数据都放置到一个文件中,无论 在 getPartition 中设置了多少个分区。
b. 排序
在 MapReduce 的计算框架中,会根据数据的 key 值进行多次排序。第一次排序是在环形缓冲区中(准备溢写时)使用 快速排序法 进行首次排序,接下来的几次排序则是对初步 有序的数据进行进一步排序,使用 归并排序法。
通常,具体的排序策略并不需要我们处理,我们只需要规定判定数据之间大小关系的规则即可。具体地,我们可以通过自定义 MapTask 的输出 k-v 对的 key 值的 compareTo
方法,来实现这一目标。可查看 参考示例代码:参考1, 参考2。
c. Combiner
Combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件,它是 Reducer 的子类,因此,功能与 Reducer 非常类似。不同点是,二者的运行位置不同: combiner 是在每一个 MapTask 所在的节点运行;而 Reducer 是接收全局所有 Mapper 的输出结果。
可以想到,Combiner 的意义就是对每个 MapTask 的输出进行局部汇总,从而可以减小网络传输量。
需要注意的是,Combiner 能够应用的前提是不影响最终的业务逻辑(适合各种求和,汇总场景,不适用于求平均等涉及除法场景)。并且,Combiner 输出的 kv 应该跟 Reducer 输入 kv 类型对应起来。
2.4 Reduce 阶段
数据经过 shuffle 过程之后,会最终进入到 reduce 阶段。系统根据分区数(driver 类中需进行设置)启动 reducetask。系统会将将所有 maptask 中相同分区的数据放到同一个 ReduceTask。数据进来后,再进行 归并排序,合并。reduce 逻辑处理完毕后,通过context.write(k, v)
写出,最终系统会根据指定的 outputFormat
将数据最终写出到文件。
总结来说,reduce 分为下面三个过程:
- copy 阶段,拷贝对应数据到 reduceTask 所在的本地磁盘。
- merge,sort 阶段:数据溢写到硬盘,将数据排序,分组。
- reduce 阶段:相同组的到同一个 reduce 中。
a. ReduceTask 并行度
如前面所述,ReduceTask 并行度需要与分区情况进行匹配,方可达到最佳效果。具体 reduceTask 的数量可以在 driver 类中设定:job.setNumReduceTasks('并行数')
。
关于 reduce Task 数量的相关的注意点总结如下:
- reduceTask = 0, 表示没有 reduce 阶段,输出文件个数和 map 个数一致;
- reduceTask 默认值为 1, 所以输出文件个数为1个。ReduceTask 数量并不是随意设置的,需要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有一个 reduceTask。
- 如果数据分布不均匀,就有可能在 reduce 阶段产生数据倾斜,需要避免!
- 具体开多少个 reducetask 最好,还由集群性能决定;
- 如果分区数不是 1,但是 reduceTask 为 1,则实际不执行分区过程。因为在 maptask 源码中,执行分区之前,会先判断reduceNum 的个数是否大于1,不大于 1 则不执行任务。
b. OutputFormat 接口
类似 InputFormat
可以规定文件读取时的切片方式和 输入 Map Task 的 key-value 的模式, OutputFormat
则规定了 MapReduce 程序输出内容的形式。Hadoop 默认的 OutputFormat
是 TextOutputFormat
,当需要使用多级 MR 时, 可以使用 SequenceFileOutputFormat
。
可以通过自定义的输出格式(继承FileOutputFormat
),实现数据直接输入数据库等功能。
具体可以参考 案例。
3. 案例及调试
3.1 案例:Join的多种应用
JOIN 即数据的合并,是数据库中经常需要进行的操作,但对于 MapReduce 程序,如何实现大数据的 JOIN 呢?通常有两种方案。
方案1: reduce join
reduce join 是通过在 map 阶段对数据进行标记,然后在 reduce 阶段统一进行合并。具体过程如下。
Map端:为来自不同表(或文件) 的 key/value 对,打标签以区别不同的来源记录。然后用连接字段作为 key(连接字段是join 使用的字段),其余部分和新加的数据来源标签作为 value,最后进行输出。
Reduce 端:以连接字段作为 key 的分组已经完成,只需要在每个分组中将 来源于不同文件的记录分开,最后进行合并即可。
参考代码:https://github.com/SmallGreens/hadoop_basic/tree/main/src/main/java/tableJoin
Reduce join 的缺点:合并操作在 reduce 阶段进行,reduce 端的处理压力太大,map 结点运算负载低,资源利用率不高,且在 reduce 阶段极易产生数据倾斜。
方案2:map join
为解决在 reduce 端合并的问题,可以考虑在 map 端对数据进行合并。方法是先将数据量较小的表预缓存到内存中,然后再读取另外的表,同时进行合并操作。通常适用于一张大表 + 1张很小的表的情况。
参考代码: https://github.com/SmallGreens/hadoop_basic/tree/main/src/main/java/cache
3.2 调试方法:计数器
单线程的程序可以使用 debug 来处理, 但是多线程的程序,则通常需要通过计数器、log 等工具来实现调试。
hadoop 为每个作业维护若干内置计数器。例如有记录处理的字节数的计数器,可以监控已处理的输入数据量和已产生的输出数据量。
默认的 mapreduce 框架中就有很多的计数器, 在控制台的输出可以看到:
1 | Map input records=6 |
如果希望自己实现一个计数器,hadoop 中主要有两种方案:
method 1: 采用 枚举的方式统计计数:
1 | enum MyCounter{MALFORORMED, NORMAL} |
method 2: 采用计数器组、计数器名称的方式进行统计:
1 | context.getCounter("counterGroup_name", "counter_name").increment(1); |
程序执行后,计数器的结果可以在控制台上进行观测。
参考例子-清理数据:https://github.com/SmallGreens/hadoop_basic/tree/main/src/main/java/log
4. Hadoop 数据压缩
压缩技术 能够减少底层 HDFS 的读写字节数。压缩提高了网络带宽和磁盘空间的效率。在 Hadoop 中, 压缩可以发生在 MapReduce 的任意阶段(map前,map和 reduce之间,reduce 后)。
虽然采用 压缩技术可以减少磁盘 IO 和网络传输,但同时也增加了 CPU 的运算负担。因此:
- 运算密集型的 job,少用压缩;
- IO密集型的job,多用压缩。
4.1 压缩方式
MR 支持的压缩编码包括:DEFLATE, Gzip, bzip2, LZO, Snappy (最后两个不是自带的,需要自行配置安装)。下面对这些压缩方式进行具体的介绍。
Gzip
优点是:压缩率比较高,而且压缩和解压速度也比较快。hadoop 本身支持,在应用中处理 Gzip 格式的文件就和直接处理文本一样。并且大部分 Linux 系统都自带 Gzip 命令,使用方便。缺点:不支持切片。
应用场景:当每个文件压缩之后在 130 M 以内(1个块大小内),都可以考虑使用 Gzip 压缩格式。
bzip2
优点是:支持 切片,具有很高的压缩率,比 gzip 压缩率高。hadoop 本身自带,使用方便。缺点:压缩解压速度慢。
应用场景:适合对速度要求不高,但需要较高的压缩率的情况;或者输出数据比较大,处理之后 的数据需要的压缩存档以减少磁盘存储空间,并且以后用的比较少的情况。
Lzo
优点:压缩、解压速度比较快,具有合理的压缩率。支持 split,是 hadoop 中 最流行的压缩格式之一。可以在 linux 系统下安装 lzop 命令。 缺点:压缩率不高,hadoop 本身不支持,需要进行安装;并且 在应用中对 lzo 格式文件需要做一些特殊处理(为了支持 split 需要建索引,还需要指定 InputFormat 为 Lzo 格式)。
应用场景:一个很大的文本文件,压缩之后还大于 200 M 以上的可以考虑,并且单个文件越大,lzo 优点越明显。
Snappy
优点; 极高速压缩速度,合理的压缩率(即压缩率相对较低)。缺点:不支持 split,压缩率比 gzip 低,hadoop 本身不支持,需要安装。
应用场景:mapreduce 作业的 map 输出的数据比较大的时候,作为 map 到 reduce 的中间数据的压缩格式。
4.2 压缩位置的选择
Map 输入前采用压缩: 有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩。在输入阶段,无需显示的指定使用的编码方式,hadoop 会自动检查文件拓展名,如果拓展名能够匹配支持的压缩方式,就会使用对应的编解码器对文件进行处理。
Map 和 reduce 之间进行压缩: 可以降低网络传输的压力,最常使用。常常使用 LZO 和 Snappy 方法。
Reducer 输出采用压缩: 在多级 mapreduce 任务中,在此阶段的输出可能会进行压缩。
5. Yarn 资源调度器
Yarn 是一个资源调度平台,负责为运算程序提供服务运算资源,相当于一个分布式的操作系统平台,而 MapReduce 等运算程序则相当于运行于 该操作系统之上的应用程序。
5.1 组成与运行机制
Yarn 主要由 ResourceManager(整个集群的老大), NodeManager(管理单个结点的资源), ApplicationMaster(管理每个 job) 和 Container(yarn 上资源的抽象) 等组件构成。如下图所示:
Yarn 通过上述各组件对 MapReduce 程序进行管理,具体的操作流程如下图所示:
上图中,将一个 MapReduce 的任务分割为了 14 小步,这里可以根据他们
- 申请 运行 ApplicationMaster 并提交相关资源(对应上图中的 0-4 步):
- MR 程序提交到客户端所在的结点,指定使用 Yarn 运行,会向 ResourceManager 发送一个任务申请;
- ResourceManager 返回资源提交路径
hdfs://.../staging
以及application_id
; - 客户端提交 job 运行所需的资源(job.split, job.xml, jar 包)到 ResourceManager 指定的提交路径处;
- 资源提交完毕,申请运行 ApplicationMaster;
- 将运行 ApplicationMaster 这一任务放入 ResourceManager 管理的任务调度队列,排队执行(步骤5-6)。
- ApplicationMaster 的初始化(步骤7-9):
- 创建 container;
- 下载 任务所需的资源到 ApplicationMaster 所在的 结点;
- 申请运行 MapTask (申请 MapTask 的容器);
- 将 MapTask 的申请放入 ResourceManager 管理的任务调度队列中,排队执行(步骤9-10)。
- 执行 MapTask(步骤10-11):
- ResourceManager 为 MapTask 分配执行的 container;
- ApplicationMaster 向 MapTask 所在的 container 发送任务启动脚本。
- 将 Reduceask 的申请放入 ResourceManager 管理的任务调度队列中,排队执行(步骤12)。
- 执行 ReduceTask:分配得到的 ReduceTask container 从 map task 处获得分区数据,并执行 Reduce(步骤13)。
- 程序执行完毕,向 ResourceManager 注销自己。
除了上述的任务管理功能外,Yarn 还会监控任务执行的进度情况,并可以定时向 任务所有者(客户端)报告任务的进度。
5.2 资源调度器
Hadoop 作业调度器主要有三种:FIFO,Capacity Scheduler,Fair Scheduler, 其中 Hadoop2 默认的资源调度器为 Capacity scheduler。
FIFO:先进先出。维护一个队列放置 task,当新的服务器结点有资源,则将队列中的第一个 task 交给对应结点(注意这个task 可能是 初始化 ApplicationMaster or MapTask or ReduceTask)。
Capacity Scheduler:支持多个 FIFO 队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略。因为有多个队列,添加任务的时候会 首先添加到最闲的队列中(根据队列中的任务数与其应该分得的计算资源之间的比值进行判定)。
此外,为了防止同一个用户的作业独占队列中的资源,该调度器会对 同一用户提交的作业所占资源量进行限制。
相对于 FIFO 调度策略,Capacity Scheduler 具有更高的并发度。
Fair Scheduler:同样使用多队列。每个队列中的 job 会按照 优先级分配资源,优先级越高资源分配越多,但是每个 job 都会分配到一定资源。
此外,job 按照 缺额进行排序(所谓缺额是指job实际需要的资源与被分配到的资源之间的差距), job 缺额越大,越先获得资源有限执行。
相比上述两种调度器, Fair Scheduler 的并发度最高。
5.3 推测执行
由于实际作业完成时间总是取决于 执行最慢的任务的完成时间。一个 job 通常有多个 task 组成,但由于执行 task 的各个机器性能不同,性能最慢的机器会影响系统整体的速度(例如 map 阶段,一两个机器运行很慢,就会一直卡住无法进入reduce 阶段)。
因此,Yarn 在资源调度时 提供了一种 推测执行机制。即当发现拖后腿的任务时,系统会为其启动一个备份任务,同时运行,最后谁先运行完毕,就采用谁的结果。开启推测执行参数在 mapred-site.xml
中设置,默认设置为开启。
推测执行算法原理:MR 会根据当前任务的执行进度(progress - 百分比)以及到现在为止的任务执行时间推断出 当前任务执行结束的时刻。同时,根据计算任务运行的平均时间,计算如果当前时刻开始新建备份任务的话,备份任务推测完成时刻。比较上述两个时刻求得 差值。最后选择差值最大的任务为其启动备份任务。
推测执行机制 是一种典型的 以空间换时间 的优化模式。但在资源紧缺的情况下,应合理使用该机制。
具体地,为了更好的应用 该机制,需要关注下述几点:
- 每个 task 只能有一个备份任务;
- 为了防止大量任务同时启动备份任务造成资源浪费,MR 为每个job 设置了同时启动备份任务数目上限;
- 只有当前 job 已完成的 task 超过 5% 才允许 启动备份任务;
- 当任务间存在严重的负载倾斜,以及特殊的任务(例如任务向数据库中写数据)不应该使用推测执行机制。
6. Hadoop优化
MapReduce 程序效率的瓶颈主要可以分为两点:
- 计算机性能:CPU,内存,磁盘健康,网络等。这部分可以通过硬件更新来提升。当然,对于编程来讲,可能需要调节相关的参数来适应相应的硬件水平。
- I/O 操作优化,常见的一些问题如下:
- 数据倾斜;
- map 和 reduce 数设置不合理;map运行时间太长,导致 reduce 等待过久;
- 小文件过多 或者有 大量的不可分块的超大文件;
- spill 次数过多,merge 次数过多。
针对上述的问题,我们可以从数据输入的不同阶段出发,在每个阶段提供一些优化的思路,如下:
- 数据输入阶段:
- 大量的小文件可能导致 map 任务过多,增大 map 任务的装载次数,而任务装载比较耗时,从而导致 MR 运行较慢。可使用 CombineTextInputFormat 作为输入,将小文件合并。
- map阶段:
- 减少 spill 次数,可以调整
io.sort.mb
及sort.spill.percent
参数,增大触发 spill 的内存上限,减少 spill 次数,从而减少磁盘 IO; - 减少 merge 次数:通过调整
io.sort.factor
参数,增大 merge 文件数目,减少 merge 次数,从而缩短 MR 处理时间; - map 之后,在不影响业务逻辑的前提下,先进行 combine 处理,减少网络 IO;
- 减少 spill 次数,可以调整
- I/O 传输阶段:
- 采用数据压缩方式,减少网络 IO 时间,安装 snappy 和 LZO 压缩编码器;
- 使用 SequenceFile 二进制格式(一种比较紧凑的数据格式);
- reduce阶段:
- 合理设置 reduce 数:太少导致 task 等待,延长处理时间;太多,导致 map、reduce 任务间竞争资源,造成处理超时等错误;
- 设置 map reduce 共存:调整
slowstart.completedmaps
参数,使 map 运行到一定程度后,reduce 也开始运行,减少 reduce 的等待时间; - 允许的情况下,规避使用 reduce: 因为 reduce 在用于连接数据集的时候会产生大量的网络消耗;
- 合理设置 reduce 端的 buffer:可以设置部分内存读 buffer 中的数据 直接拿给 reduce 使用,可通过设置
mapred.job.reduce.input.buffer.percent
来实现。
除了上述4个阶段的优化策略,针对特别的问题 数据倾斜(现象是某一区域的数据量要远远大于其他区域) 我们可以采用下面的一些思路进行优化处理:
- 抽样和范围分区,通过对原始数据进行抽样得到的结果集来预设分区边界值;
- 自定义分区,依据对数据的了解,对数据进行自定义分区;
- 使用 combiner;
- 采用 map join,尽量避免 reduce join。
HDFS小文件问题 也是一个重要的需要优化的对象。因为每个小文件都需要在 namenode 上建立索引,而一个索引固定占用 150 bytes,当小文件过多,大量占用 namenode 中的索引空间,会降低文件搜索速度。针对这一问题,有下列几种处理方案:
- hadoop archive: 是一个高效地将小文件放入 hdfs 块中的文件存档工具,它能够将多个小文件打包成一个 HAR 文件,这样就减少了 NameNode 的内存使用;
- Sequence File: 由一系列二进制 key/value 组成,如果 key 为文件名,value 为文件内容,则可以将大批小文件合并成一个大文件;
- CombineFileInputFormat: 是一种新的 InputFormat,用于将多个文件合并成一个单独的 split,另外,他会考虑数据的存储位置;
- 开启 JVM 重用:对于大量小文件的job,可以开启 JVM 重用,原理是一个 map 运行在一个 JVM 上进程上,开启重用的话,该 JVM 进程运行完毕当前 map 后可以继续服务其他 map。
最后,总结一些常用的优化参数如下:
mapreduce.map.memory.mb
: 规定一个mapTask 可以使用的内存资源上限;mapreduce.reduce.memory.mb
: 规定一个 reduceTask 可以使用的内存资源上限;- 规定 MapReduce 计算过程中的资源使用情况:例如,可以通过参数指定各阶段使用 cpu 数;reduce 去 map 中取数据的并行数;数据溢写阈值;
- YARN 相关参数:给应用程序 container 分配的最大最小的内存,cpu核数等;
- shuffle 性能优化:环形缓冲区大小,环形缓冲区溢出比例。
- 容错相关参数:maptask 最大尝试次数;task timeout 等。
参考
- 尚硅谷Hadoop 2.x教程(hadoop框架精讲):https://www.bilibili.com/video/BV1cW411r7c5