一. MapReduce模型框架
MapReduce是一种用于大数据处理的分布式计算模型,最初由谷歌工程师设计并实现。 谷歌公开了完整的MapReduce论文。 其中的定义是MapReduce是编程模型,是用于处理和生成大数据集的相关实现。 用户定义用于处理Key-Value对的map函数以生成中间的Key-Value对,然后定义reduce函数以合并这些中间具有相同Key的所有Value。 许多现实世界的任务可以用这个模式来表达。
1、MapReduce模型
源数据中间数据结果数据
MapReduce模型如上图所示,Hadoop MapReduce模型主要有两个抽象类: Mapper和Reducer。 Mapper端主要负责数据的分析处理,最终转换为Key-Value的数据结构Reducer端主要获取Mapper的结果并统计结果。
2、MapReduce框架
整个过程如上图所示,由四个独立的图元组成:
客户端: MapReduce作业,例如编写的MR程序,以及CLI执行的命令等。 作业跟踪者:协调工作的执行是管理者。 tasktracker :作业执行分割任务是执行者。 hdfs用于在群集之间共享存储的抽象文件系统。 说明:实际上,还有namenode是元数据仓库,类似于windows的注册表。 secondarynamenode可以视为namenode的备份。 datanode被认为是用于保存作业被拆分的任务。 在DRCP中,master为namenode、secondarynamenode、jobtracker,其他3台slaver均为tasktracker、datanode,tasktracker为HDFS的数据Mapper和Reducer在Hadoop中运行的MapReduce APP应用程序的最基本组成部分是用于创建Mapper抽象类、Reducer抽象类和作业conf的可执行文件。 JobTracker JobTracker是一种主服务器,在软件启动后,JobTracker会接收Job,并将作业的每个子任务Task调度为在TaskTracker上运行并监视其执行。 如果发现失败的Task,则必须重新运行,通常需要部署job tracker。tasktracker tasktracker是一种在多个节点上运行的slaver服务。 TaskTracker主动与JobTracker进行通信,接收作业(与DataNode和NameNode类似,通过心跳实现)并直接执行各项任务。 JobClient每个Job在客户端通过JobClient类将APP应用程序和配置参数Configuration打包到一个JAR文件中并存储在HDFS中,然后将路径提交给JobTracker的主服务。 当主程序通过每个任务(JobInProgress job client )提交作业时,JobTracker会创建jobinprogress来跟踪和调度此作业,然后将其添加到作业队列中。 JobInProgress基于提交的任务JAR中定义的输入数据集,创建分割为FileSplit的对应的TaskInProgress组来监视和调度地图任务,创建指定的书籍缺省值为一个ReduceTask。当TaskInProgress JobTracker开始任务时,它通过每个TaskInProgress执行Task,并序列化Task对象,即映射任务和ReduceTask TaskTracker到达后,将创建相应的TaskInProgress。 此TaskInProgress实现在JobTracker以外使用的TaskInProgress,并发挥同样的作用。 (此TaskInProgress用于监视和调度此Task。 要启动具体的Task进程,请在TaskInProgress中进行管理,并在TaskRunner对象中执行。 TaskRunner会自动加载任务JAR文件,设置环境变量,然后启动独立的Java Child进程来执行Task (映射任务或读取任务),但不一定在同一个TaskTracker上运行MapTask和ReduceTask完整作业由Mapper、Combiner (在作业conf指定Combiner时执行)、Mapper和Combiner从MapTask调用,Reduce为Reduce
口类的实现。Mapper会根据Job JAR中定义的输入数据集<key1, value1>对读入,处理完成生成临时的<key2, value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同Key的值做合并处理,以减少输出结果集。MapTask的任务全部完成后,交给ReduceTask进程调用Reducer处理,生成最终结果<Key3, value3>对。二、MapReduce工作原理
MapReduce是Hadoop的一个离线计算框架,运行时间范围从数秒到数小时,因此,对于我们而言直到作业进展是很重要的。一个作业和每个任务都有一个状态信息,包括作业或任务的运行状态(比如,运行状态,成功完成,失败状态)、Map和Reduce的进度、计数器值、状态消息和描述(可以由用户代码来设置)等。这些消息通过一定的时间间隔由Child JVM—>TaskTracker—>JobTracker汇聚。JobTracker将产生一个表明所有运行作业及其任务状态的全局视图。可以通过Web UI查看。同时JobClient通过每秒查询JobTracker来获得最新状态,输出到控制台上。现在可能会有一个疑问,这些状态信息在作业执行期间不断变化,它们是如何与客户端进行通信的呢?详细细节不在讲解,参考资料《Hadoop权威指南》。 6、作业的完成 当jobtracker收到作业最后一个任务已完成的通知后,便把作业的状态设置为”成功”。然后,在JobClient查询状态时,便知道作业已成功完成,于是JobClient打印一条消息告知用户,最后从runJob()方法返回。 说明: MapReduce容错,即作业失败情况不再讲解,参考资料《Hadoop权威指南》。
三、Shuffle阶段和Sort阶段
如果说以上是从物理实体的角度来讲解MapReduce的工作原理,那么以上便是从逻辑实体的角度来讲解MapReduce的工作原理,如下所示: 输入分片: 在进行map计算之前,mapreduce会根据输入文件计算输入分片,每个输入分片针对一个map任务,输入分片存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组,输入分片往往和hdfs的block关系很密切。假如我们设定hdfs块的大小是64MB,如果我们有三个输入文件,大小分别是3MB、65MB和127MB,那么mapreduce会把3MB文件分为一个输入分片,65MB则是两个输入分片,而127MB也是两个输入分片,就会有5个map任务将执行。map阶段: 就是编写好的map函数,而且一般map操作都是本地化操作,也就是在数据存储节点上进行。combiner阶段: combiner阶段是可以选择的,combiner本质也是一种reduce操作。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件后做一个简单的合并重复key值的操作,比如,我们对文件里的单词频率做统计,如果map计算时候碰到一个hadoop单词就会记录为1,这篇文章里hadoop可能会出现多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,文件就会变小,这样就提高了宽带的传输效率。但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终结果,比如:如果计算只是求总数,最大值,最小值可以使用combiner,但是如果做平均值计算使用combiner,那么最终的reduce计算结果就会出错。shuffle阶段: 将map的输出作为reduce输入的过程就是shuffle。一般mapreduce计算的都是海量数据,map输出的时候不可能把所有文件都放到内存中进行操作,因此map写入磁盘的过程十分的复杂,更何况map输出的时候要对结果进行排序,内存开销是很大的。map在做输出的时候会在内存里开启一个环形内存缓冲区,这个缓冲区是专门用来输出的,默认大小是100MB,并且在配置文件里为这个缓冲区设定了一个阀值,默认是0.80(这个大小和阀值都是可以在配置文件里进行配置的),同时map还会为输出操作启动一个守护线程,如果缓冲区的内存达到了阀值的80%时候,这个守护线程就会把内容写到磁盘上,这个过程叫spill。另外的20%内存可以继续写入要写进磁盘的数据,写出磁盘和写入内存操作是互不干扰的,如果缓存区被填满了,那么map就会阻塞写入内存的操作,让写出磁盘操作完成后再继续执行写入内存操作。写出磁盘前会有个排序操作,这个是在写出磁盘操作的时候进行的,不是在写入内存的时候进行的,如果还定义了combiner函数,那么排序后还会执行combiner操作。每次spill操作也就是写出磁盘操作的时候就会写一个溢出文件,即在做map输出的时候有几次spill操作就会产生多少个溢出文件。这个过程里还会有一个partitioner操作,其实partitioner操作和map阶段的输入分片很像,一个partitioner对应一个reduce作业,如果mapreduce操作只有一个reduce操作,那么partitioner就只有一个。如果有多个reduce操作,那么partitioner对应的就会有多个。因此,可以把partitioner看作reduce的输入分片。到了reduce阶段就是合并map输出文件,partitioner会找到对应的map输出文件,然后进行复制操作,复制操作时reduce会开启几个复制线程,这些线程默认个数是5个(也可以在配置文件中更改复制线程的个数),这个复制过程和map写出磁盘的过程类似,也有阀值和内存大小,阀值一样可以在配置文件里配置,而内存大小是直接使用reduce的tasktracker的内存大小,复制的时候reduce还会进行排序操作和合并文件操作,这些操作完毕之后就会进行reduce计算。reduce阶段: 和map函数一样,是编写好的reduce函数,最终结果是存储在hdfs上的。
参考文献:
[1] MapReduce编程模型的要点: http://blog.sina.com.cn/s/blog_4a1f59bf0100tgqj.html
[2] Hadoop权威指南(第三版)
[3] Hadoop应用开发技术详解
[4] mapreduce中reducers个数设置: http://www.2cto.com/os/201312/263998.html
[5] 操作系统典型调度算法: http://see.xidian.edu.cn/cpp/html/2595.html
[6] MapReduce框架结构: http://www.cppblog.com/javenstudio/articles/43073.html
[7] MapReduce框架详解: http://www.cnblogs.com/sharpxiajun/p/3151395.html