1、mapreduce原理
1.1、序列化和反序列化
(1)序列化:将内存的对象转换成字节序列,便于存储
(2)反序列化:将收到的字节序列或硬盘的持久化数据,转换成内存。
1.2、inputformat中默认的是(textinputformat)
1.3、Inputsplit的含义
–inputsplit只记录了分片的元数据信息,比如起始位置、长度及所在节点列表等。
(1)找到所需数据文件存储目录;
(2)遍历处理目录下的每一个文件
(3)遍历第一个文件ss.txt
①遍历文件大小;
②计算切片大小,默认情况下block大小;
③开始切片,形成第一个切片ss.txt-1:128M…
每次切片都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分为一块切片。
④将切片信息写入到一个切片规划文件中。
⑤提交切片规划文件到yarn上,yarn上的MRAppmaster根据切片规划文件计算开启maptask的个数。
block:是HDFS上 物理存储 的数据;
切片:是对 数据逻辑上 的划分。
数据切片只是在逻辑上对输入数据进行切片,并不会在磁盘上将其切分成片进行存储。
2、Mapreduce执行流程(重点)

2.1、Mapper阶段运作流程

1、准备工作
:HDFS待处理的文件(200M),两个block对应两个分片(split),对应两个Maptask
2、客户端submit前
:获取待处理的信息,根据参数配置,形成任务分配规划
:如 - txt1(0-128M),txt2(128-200M)
3、客户端正式提交
:wc.jar,job.xml
4、客户端向RM发出请求
5、Resourcemanager执行的任务过程
:根据分片的个数计算出Maptask数量
:指定nodemanager开启APPmaster
6、使用textinputformat读取数据,并进行逻辑运算默认
:Ⅰ、InputFormat读取txt1文件
:Ⅱ、底层使用记录读取器RecorderReader读取内存
:Ⅲ、调用read读取k1,v1
:Ⅳ、调用mapper方法,生成k2,v2
7、计算完毕后将计算结果k2,v2写入到环形缓冲区
:元数据 - 索引、分区、分区数、key的开始至,value的开始
:记录 - 记录的key,记录的values,未使用区域
8、在内存进行分区排序
:分区规则 - 当前key的hash值对reduce个数进行求余数
:排序规则 - 快排
9、当内存数据超过阈值,0.8后,会溢写数据到磁盘,形成小文件
:磁盘文件分区且区内有序
10、磁盘中的小文件最后会归并排序
:将分区号相同的小文件合并成一个文件
11、Combiner,按照各个分区进行本地聚合
2.2、Reducer阶段运作流程
- 所有的MapTask任务完成之后,启动相对应数量的ReduceTask(即按照分区数启动)
- reduce数量可指定

1、按照分区号通过http协议跨节点拉取到reducetask本地磁盘;
2、将拉取到的多个map端文件进行归并排序,生成一个排序后的文件
3、将文件装载到内存,进行聚合操作
4、输出计算结果到HDFS,输出格式由outputformat.recordwriter控制
3、Shuffle过程(重点)
3.1、环形缓冲区数据结构

Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。
:这个数据结构其实就是个字节数组byte[],叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。
:数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长.
注意:上述的分界点就可以理解图中的赤道信息
Kvbuffer的存放指针bufindex(即数据的存储方向)是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。 (int型的数据占有4个字节)
索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个键值对写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到
-8位置,等第二个键值对和索引写完之后,Kvindex跳到-12位置。

3.2、Map阶段的Shuffle过程(重点)

