首页 > 编程知识 正文

mysql面试题sql语句(spark读取word文档)

时间:2023-05-06 09:48:19 阅读:87958 作者:3071

Java 版本WordCount

项目目录结构为以下:

在项目目录data下创建统计词数的文件Words.txt

新的Java版WordCount程序importorg.Apache.spark.spark conf;

importorg.Apache.spark.API.Java.javapairrdd;

importorg.Apache.spark.API .日本航空.日本航空;

importorg.Apache.spark.API.Java.javasparkcontext;

importorg.Apache.spark.API.Java.function .平板电脑;

importorg.Apache.spark.API.Java .功能2;

importorg.Apache.spark.API.Java .功能.伙伴功能;

importorg.Apache.spark.API.Java .功能.语音功能;

导入比例2;

导入Java.util .阵列;

导入Java.util .迭代器;

公共类Java word计数

publicstaticvoidmain (字符串[ ]数组) {

//*

* SparkConf创建spark APP应用程序的配置

*

* setAppName:用于设置APP的名称。 在Spark集群中运行的程序可以在标准、yarn、mesos等资源管理器的UI中查看

*

* setMaster:用于设置APP应用程序的本地执行模式

* 1) local : spark APP应用的本地执行模式。 如果在local之后未指定参数,则缺省情况下将使用一个core执行spark APP

* local[n]:指定使用n个酷睿来运行spark APP应用程序

* local[*]:使用本地计算机上的所有酷睿执行spark APP

* * *这里的核心支持计算机线程。 如果计算机是四核八线程,则计算机总共可以提供八个内核来执行spark APP

如果不安装setMaster,将来程序将被打包并公开到集群上执行。 集群主要有单机、雅安、消息

* 2) standalone: Spark附带的资源管理器、主节点为Master、从节点为Worker

* 3) yarn:基于Hadoop yarn资源管理器运行Spark程序。 主节点为资源管理器,从节点为节点管理器

* 4) mesos:一般多在海外使用,在此不做详细说明

*

*/

sparkconfconf=newsparkconf (.set master (' local ).setappname ) ) Javawordcount );

//*

* JavaSparkContext:是到创建JavaSparkContext的spark APP应用程序群集的唯一通道

* * *在* JavaSparkContext的基本级别,缺省情况下将创建两个对象: Dag调度程序和任务调度程序

* DAGScheduler:根据切割作业将spark APP应用程序中的RDD宽度划分为后台文件,每个后台文件封装到TaskSet中并提交给TaskScheduler

*

* TaskScheduler:负责从TaskSet并行遍历每个Task,发送到工作节点内的执行程序内的读取池执行,监视Task执行,回收结果。

*/

javasparkcontextjsc=newjavasparkcontext (conf;

//设定日志级别

设置日志级别(' warn );

//*

通过调用Hadoop下文件的读取方法来剪切文件,然后创建RDD。

*/

JavaRDD<String> lines = jsc.textFile("./data/words.txt"); /** * flatMap: 功能是将RDD中的partition中的一行行的数据依照空格切分压平 * 它是一个lazy的Transformation算子,懒执行,需要由action算子触发执行 * flatMap算子是1对多,进去一行行的文本,出来一个个的单词 * flatMap内部的逻辑是在Executor中执行 */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); /** * mapToPair: 将一个个的单词的JavaRDD<String>转换成<K,V>格式的JavaPairRDD<K,V> * 期中,K为单词, V为1 * * mapToPair与Scala中的map功能类似,是一个懒执行算子,一对一的操作,进去一个单词word, 出来一个键值对(word,1) */ JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word, 1); } }); /** * reduceByKey: 对上步RDD的partition中的K,V格式的RDD先分组再聚合, 是懒执行算子 * * reduceByKey是一个shuffle类的算子,如果数据分布在多台节点上,会进行局部分组聚合,再全局聚合, 对于跨节点的数据会落地磁盘小文件 * 基处理也要经过mapper和reducer两个阶段 * * 假如数据有两个分区,分别在node1节点和node2节点上,那么: * * ===================>mapper=========================>shuffle=================>reducer========================= * node1节点: * (知性的冬天,1) * (知性的冬天,1) * (知性的冬天,1) * (知性的冬天,1) 局部分组聚合=> (知性的冬天,4) => 写入磁盘落地 (知性的冬天,6) * (天龙八部,1) (天龙八部,4) * (天龙八部,1) * (天龙八部,1) * (天龙八部,1) => 全局聚合 * node2节点: (天龙八部,6) * (知性的冬天,1) (jdd,2) * (知性的冬天,1) * (天龙八部,1) 局部分组聚合=> (知性的冬天,2) => 写入磁盘落地 * (天龙八部,1) (天龙八部,2) * (jdd,1) (jdd,2) * (jdd,1) * */ JavaPairRDD<String, Integer> reduces = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); /** * 由于Spark JAVA的API中没有提供sortBy()方法,只提供了sortByKey(), 所以要进行排序,需要经历以下步骤: * * 1. 调用mapToPair, 把原来的RDD从<K, V>格式转换成<V,K> * 2. 调用sortByKey() * 3. 排好序后,再换回原来K,V */ JavaPairRDD<Integer, String> swaps = reduces.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception { return tuple.swap(); } }); JavaPairRDD<Integer, String> sorted = swaps.sortByKey(false); JavaPairRDD<String, Integer> results = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception { return tuple.swap(); } }); /** * foreach: 是一个action算子,触发执行 * * Spark应用程序中有几个action算子就有几个job */ results.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> tuple) throws Exception { System.out.println("单词:"+tuple._1+" 个数:"+tuple._2); } }); //释放资源 jsc.stop(); } } 关键代码说明flatMap()

mapToPair()

reduceByKey()

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。