首页 > 编程知识 正文

mapreduce计算框架(mapreduce程序实例)

时间:2023-05-03 05:19:01 阅读:74603 作者:822

MapReduce编程框架大数据学习笔记03

文章目录MapReduce编程框架1、MapReduce思想2、MapReduce编程规范和样例编写3、WordCount代码实现总结

一、MapReduce思想MapReduce思想在生活中随处可见。 我们或多或少接触过这种思想。 MapReduce思想的核心是分派,充分利用了并行处理的优点。

即使是发表论文实现分布式计算的谷歌,也只是实现了这一思想,而不是自己的原创。 MapReduce的任务流程分为两个处理阶段:

映射阶段:映射阶段的主要作用是“分钟”,即把复杂任务分解成几个“简单任务”并并行处理。

Map阶段的这些任务可以并行计算,彼此没有依赖关系。

Reduce阶段: Reduce阶段的主要作用是“匹配”,即对映射阶段的结果进行全局总结。

再次理解MapReduce的思想

二、MapReduce编程规范和样例编写2.1. Mapper类

用户自定义Mapper类以继承Hadoop的Mapper类

映射器的输入数据是KV对的格式。类型可以自定义

Map阶段的业务逻辑由Map (方法定义)

映射器的输出数据是KV对的格式。类型可以自定义

注意: map ) )方法对输入的KV对调用一次!2.2 Reducer类

继承用户定义的Reducer类Hadoop的Reducer类

Reducer的输入数据类型对应于映射器的输出数据类型(KV对)

Reducer的业务逻辑写在reduce ()方法中

Reduce ) )方法对同一k的KV配对调用执行2.3 Driver阶段

创建一个Job对象来提交YARN群集的执行,该群集将运行MapReduce程序所需的相关参数封装在输入数据路径、输出数据路径等中,也相当于YARN群集的客户端,主要作用是

三. WordCount代码实现需求

计数并输出每个单词在指定文档中出现的总次数

输入数据: wc.txt;

输出:具体步骤

按照MapReduce编程规范分别创建Mapper、Reducer、Driver。

maven项目新部署hadoop是dependenciesdependencygroupidorg.Apache.logging.log4j/groupidartifactidlog 4j-core/依赖artifactidversion的ependencydependencygroupidorg.Apache.Hadoop/groupidartifactidhadoop-common/artifactidversion 2 ependencydependencygroupidorg.Apache.Hadoop/groupidartifactidhadoop-client/artifactidversion2.9.2/version/ependencydependencygroupidorg.Apache.Hadoop/groupidartifactidhadoop-HDFS/artifactidversion2.9.2/版本/depersion --maven软件包插件----buildpluginspluginartifactidmaven-compiler-plugin/artifactidversion2.3.2/verrer 插入式插件配置描述符

escriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

注意:以上依赖第一次需要联网下载!!

添加log4j.properties log4j.rootLogger=INFO, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%nlog4j.appender.logfile=org.apache.log4j.FileAppenderlog4j.appender.logfile.File=target/spring.loglog4j.appender.logfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

整体思路梳理(仿照源码)
Map阶段:

map()方法中把传入的数据转为String类型根据空格切分出单词输出<单词,1>

Reduce阶段:

汇总各个key(单词)的个数,遍历value数据进行累加输出key的总数

Driver:

获取配置文件对象,获取job对象实例指定程序jar的本地路径指定Mapper/Reducer类指定Mapper输出的kv数据类型指定最终输出的kv数据类型指定job处理的原始数据路径指定job输出结果路径提交作业

编写Mapper类

import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class WordcountMapper extends Mapper<LongWritable, Text, Text,IntWritable>{Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {// 1 获取一行String line = value.toString();// 2 切割String[] words = line.split(" ");// 3 输出for (String word : words) {k.set(word);context.write(k, v);}}}

编写Reducer类

import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class WordcountReducer extends Reducer<Text, IntWritable, Text,IntWritable>{int sum;IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Contextcontext) throws IOException, InterruptedException {// 1 累加求和sum = 0;for (IntWritable count : values) {sum += count.get();}// 2 输出 v.set(sum);context.write(key,v);}}

编写Driver驱动类

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordcountDriver {public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {// 1 获取配置信息以及封装任务Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 设置jar加载路径job.setJarByClass(WordcountDriver.class);// 3 设置map和reduce类job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);// 4 设置map输出job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 提交boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}

运行任务

本地模式
直接Idea中运行驱动类即可
idea运行需要传入参数:
选择editconfiguration



注意本地idea运行mr任务与集群没有任何关系,没有提交任务到yarn集群,是在本地使用多线程
方式模拟的mr的运行。

Yarn集群模式
把程序打成jar包,改名为wc.jar;上传到Hadoop集群
选择合适的Jar包

准备原始数据文件,上传到HDFS的路径,不能是本地路径,因为跨节点运行无法获取数
据!!
启动Hadoop集群(Hdfs,Yarn)
使用Hadoop 命令提交任务运行

hadoop jar wc.jar com.lagou.wordcount.WordcountDriver /user/lagou/input /user/lagou/output 总结

实现了mapreduce阶段的wordcount程序,主要了解mr的运行机制。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。