首页 > 编程知识 正文

mapreduce处理数据,mapreduce工作原理简述

时间:2023-05-05 10:30:02 阅读:61224 作者:4699

使用MapReduce统计每个单词出现的个数

1.pom文件repositoriesrepositoryidcloudera/id URL https://repository.cloud era.com/arti factory/cloud era-repos///URL/re epos repositoriesdependenciesdependencygroupidorg.Apache.Hadoop/groupidartifactidhadoop-client/artifactidversion2.6.0ependencydependencygroupidorg.Apache.Hadoop/groupidartifactidhadoop-common/artifactidversion2.6.0- CDH 5.14.ependencydependencygroupidorg.Apache.Hadoop/groupidartifactidhadoop-HDFS/artifactidversion2.6.0- CDH5. 14.0 ependencydependencygroupidorg.Apache.Hadoop/groupidartifactidhadoop-mapreducy artifactidversion2. artifact dependencydependencygroupidjunit/groupidartifactidjunit/artifactidversion 4.11/versionsconscopopetion 从属关系组从属关系测试ng/artifactidversionrelease/version /从属关系/dependenciesbuildpluginsplugingroupidorg.Apache groupidartifactidmaven-compiler-plugin/artifactidversion 3.0/versioncon

<groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>true</minimizeJar> </configuration> </execution> </executions> </plugin> </plugins> </build>

2.编写mapper类

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUTT> * KEYIN:表示mapper数据输入的时候key的数据类型,在默认得读取数据组建下,叫InputFormat,它的行为是一行一行的读取待处理的数据 * 读取一行,返回一行给我们的mr程序,这种情况下 keyin就表示每一行的其实偏移量 因此数据类型是Long * * VALUEIN:表述mapper数据输入的时候value的数据类型,在默认得读取数据组件下 valuein就表示读取的这一行内容 因此数据类型是String * * KEYOUT:表示mapper数据输出的时候key 的数据类型 在本案例当中 输出大的key是单词 因此数据类型是 String * * VALUEOUT:拜师mapper数据输出的时候key的数据类型 在本案例当中 输出的key是单词的次数 因此数据类型是Integer * * 这里所说的数据类型STring Long都是jdk自带的类型 在序列化的时候 效率低下 因此Hadoop自己封装一台数据类型 * long-->LongWritable * String-->Test * Integer-->Intwritable * null-->NullWritable */public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { //keyin: LongWritable valuein: Text keyout: Text valueout: IntWritable //map方法的生命周期:框架每传一行数据就被调用一次 //key:这一昂的起始点在文件中的偏移量 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到一行数据转换为string String line = value.toString(); //将这一行切分出各个单词 String[] words = line.split(" "); //遍历数组,输出<单词,1> for (String word : words) { //使用mr程序的上下文context 把mapper阶段处理的数据发送出去 //作为reduce节点的输入数据 context.write(new Text(word),new IntWritable(1)); } }}

3.编写Reducer类

import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value:values) { count += value.get(); } context.write(key,new IntWritable(count)); }}

4.编写加载主类

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class WordCountRunner extends Configured{ //把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里……)描述成一个job对象 //把这个描述好的job提交给集群去运行 public static void main(String[] args) throws Exception { //实例job对象 Job job = Job.getInstance(new Configuration(), "wordcount"); //指定主类 job.setJarByClass(WordCountReducer.class); //指定map类 job.setMapperClass(WordCountMapper.class); //指定reduce类 job.setReducerClass(WordCountReducer.class); //指定map输出key/value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class);// job.setOutputKeyClass(Text.class);// job.setOutputValueClass(IntWritable.class); //设置读取数据的路径// job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("/input/")); //设置数据的输出路径// job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job, new Path("/output/")); //提交 job.waitForCompletion(true); }}

4.打jar包

在集群上创建指定的读取目录及文件

5.执行hadoop jar task20191111-1.0-SNAPSHOT.jar com.czxy.WordCountRunner  (hadoop jar jar包名  主类报名和主类名)

6.查看结果

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。