1.map和reduce 1.1 mapReduce处理逻辑在这一系列文章的第一章中简要介绍了MapReduce的原理,现在我再重复一遍。
首先,有两个文件: word1.txt和word2.txt
其中,word1.txt的内容如下:
aaaabbbbccccddddaaaa word2.txt的内容如下:
aaaaccccddddeeeeaaaa这里的两个文件很小。 首先,假设这两个文件分别大小为64M和96M,然后需要统计文件中每个字符串的数量。 这样,MapReduce的处理流程如下。Input:最左边是输入过程,输入的3358 www.Sina.com/: MapReduce根据输入的文件计算拼贴。 每个分片对应一个映射任务。 另一方面,切片的过程与HDFS密切相关。 例如,HDFS的一个块大小为64M,我们输入的两个文件为64M,96M。 这样,第一个文件将生成64M片,第二个文件将生成64M片和32M片(那么,该文件将生成单独的10M分片)。 (Split分片:map阶段由程序员通过代码控制。 图中所示的是将字符串分割后作为键存储在map中,值的位置表示为1,数量。Map:在洗牌阶段,由于在先前生成的map中存在多个具有相同密钥的map,因此在洗牌阶段合并相同密钥。 http://www.Sina.com/:即使在reduce阶段,也由开发人员通过代码控制。 在此示例中,密钥将相同map的值相加以获得最终map
这样最后输出的数据是每个字符串出现的次数。
1.2 Hadoop数据类型Hadoop本身提供了优化网络串行化传输的基本类型
类型语义BooleanWritable标准布尔型数字ByteWritable字节数字DoubleWritable字节数字FloatWritable浮点数IntWritable整数LongWritable长整数textUTF8格式存储的文本null 使用1.3映射器映射器类是通用类,四个参数分别指定映射函数的输入键、输入值、输出键和输出值
Mapper类有四种方法:
shuffle洗牌方法在任务开始时调用一次,通常用于map前的准备工作。Reduce承担主要处理,将输入数据分成键-值对。setup方法在任务结束时被调用一次,主要负责结束工作。map方法确定了setup-map-cleanup的运行模板。
map ) )方法的输入是键和值,输出是Context实例。
了解到这里之后,结合代码详细了解一下Mapper吧。
1.4 Reducer Reducer类也是与Mapper类似的通用类,四个参数分别指定map函数的输入键、输入值、输出键和输出值
Reducer类也有四个方法。
cleanup方法在任务开始时被调用一次,用于reduce前的准备工作。run承担主要处理,将输入数据分成键-值对。setup方法在任务结束时被调用一次,主要负责结束工作。reduce方法确定了setup-reduce-cleanup的运行模板。
cleanup
2 .代码分析接下来看看上一篇文章中使用的测试代码:
import java.io.IOException; import Java.util.string tokenizer; importorg.Apache.Hadoop.conf.configuration; importorg.Apache.Hadoop.fs.path; importorg.Apache.Hadoop.io.int writable; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.job; importorg.Apache.Hadoop.MapReduce.mapper; importorg.Apache.Hadoop.MapReduce.reducer; importorg.Apache.Hadoop.MapReduce.lib.input.fileinputformat; importorg.Apache.Hadoop.MapReduce.lib.output.fileoutputformat; import org.a
pache.hadoop.util.GenericOptionsParser;public class WordCount { //继承mapper接口,设置map的输入类型为<Object,Text> //输出类型为<Text,IntWritable> public static class Map extends Mapper<Object,Text,Text,IntWritable>{ //one表示单词出现一次 private static IntWritable one = new IntWritable(1); //word存储切下的单词 private Text word = new Text(); public void map(Object key,Text value,Context context) throws IOException,InterruptedException{ //对输入的行切词 StringTokenizer st = new StringTokenizer(value.toString()); while(st.hasMoreTokens()){ word.set(st.nextToken());//切下的单词存入word context.write(word, one); } } } //继承reducer接口,设置reduce的输入类型<Text,IntWritable> //输出类型为<Text,IntWritable> public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{ //result记录单词的频数 private static IntWritable result = new IntWritable(); public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{ int sum = 0; //对获取的<key,value-list>计算value的和 for(IntWritable val:values){ sum += val.get(); } //将频数设置到result result.set(sum); //收集结果 context.write(key, result); } } /** * @param args */ public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "localhost:9001"); args = new String[]{"hdfs://localhost:9000/user/hadoop/input/count_in","hdfs://localhost:9000/user/hadoop/output/count_out"}; //检查运行命令 String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length != 2){ System.err.println("Usage WordCount <int> <out>"); System.exit(2); } //配置作业名 Job job = new Job(conf,"word count"); //配置作业各个类 job.setJarByClass(WordCount.class); job.setMapperClass(Map.class); job.setCombinerClass(Reduce.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}WordCount类可以分为三部分,Map,Reduce和main三部分,Map和Reduce都是静态内部类。
Map类继承与Mapper类,四个参数表示其输入键类型为Object,输入值为文本,输出键为文本,输出值为整型数。
通过执行Map操作后,我们希望得到的结果是图1中第三列mapping列的值,即将数据拆分后存储到map中,每个字符串的数量均存储为1.
在代码中定义了一个整型类型的变量one,值为1,用来作为map的值。
map方法的前两个参数分别为输入的键和值,通过下面的代码先将text格式的字段转为java的String类型。
StringTokenizer 根据自定义字符为分界符对字符串进行拆分并将结果集封装提供对应的遍历方法,有如下构造方法:
str为要拆分的字符串,delim为界定符,当不指定delim时,将默认以空格进行拆分。
有如下方法:
其中hasMoreTokens方法用来判断是否还有分隔符。
使用context的write方法将数据进行记录。
Reduce类继承于Reducer类,Reducer类是一个泛型类,四个参数分别表示输入键,输入值,输出键,输出值。其中输入键和输入值与Map类的输出键,输出值保持一致。
当数据到达reduce时,数据已经经过了洗牌,即键相同的数据进行了合并,所以reduce方法的key为键,values是一个迭代器,存储着该键对应的所有值,然后在方法体中对该键对应的值得数量进行了统计。
如果我们在map方法中分别写一句System.out.println(“map”)和System.out.println(“reduce”),就会发现map方法和reduce方法都不止被执行了一次。
main方法来控制任务的执行。
要知道,使用MapReduce框架时,我们仅仅只是填写map和reduce部分的代码,其他的都交给mapreduce框架来处理,所以我们至少需要告诉mapreduce框架应该怎么执行,main方法中的代码做的就是这个操作。
首先我们需要初始化Configuration类,使用MapReduce之前一定要初始化Configuration,该类主要用来读取hdfs和Mapreduce的配置信息。
args设置输入文件和输出文件的位置,这里指向hdfs,输出文件的文件夹可以不存在,运行后会在指定目录下自动生成,输出文件一定不能存在,在运行前要将上一次运行生成的输出文件删除掉。
在上面的代码中我们是通过下面的代码来配置的:
我们也可以将该信息添加到xml文件中来配置,如下图:
代码修改为:
接下来的if部分用来判断是否有两个参数都指定了。
再往下就是配置作业。首先创建一个Job类,然后装载需要的各个类,从上到下分别为:程序类(我们编写的java文件的类名,这里是WordCount),Mapper类(继承了Mapper类的内部类,这里是Map),
Combiner和Reducer类都指向继承于Reducer的内部类Reduce.
(需要特别注意的是,Combiner并非一定要指向Reducer类,有时候也可以不指定,有时候不能指向Reducer而是需要单独写Combiner,只是这里指向Reducer而已)
再往下两行:
指定了输出数据的键和值的类型,也是数据存储到hdfs结果文件中的类型。
下面的代码用来创建输入文件和输出文件:
最后一行代码表示执行成功后退出程序。