map阶段shuffle大致流程
1、每个map有一个环形内存缓冲区,用于存储map的输出。
:默认大小为100MB,一旦达到阈值0.8,一个后台线程就将内容写入到磁盘的指定目录的一个新建文件中
2、写磁盘前,要partition(哈希取模分区,余数就是分区编号),sort。
:如果有combiner,combiner排序后写数据
:commbiner:本地聚合可缓解reducetask计算压力
:内存中排序算法为快排
3、等最后记录写完,合并全部文件为一个分区且排序的文件
:小文件合并排序算法为归并排序
1、在生成map之前,会计算文件分片的大小,然后会根据分片的大小计算map的个数,
对每一个分片都会产生一个map作业,或者是一个文件(小于分片大小*1.1)生成一个map作业,然后通过自定的map方法进行自定义的逻辑计算,计算完毕后会写到本地磁盘;
2、在这里不是直接写入磁盘,为了保证IO效率,采用了先写入内存的环形缓冲区,并做一次预排序(快速排序)。
缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存缓冲区的大小到达一定比例时,默认为80%(可通过mapreduce.map.sort.spill.percent配置项修改),将启动一个溢写线程将内存缓冲区的内容溢写到磁盘(spill to disk),这个溢写线程是独立的,不影响map向缓冲区写结果的线程,在溢写到磁盘的过程中,map继续输入到缓冲中,如果期间缓冲区被填满,则map写会被阻塞到溢写磁盘过程完成。
溢写是通过轮询的方式将缓冲区中的内存写入到本地mapreduce.cluster.local.dir目录下。在溢写到磁盘之前,我们会知道reduce的数量,然后会根据reduce的数量划分分区,默认根据hashpartition对溢写的数据写入到相对应的分区。
在每个分区中,后台线程会根据key进行排序,所以溢写到磁盘的文件是分区且排序的。如果有combiner函数,它在排序后的输出运行,使得map输出更紧凑。减少写到磁盘的数据和传输给reduce的数据。
每次环形换冲区的内存达到阈值时,就会溢写到一个新的文件,因此当一个map溢写完之后,本地会存在多个分区切排序的文件。在map完成之前会把这些文件合并成一个分区且排序(归并排序)的文件,可以通过参数mapreduce.task.io.sort.factor控制每次可以合并多少个文件。
3.3、Reduce阶段的Shuffle过程(重点)

