
Atguigu Big Data Flink Tutorial (尚硅谷 Flink notes)

Date: 2023-05-06 19:41:59  Author: 845

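WordCount from the Bilibili video course
A pitfall for beginners: attach the Scala sources in IDEA first. Only then does Ctrl+click open the corresponding source code; otherwise IDEA shows just a bare function signature.
Start by creating your own Maven project, then create a folder named scala under src (in the standard Maven layout, src/main/scala) and mark it as a sources root.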

Next comes the pom.xml configuration. Add the dependencies and plugins below, then let Maven import them.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>First_JAVA</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <!-- Bind to Maven's package phase -->
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.10.0</version>
        </dependency>
    </dependencies>
</project>

Next is the code for wordcount.scala, placed in the newly created scala source directory.

package com.atguigu.wc

import org.apache.flink.api.scala._

// Batch word count
object WordCount_1 {
  def main(args: Array[String]): Unit = {
    // Create a batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // Read the input from a file (backslashes must be escaped in a Scala string literal)
    val inputDataSet: DataSet[String] =
      env.readTextFile("D:\\MyCodeforIDEA\\JAVA\\First_JAVA\\src\\main\\resources\\word.txt")

    // Transform the DataSet: split each line on spaces, then group by word as the key
    val resultDataSet: AggregateDataSet[(String, Int)] = inputDataSet
      .flatMap(_.split(" ")) // split into words, giving a data set of all words
      .map((_, 1))           // turn each word into a tuple (word, 1)
      .groupBy(0)            // group by the first tuple field (the word)
      .sum(1)                // sum the second tuple field (the count)

    // Print the result
    resultDataSet.print()
  }
}

The word.txt file lives under resources in the IDEA project and contains words separated by spaces:

hello world
hello flink
hello scala
how are you
fine thank you
and you

The program finally prints:

(fine,1)
(flink,1)
(world,1)
(thank,1)
(are,1)
(scala,1)
(you,3)
(and,1)
(hello,3)
(how,1)
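To see where these counts come from without starting Flink, the same four steps can be mimicked on an ordinary in-memory collection. This is only a sketch with plain Scala collections, not the Flink DataSet API: the object name WordCountSketch and the method count are made up for illustration, and groupBy here is the collections method, so the result is a Map rather than an AggregateDataSet.

```scala
// Illustration only: plain Scala collections, no Flink involved.
// WordCountSketch and count are hypothetical names, not part of the tutorial code.
object WordCountSketch {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split every line on spaces into words
      .map((_, 1))             // pair each word with an initial count of 1
      .groupBy(_._1)           // group the pairs by the word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // add up the 1s per word

  def main(args: Array[String]): Unit = {
    // Same contents as word.txt
    val lines = Seq(
      "hello world",
      "hello flink",
      "hello scala",
      "how are you",
      "fine thank you",
      "and you"
    )
    // Print order is not guaranteed, just as with the Flink job
    count(lines).foreach(println)
  }
}
```

As in the Flink output, hello and you each end up with a count of 3 and every other word with 1; the order of the printed tuples may differ.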

An issue mentioned in the video

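Compare these two imports:

import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

import org.apache.flink.api.scala._

The first one fails to compile, while the second one works. The error is:

Error:(11, 15) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
      .flatMap(_.split(" "))

In short, the compiler cannot find an implicit value. When Flink performs its low-level transformations, every data type has to be wrapped as TypeInformation, and that wrapper is supplied implicitly. The selective import brings in only the three named classes and leaves the implicits out of scope. To get them, use import org.apache.flink.api.scala._, because the implicits (such as createTypeInformation) are defined in the package object of org.apache.flink.api.scala.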
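The mechanism can be reproduced in a few lines without Flink. The sketch below is an analogy under stated assumptions, not Flink's actual code: TypeInfo, MiniApi, MiniDataSet, and WildcardImport are hypothetical stand-ins for TypeInformation, the org.apache.flink.api.scala package object, DataSet, and the user's program.

```scala
// Hypothetical stand-in for TypeInformation; not a Flink class.
trait TypeInfo[T] { def describe: String }

// Hypothetical stand-in for the org.apache.flink.api.scala package object.
object MiniApi {
  // A tiny data set whose flatMap demands TypeInfo evidence for the result type,
  // written as a context bound [R: TypeInfo].
  final class MiniDataSet[T](val items: Seq[T]) {
    def flatMap[R: TypeInfo](f: T => Seq[R]): MiniDataSet[R] =
      new MiniDataSet(items.flatMap(f))
  }

  // The implicit evidence lives next to the API types.
  implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] {
    def describe: String = "String"
  }
}

object WildcardImport {
  // The wildcard import brings both MiniDataSet and the implicit into scope.
  import MiniApi._

  def run(): Seq[String] =
    new MiniDataSet(Seq("hello world")).flatMap(_.split(" ").toSeq).items
}

// With a selective import such as
//   import MiniApi.MiniDataSet
// the same flatMap call would be rejected by the compiler with
// "could not find implicit value for evidence parameter of type TypeInfo[String]",
// because stringInfo would no longer be in scope.
```

Calling WildcardImport.run() returns the two words hello and world. Replacing the wildcard import with the selective one makes the call fail at compile time, which mirrors the error reported above.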


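One problem I ran into myself: when the instructor clicked into the AggregateDataSet source in the video, what he saw looked different from what I get. On top of that, my org.apache.flink.api.scala package does not seem to contain the scala object, which is odd. Has anyone else run into this? What could be causing it?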
My screenshot: (image missing)

The instructor's screenshot: (image missing)
