定制源
只需传递一个SourceFunction
val stream4=env.addsource (newmysensource ) )
复制代码
例:随机生成传感器数据
只需通过生成随机数据来组装传感器数据
变换
转换运算符
valstreamMap=stream.map{x=x*2}
复制代码
平面图
A、
flat map (list (1,2,3 ) )的结果是list (I ) 1、1、2、2、3、3 ) )
b、
List(a(ab )、cd ).flat map (line-line.split ) ) ) )结果为list (a、b、c、c ) )
复制代码
代码:
valstreamflatmap=stream.flat map {
x=x.split (' ) )
}
复制代码
菲尔斯特
valstreamFilter=stream.filter{
x=x==1
}
复制代码
KeyBy
DataStream、KeyedStream逻辑上将1个流分割为不交叉的分区
每个分区中包含相同key的元素在内部实现为hash
复制代码
代码:
valaggstream=datastream.keyby (' id ) ) ) ) ) ) ) ) ) )。
复制代码
角色聚合器(Rolling Aggregation ) )。
汇总keyedStream的每个支流
sum () )。
最小() )。
max () )
minBy () )
最大by () )。
复制代码
读我
源代码
3359 gitee.com/pingfanrenbiji/flink-userbehavioranalysis/blob/master/flink tutorial/src/main/Scala/com/com
复制代码
KeyedStream、DataStream的一个包数据流的聚合操作
将当前元素与最后聚合的结果合并以生成新值
返回的流包含每个聚合的结果
不仅仅返回最后聚合的最终结果
复制代码
流剥离
流选择
连接流
连接保持类型的两个数据流的两个数据流只是被连接后放在同一个流中,内部仍然独立,不改变各自的数据和格式
复制代码
CoMap,CoFlatMap
作用于连接流的功能与地图和平板地图相同
针对连接流的各流,分别进行映射和平面映射的处理
复制代码
Union
对两个或多个DataStream执行union操作将生成包含所有DataStram元素的新DataStream
复制代码
连接和union的区别
1、union前两个流的类型必须相同。 connect可以不同。 在之后的coMap中调整为相同
2、Connect只能操作两个流Union,可以进行多个操作
复制代码
支持的数据类型
Flink流APP应用程序用一个数据对象表示事件流
数据对象必须序列化和反序列化
它可以通过网络传输,也可以从状态后端、检查点和保存点读取
复制代码
Flink使用类型信息的概念来表示数据类型
为每种数据类型生成特定的串行化器、解串行化器和比较器
复制代码
Flink提供了类型提取系统。 此系统分析函数的输入和返回类型
自动获取类型信息以获取串行化器和解串行化器
复制代码
lamdba 函数或泛型类型 需要显示的提供类型信息 才能使得应用程序正常工作或提高性能复制代码
基础数据类型
Flink支持Java和Scala中所常见的数据类型
复制代码
Long
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )
复制代码
元组
val persons: DataStream[(String, Integer)] = env.fromElements(
("Adam", 17),
("Sarah", 23) )
persons.filter(p => p._2 > 18)
复制代码
样例类
case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
Person("Adam", 17),
Person("Sarah", 23) )
persons.filter(p => p.age > 18)
复制代码
简单类
public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream persons = env.fromElements(
new Person("wldll", 42),
new Person("娇气的柜子", 23));
复制代码
Flink 对 Java 和 Scala(ArrayList,HashMap,Enum等)也支持
实现UDF函数-更细粒度的控制流
函数类(Function classes)
Flink暴露了所有udf函数的接口(实现方式为接口或抽象类)
例如:
MapFunction、FilterFunction、ProcessFunction
复制代码
实现FilterFunction接口
class FilterFilter extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
val flinkTweets = tweets.filter(new FlinkFilter)
复制代码
将函数实现成匿名类
val flinkTweets = tweets.filter(
new RichFilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains("flink")
}
}
)
复制代码
字符串作为参数传进去
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))
class KeywordFilter(keyWord: String) extends FilterFunction[String] {
override def filter(value: String): Boolean = {
value.contains(keyWord)
}
}
复制代码
匿名函数(Lamdba Functions)
val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))
复制代码
// _ .id 代表 data => data.id
dataStream.filter( _.id.statWith("sensor_1")).print
复制代码
副函数
函数类接口 所有的Flink函数类都有Rich版本
与常规函数区别:
可以获取运行时上下文
并拥有一些生命周期方法
可以实现更加复杂的功能
a、 RichMapFunction
b、 RichFlatMapFunction
c、 RichFilterFunction
生命周期:
a、open() 是 rich fuction初始化方法
当一个算子例如map或filter被调用之前 open会被调用
b、close方法 是生命周期中的最后一个调用方法 做一些清理工作
c、getRuntimeContext方法 提供了函数的RuntimeContext的一些信息
例如函数的执行并行度、任务的名字以及state状态
复制代码
代码
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
var subTaskIndex = 0
override def open(configuration: Configuration): Unit = {
subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
// 以下可以做一些初始化工作 , 例如建立一个和 HDFS 的连接
}
override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
if (in % 2 == subTaskIndex) {
out.collect((subTaskIndex, in))
}
}
override def close(): Unit = {
// 以下做一些清理工作,例如断开和 HDFS 的连接。
}
}
复制代码