首页 > 编程知识 正文

flink 详解,flink怎么使用

时间:2023-05-03 21:21:23 阅读:152243 作者:631

定制源

只需传递一个SourceFunction

val stream4=env.addsource (newmysensource ) )

复制代码

例:随机生成传感器数据

只需通过生成随机数据来组装传感器数据

变换

转换运算符

valstreamMap=stream.map{x=x*2}

复制代码

平面图

A、

flat map (list (1,2,3 ) )的结果是list (I ) 1、1、2、2、3、3 ) )

b、

List(a(ab )、cd ).flat map (line-line.split ) ) ) )结果为list (a、b、c、c ) )

复制代码

代码:

valstreamflatmap=stream.flat map {

x=x.split (' ) )

}

复制代码

菲尔斯特

valstreamFilter=stream.filter{

x=x==1

}

复制代码

KeyBy

DataStream、KeyedStream逻辑上将1个流分割为不交叉的分区

每个分区中包含相同key的元素在内部实现为hash

复制代码

代码:

valaggstream=datastream.keyby (' id ) ) ) ) ) ) ) ) ) )。

复制代码

角色聚合器(Rolling Aggregation ) )。

汇总keyedStream的每个支流

sum () )。

最小() )。

max () )

minBy () )

最大by () )。

复制代码

读我

源代码

3359 gitee.com/pingfanrenbiji/flink-userbehavioranalysis/blob/master/flink tutorial/src/main/Scala/com/com

复制代码

KeyedStream、DataStream的一个包数据流的聚合操作

将当前元素与最后聚合的结果合并以生成新值

返回的流包含每个聚合的结果

不仅仅返回最后聚合的最终结果

复制代码

流剥离

流选择

连接流

连接保持类型的两个数据流的两个数据流只是被连接后放在同一个流中,内部仍然独立,不改变各自的数据和格式

复制代码

CoMap,CoFlatMap

作用于连接流的功能与地图和平板地图相同

针对连接流的各流,分别进行映射和平面映射的处理

复制代码

Union

对两个或多个DataStream执行union操作将生成包含所有DataStram元素的新DataStream

复制代码

连接和union的区别

1、union前两个流的类型必须相同。 connect可以不同。 在之后的coMap中调整为相同

2、Connect只能操作两个流Union,可以进行多个操作

复制代码

支持的数据类型

Flink流APP应用程序用一个数据对象表示事件流

数据对象必须序列化和反序列化

它可以通过网络传输,也可以从状态后端、检查点和保存点读取

复制代码

Flink使用类型信息的概念来表示数据类型

为每种数据类型生成特定的串行化器、解串行化器和比较器

复制代码

Flink提供了类型提取系统。 此系统分析函数的输入和返回类型

自动获取类型信息以获取串行化器和解串行化器

复制代码

lamdba 函数或泛型类型 需要显示的提供类型信息 才能使得应用程序正常工作或提高性能

复制代码

基础数据类型

Flink支持Java和Scala中所常见的数据类型

复制代码

Long

val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)

numbers.map( n => n + 1 )

复制代码

元组

val persons: DataStream[(String, Integer)] = env.fromElements(

("Adam", 17),

("Sarah", 23) )

persons.filter(p => p._2 > 18)

复制代码

样例类

case class Person(name: String, age: Int)

val persons: DataStream[Person] = env.fromElements(

Person("Adam", 17),

Person("Sarah", 23) )

persons.filter(p => p.age > 18)

复制代码

简单类

public class Person {

public String name;

public int age;

public Person() {}

public Person(String name, int age) {

this.name = name;

this.age = age;

}

}

DataStream persons = env.fromElements(

new Person("wldll", 42),

new Person("娇气的柜子", 23));

复制代码

Flink 对 Java 和 Scala(ArrayList,HashMap,Enum等)也支持

实现UDF函数-更细粒度的控制流

函数类(Function classes)

Flink暴露了所有udf函数的接口(实现方式为接口或抽象类)

例如:

MapFunction、FilterFunction、ProcessFunction

复制代码

实现FilterFunction接口

class FilterFilter extends FilterFunction[String] {

override def filter(value: String): Boolean = {

value.contains("flink")

}

}

val flinkTweets = tweets.filter(new FlinkFilter)

复制代码

将函数实现成匿名类

val flinkTweets = tweets.filter(

new RichFilterFunction[String] {

override def filter(value: String): Boolean = {

value.contains("flink")

}

}

)

复制代码

字符串作为参数传进去

val tweets: DataStream[String] = ...

val flinkTweets = tweets.filter(new KeywordFilter("flink"))

class KeywordFilter(keyWord: String) extends FilterFunction[String] {

override def filter(value: String): Boolean = {

value.contains(keyWord)

}

}

复制代码

匿名函数(Lamdba Functions)

val tweets: DataStream[String] = ...

val flinkTweets = tweets.filter(_.contains("flink"))

复制代码

// _ .id 代表 data => data.id

dataStream.filter( _.id.statWith("sensor_1")).print

复制代码

副函数

函数类接口 所有的Flink函数类都有Rich版本

与常规函数区别:

可以获取运行时上下文

并拥有一些生命周期方法

可以实现更加复杂的功能

a、 RichMapFunction

b、 RichFlatMapFunction

c、 RichFilterFunction

生命周期:

a、open() 是 rich fuction初始化方法

当一个算子例如map或filter被调用之前 open会被调用

b、close方法 是生命周期中的最后一个调用方法 做一些清理工作

c、getRuntimeContext方法 提供了函数的RuntimeContext的一些信息

例如函数的执行并行度、任务的名字以及state状态

复制代码

代码

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {

var subTaskIndex = 0

override def open(configuration: Configuration): Unit = {

subTaskIndex = getRuntimeContext.getIndexOfThisSubtask

//  以下可以做一些初始化工作 , 例如建立一个和 HDFS 的连接

}

override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {

if (in % 2 == subTaskIndex) {

out.collect((subTaskIndex, in))

}

}

override def close(): Unit = {

//  以下做一些清理工作,例如断开和 HDFS 的连接。

}

}

复制代码

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。