Reduce shuffle阶段大致流程:
1、Reducer通过http协议得到输出文件的特定分区的数据
reducetask1:负责分区1和分区3的数据聚合,通过http协议将对应分区数据拉取过来,拉去内存中
:内存阈值是0.8,多余的落磁盘
2、全部拉取过来后,进行排序合并map输出,然后走reduce阶段
3、reduce执行完毕之后,写入到HDFS中
map任务完成后,监控作业状态的application master便知道map的执行情况,并启动reduce任务,application master并且知道map输出和主机之间的对应映射关系,reduce轮询application master便知道主机所要复制的数据。
一个Map任务的输出,可能被多个Reduce任务抓取。每个Reduce任务可能需要多个Map任务的输出作为其特殊的输入文件,而每个Map任务的完成时间可能不同,当有一个Map任务完成时,Reduce任务就开始运行。Reduce任务根据分区号在多个Map输出中抓取(fetch)对应分区的数据,这个过程也就是Shuffle的copy过程。
reduce有少量的复制线程,因此能够并行的复制map的输出,默认为5个线程。可以通过参数mapreduce.reduce.shuffle.parallelcopies控制。
这个复制过程和map写入磁盘过程类似,也有阀值和内存大小,阀值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制时候reduce还会进行排序操作和合并文件操作。
如果map输出很小,则会被复制到Reducer所在节点的内存缓冲区,缓冲区的大小可以通过mapred-site.xml文件中的mapreduce.reduce.shuffle.input.buffer.percent指定。一旦Reducer所在节点的内存缓冲区达到阀值,或者缓冲区中的文件数达到阀值,则合并溢写到磁盘。
如果map输出较大,则直接被复制到Reducer所在节点的磁盘中。随着Reducer所在节点的磁盘中溢写文件增多,后台线程会将它们合并为更大且有序的文件。当完成复制map输出,进入sort阶段。这个阶段通过归并排序逐步将多个map输出小文件合并成大文件。最后几个通过归并合并成的大文件作为reduce的输出。
4、MapReduce相关面试题汇总
4.1、一个job的map数量和reduce数量
4.1.1、map数量
? MapReduce框架会根据输入文件计算输入数据分片(input split),每个数据分片针对一个map任务,数据分片存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
defalt = total_size/split_size
1、具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理。
2、block块(数据块,物理划分)
block:HDFS中的基本存储单位,hadoop1.x默认大小为64M而hadoop2.x默认块大小为128M。文件上传到HDFS,就要划分数据成块,这里的划分属于物理的划分(实现机制也就是设置一个read方法,每次限制最多读128M的数据后调用write进行写入到hdfs),块的大小可通过 dfs.block.size配置。block采用冗余机制保证数据的安全:默认为3份,可通过dfs.replication配置。
注意:当更改块大小的配置后,新上传的文件的块大小为新配置的值,以前上传的文件的块大小为以前的配置值。
3、split分片(数据分片,逻辑划分)
split分片:属于逻辑上的划分,目的只是为了让map task更好地获取数据。split是通过hadoop中的InputFormat接口中的getSplits()方法得到的。
4.1.2、reduce数量
4.2、maptask和reducetask的工作原理
4.2.1、maptask的工作原理
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v1IZL3cG-1627810650498)(C:/Users/%E6%9D%8E%E6%B5%B7%E4%BC%9F/AppData/Local/YNote/data/lhw18434365386@163.com/c3baff788a0a4ccf8d6a16764277dbbf/clipboard.png)]](https://img-blog.csdnimg.cn/13b116fe445e4eaf876a1d66fabedf32.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3OTI0NTUz,size_16,color_FFFFFF,t_70) (1) Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
(2) Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3) Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4) Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5) Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。
让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
4.2.2、reducetask的工作原理
(1)copy阶段 - reducetask获取map阶段的输出
①reduce需要通过内部通信将map任务的输出拷贝到reduce。
②需要将所有的map任务的输出拷贝到不同的reduce。一个reduce处理一个分区的所有map任务输出。 ![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VN692lGj-1627810650500)(C:/Users/%E6%9D%8E%E6%B5%B7%E4%BC%9F/AppData/Local/YNote/data/lhw18434365386@163.com/174b1beb74e84bf795b380f4b1b99df8/clipboard.png)]](https://img-blog.csdnimg.cn/cf75b4f5fab142ba83c806e7707f5f0f.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3OTI0NTUz,size_16,color_FFFFFF,t_70)
Ⅰ、map任务和reduce任务不一定在同一台机器上。
Ⅱ、map任务和reduce任务不是只有一个。可能有多个map任务和多个reduce任务同时执行。
(2)合并和排序阶段
①将map输出拷贝到reduce后,需要将文件进行合并为一个文件。
②之后对合并以后的文件内容进行排序
完成排序以后的合并文件就是reduce任务的输入文件。
(3)reduce阶段任务处理【开发】
①输入文件是已经完成排序的。
②文件的内容,都是k.v的形式。相同的key,可能会有很多的不同的value。
所以在reduce任务中,输入k,v的关系实际上是一个key->list关系。
③将key对应的集合数据进行遍历,求和即合并。
④合并以后的结果,变成了k.v的一对一关系。实际就是k和求和后的数据汇总的一对一关系。
⑤将数据输出。
(4)reduce输出文件处理
①文件的输入/输出格式进行处理,不同的文件使用不同文件输入/输出格式。
默认TextInputformat/TextOutputformat.
②读数据的读入/写出使用RecorderReader/RecoderWriter
4.3、Hive的分区表其中一个分区文件被替换为不可读文件,使用这些表的时候会用影响吗?
不会受影响,除非三个备份文件全部都被替换
如果备份文件都被替换,只会影响当前分区
5、MapReduce代码实战
5.0、特别数据类型介绍
- 使用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。
BooleanWritable:标准布尔型数值
ByteWritable:单字节数值
DoubleWritable:双字节数
FloatWritable:浮点数
IntWritable:整型数
LongWritable:长整型数
Text:使用UTF8格式存储的文本
NullWritable:当<key, value>中的key或value为空时使用
5.1、实施步骤
第一步:正式实施
第二步:导入依赖
第三步:选用相应的编程方式实现
5.1.1、创建maven工程

5.1.2、导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>mapreduce-study</groupId>
<artifactId>mapreduce-study</artifactId>
<version>1.0-SNAPSHOT</version>
<name>mapreduce-study</name>
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.examples.ExampleDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
5.3、代码编写
5.3.0、需求清单
统计文件中的单词个数:文件大小200M
:将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key, value>对,如下图所示,这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数

1.将文本信息转换成字符串类型
2.用split方法以空格将字符串分割存到数组里
3.以每一个单词作为map的key,单词出现的次数作为map的value
4.Combine是优化的步骤,在mapper阶段就对value的值进行计算,每一个分片只返回单个大的value,而不再是都是1的value
5.Suffle/sort阶段是对单词进行排序并且对所有分片的value值进行再次统计
6.reduce阶段对上一阶段的value值做最后的计算,输出单词对应出现的次数。
5.3.1、自定义Mapper
1、切割字符串

2、将分割好的<key, value>对交给用户定义的map方法进行处理,生成新的<key, value>对

3、得到map方法输出的<key, value>之后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果

private final static class MyMapper extends<Longwrite,Text,Text,IntWrite>{
@Override
protected void map(LongWrite key,Text value,Context context)throws IOException,InterruptedException{
String[]arr=value.toString().split("\\s+");
for(String word:arr){
Text key2=new Text(word);
IntWritable value2=new IntWritable(1);
context.write(key,value2);
}
}
}
5.3.2、shuffle阶段
得到map方法输出的<key, value>之后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果。
5.3.3、自定义reducer
Reducer先对Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key, value>对,并作为wordcount的输出结果

private final static class MyReducer extends Reducer<>{
@Override
protected void reduce(Text key,Iterable<IntWritable>,Context context) throws IOException,InterruptedException{
int totalCnt = 0;
for (IntWritable cnt:values){
int currentCnt = cnt.get();
totalCnt += currentCnt;
}
Text key3 = key;
InterWritable value3 = new IntWritable(totalCnt);
context.write(key3,value3)
}
5.3.4、拦截非法操作
if(args == null || args.length != 2){
System.out.println("请传入参数!使用:yarn jar xx.jar Sample01WordCountMapReduce<input><output>");
System.exit(-1);
5.3.5、装配job并提交
- 装配job,并将Job提交到Hadoop集群中去运行
- ①构建Job实例
- ②将自定义的Mapper装配到job中
- ③将自定义的Reducer装配到job中
- ④将Job提交到Hadoop集群中去运行
Path inputPath = new Path(args[0].trim());
Path ouputPath = new Path(args[1].trim());
try {
Class cls = Sample01WordCountMapReduce.class;
Configuration configuration = new Configuration();
String jobName = cls.getSimpleName();
Job job = Job.getInstance();
job.setJarByClass(Sample01WordCountMapReduce.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class)
job.setMapOutputValueClass(Text.class)
FileInputFormat.setInputPaths(job,inputPath);
job.setInputFormatClass(TextInputFormat.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPaths(job,outputPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(True);
}catch(IOException | InterruptedException | ClassCastException e){
e.printStackTrace();
}
5.4、运行代码
源文件目录:必须存在
目标目录:会自动创建目录,必须不存在,否则会报错,目录已存在
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PU9AzTqi-1627810650502)(C:/Users/%E6%9D%8E%E6%B5%B7%E4%BC%9F/AppData/Roaming/Typora/typora-user-images/image-20210625120542375.png)]](https://img-blog.csdnimg.cn/90a481c1fba2475ca54e78747a0cf073.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI3OTI0NTUz,size_16,color_FFFFFF,t_70)
- 生成文件
 - 计算结果
Ant_Man 1
Avengers 2
Black_Panther 1
Black_Widow 1
Captain_America 1
DC 2
Doctor_Strange 1
Gamora 1
Groot 1
Hawkeye 1
Hulk 1
Iron_Man 1
Loki 1
Mavel 2
Rocket_Raccoon 1
Scarlet_Witch 1
Spider_Man 1
Star_Lord 1
Thor 1
Vision 1
Winter_Soldier 1
|