首页 > 编程知识 正文

大数据技术书籍推荐,大数据入门经典书籍

时间:2023-05-03 09:31:00 阅读:56675 作者:4167

先谈谈需求吧

假设以上是需要处理的数据,则需要计算每月最热的两天。

这个盒子里用了很多东西。 如果能看清传统的黑米,一定会受益匪浅

首先我们向自己提出几个问题

1 .如何划分数据,如何定义组?

考虑reduce计算的复杂性吗?

3 .可以有多个reduce吗?

4 .如何避免数据倾斜?

5 .如何自定义数据类型?

---记录特征

每年

每月

温度最高

两天

一天多项记录怎么处理?

---进一步思考

将年月分组

温度升序

key包括时间和温度!

----MR原语:同一key被分成组

使用GroupCompartor设置分组规则

---自定义数据类型Weather

包括时间

包括温度

自定义排序比较规则

---自定义组比较

年月相同且被视为相同的key

那么在reduce反复的情况下,同一年月的记录有可能是同一天,在reduce中需要判断是否是同一天

注意原始设备

---数据量很大

所有量数据可以分成最少一个月的数据量来进行判断

这样的业务场景可以有多个reduce

通过实现分区

实现一主类

package com.huawei.mr.weather; import java.io.IOException; importorg.Apache.Hadoop.conf.configuration; importorg.Apache.Hadoop.fs.path; importorg.Apache.Hadoop.io.text; importorg.Apache.Hadoop.MapReduce.job; importorg.Apache.Hadoop.MapReduce.lib.input.fileinputformat; importorg.Apache.Hadoop.MapReduce.lib.output.fileoutputformat; /** * @author Lpf. * @version创建日期: 2019年4月13日下午733604:40 */public class main class { publicstaticvoidmain [ ]。 ClassNotFoundException,InterruptedException {//输入错误回复提示if(args==null||args.length!=2) {System.out.println ('输入格式有误); System.out.println (正确格式为yarnjarweather.jar com.Huawei.Mr.weather.main class args [0] args [1] ) ); 初始化hadoop的默认配置文件,并复盖默认配置配置配置(true,如果有)//指定使用系统配置信息Jobjob=job.getinstance(conf )创建job对象的//job门户程序job.setjarbyclass (main class.class ); //job名称设置job.setjobname('Weather ' ); //指定从何处读取文件,并将输入文件从hdfs加载到jobfileinputformat.addinputpath (job,newpath ) args[0]; 将hdfs上不存在的路径指定为作业的输出路径fileoutputformat.setoutputpath (作业,新路径) args[1] ); //自主设置reduce的数量job.setnumreducetasks(2; //map输出中value的类型job.setmapoutputvalueclass (text.cclass ),该类型指定正在输出的key的类型job.setmapoutputkeyclass 设定//map内的比较器。 如果未默认设置key类型附带的比较器/**,则辅助排序*/job.setsortcomparatorclass (wether comparator.class ),因为映射中的排序与此处的排序不同//设置分区类型以防止数据倾斜的job.setpartitionerclass (weather partitioner.class ); job.setmapperclass (weather mapper.class; job.setreducerclass (weather reduce.class; job.waitforcompletion () true; }双Weather自定义密钥的实现

package com.huawei.mr.weather; import java.io.DataInput; import java.io。

DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * @author Lpf. * @version 创建时间:2019年4月13日 下午8:15:26 * map中输出key的自定义 */ public class Weather implements WritableComparable<Weather> { private String year; private String month; private String day; private Integer weather; public String getYear() { return year; } public void setYear(String year) { this.year = year; } public String getMonth() { return month; } public void setMonth(String month) { this.month = month; } public String getDay() { return day; } public void setDay(String day) { this.day = day; } public Integer getWeather() { return weather; } public void setWeather(Integer weather) { this.weather = weather; } @Override public void write(DataOutput out) throws IOException { // 把封装的数据序列化之后写出去 out.writeUTF(year); out.writeUTF(month); out.writeUTF(day); out.writeInt(weather); } /* * 读写的顺序要一致 */ @Override public void readFields(DataInput in) throws IOException { // 把封装的数据序列化之后读进来 setYear(in.readUTF()); setMonth(in.readUTF()); setDay(in.readUTF()); setWeather(in.readInt()); } @Override public int compareTo(Weather that) { int result = 0; result = this.getYear().compareTo(that.getYear()); if (result == 0) { result = this.getMonth().compareTo(that.getMonth()); if (result == 0) { result = this.getDay().compareTo(that.getDay()); if (result == 0) { // 如果年月日都相同,把温度按照高到低倒序排列 result = that.getWeather().compareTo(this.getWeather()); } } } return result; } } 三 >>>自定义map中key的比较器用于排序 package com.huawei.mr.weather;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;/** * @author Lpf. * @version 创建时间:2019年4月13日 下午8:29:41 * map中的比较器设置 */public class WetherComparator extends WritableComparator {public WetherComparator() {super(Weather.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {int result = 0;Weather wa = (Weather) a;Weather wb = (Weather) b;// 分组比较器要保证同年同月为一组 和Weather里面的排序规则不一样result = wa.getYear().compareTo(wb.getYear());if (result == 0) {result = wa.getMonth().compareTo(wb.getMonth());if (result == 0) {result = wb.getWeather().compareTo(wa.getWeather());}}return result;}}

四>>>设置分区器避免数据倾斜

package com.huawei.mr.weather;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;/** * @author Lpf. * @version 创建时间:2019年4月13日 下午8:47:46 * 分区器,避免数据倾斜 */public class WeatherPartitioner extends Partitioner<Weather, Text> {@Overridepublic int getPartition(Weather key, Text value, int numPartitions) {String month = key.getMonth();int partitionNum = (month.hashCode() & Integer.MAX_VALUE) % numPartitions;return partitionNum;}}

五>>>map里面对每一行的处理

package com.huawei.mr.weather; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author Lpf. * @version 创建时间:2019年4月13日 下午8:55:29 map里面的处理 */ public class WeatherMapper extends Mapper<LongWritable, Text, Weather, Text> { private SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-mm-dd"); private Weather wea = new Weather(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 每一行的数据格式为 1949-10-01 14:21:02 34c String linStr = value.toString(); // {"1949-10-01 14:21:02","34c"} String[] linStrs = linStr.split("t"); // 得到温度 int weather = Integer.parseInt(linStrs[1].substring(0, linStrs[1].length() - 1)); // 获取时间 try { Date date = DATE_FORMAT.parse(linStrs[0]); Calendar calendar = Calendar.getInstance(); calendar.setTime(date); int year = calendar.get(Calendar.YEAR); int month = calendar.get(Calendar.MONTH); int day = calendar.get(Calendar.DAY_OF_MONTH); wea.setYear(year + ""); wea.setMonth(month + ""); wea.setDay(day + ""); wea.setWeather(weather); // 把map中的值输出 context.write(wea, value); } catch (ParseException e) { e.printStackTrace(); } } }六>>>reduce里面的输出package com.huawei.mr.weather;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * @author Lpf. * @version 创建时间:2019年4月13日 下午8:55:35 * reduce 里面的处理 */public class WeatherReduce extends Reducer<Weather, Text, Text, NullWritable> {@Overrideprotected void reduce(Weather key, Iterable<Text> values, Context context)throws IOException, InterruptedException {Iterator<Text> iterator = values.iterator();Text text = null;String day = null;while (iterator.hasNext()) {text = iterator.next();if (day != null) {if (!day.equals(key.getDay())) {// 输出本月温度最高的第二天context.write(text, NullWritable.get());break;}} else {// 输出本月温度最高的第一天context.write(text, NullWritable.get());day = key.getDay();}}}}

年纪上来了 坐一下腰就酸的要死注释补充的不是很完整,有不明白的留言,乐意解答

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。