This article compares Flink and Spark Streaming from several angles and gives code examples for each.
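1. Data Processing Model
Spark Streaming uses a micro-batch model: it continuously collects the data that arrives within a fixed interval into a batch and then processes that batch as a unit. Flink uses a true streaming model: records flow in one at a time, and each is processed as soon as it arrives.
The corresponding code is shown below: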
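// Spark Streaming: micro-batch word count (one batch per second)
import org.apache.spark.streaming.{Seconds, StreamingContext}
// sparkConf is a SparkConf assumed to be defined elsewhere
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _) // counts within each batch
wordCounts.print()
ssc.start()            // start receiving and processing data
ssc.awaitTermination() // block until the job is stopped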
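// Flink: record-at-a-time word count (running totals)
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
  .map { (_, 1) }
  .keyBy(_._1) // key by the word
  .sum(1)      // running count per word, updated on every record
counts.print()
env.execute("Flink Streaming Example")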
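To make the difference between the two models concrete, the following plain-Scala sketch simulates both on an in-memory list of events. It does not use Spark or Flink. The Event type and the microBatch and perRecord functions are hypothetical names introduced only for illustration.

```scala
object ProcessingModels {
  // An event with an arrival time in milliseconds (hypothetical sample type).
  final case class Event(timeMs: Long, word: String)

  // Micro-batch: group events into fixed intervals, then count words per group.
  // One result is produced per interval that contains at least one event.
  def microBatch(events: Seq[Event], intervalMs: Long): Seq[(Long, Map[String, Int])] =
    events
      .groupBy(_.timeMs / intervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (batch, es) =>
        (batch, es.groupBy(_.word).map { case (w, ws) => (w, ws.size) })
      }

  // Record-at-a-time: update a running count and emit a result for every event.
  def perRecord(events: Seq[Event]): Seq[(String, Int)] = {
    val counts = scala.collection.mutable.Map.empty[String, Int]
    events.map { e =>
      val c = counts.getOrElse(e.word, 0) + 1
      counts(e.word) = c
      (e.word, c)
    }
  }
}
```

For three events at 100 ms, 900 ms, and 1200 ms with a 1000 ms interval, microBatch yields two results (one per batch), while perRecord yields three (one per event).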
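2. Fault Tolerance
Spark Streaming achieves fault tolerance through checkpointing: the driver records metadata about the batches it has processed in a checkpoint directory on reliable storage, and after a failure Spark Streaming uses this metadata to resume processing.
Flink achieves fault tolerance through distributed snapshots: the state of every operator is periodically snapshotted and persisted, and after a failure Flink restores the saved state and resumes processing from that point.
The corresponding code is shown below: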
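// Spark Streaming fault tolerance
// Inside the function that builds the context, set the checkpoint directory
ssc.checkpoint("checkpoint")
...
// On (re)start, recover the context from the checkpoint if one exists;
// otherwise call creatingFunc (a () => StreamingContext) to build a new one
val ssc = StreamingContext.getOrCreate("checkpoint", creatingFunc)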
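// Flink fault tolerance
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
env.enableCheckpointing(5000) // take a checkpoint every 5000 ms
// Note: FsStateBackend is deprecated since Flink 1.13 in favor of HashMapStateBackend plus a checkpoint storage setting
env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink/checkpoints"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) // at least 500 ms between checkpoints
env.getCheckpointConfig.setCheckpointTimeout(60000)        // abort a checkpoint that takes longer than 60 s
...
env.execute("Flink Streaming Example")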
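The snapshot-and-restore idea can be sketched without either framework. The following plain-Scala model is a simplification under stated assumptions: a single operator, a replayable input, and a snapshot that stores the input offset together with the operator state. The names Snapshot, run, and failAt are hypothetical and do not correspond to any Flink or Spark API.

```scala
object SnapshotRecovery {
  // A snapshot pairs the input offset with the operator state at that offset.
  final case class Snapshot(offset: Int, counts: Map[String, Int])

  // Processes `input` starting at `start.offset`, snapshotting every `interval` records.
  // `failAt` simulates a crash just before the record at that offset is processed.
  // Returns Left(last snapshot) on a crash, or Right(final counts) on success.
  def run(input: Vector[String], start: Snapshot, interval: Int,
          failAt: Option[Int]): Either[Snapshot, Map[String, Int]] = {
    var counts = start.counts
    var last = start
    var i = start.offset
    while (i < input.length) {
      if (failAt.contains(i)) return Left(last) // only the last snapshot survives
      counts = counts.updated(input(i), counts.getOrElse(input(i), 0) + 1)
      i += 1
      if (i % interval == 0) last = Snapshot(i, counts)
    }
    Right(counts)
  }
}
```

Restarting run from the snapshot returned by a crashed run replays only the records after the snapshot offset, so each record contributes to the final counts exactly once in this model.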
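3. State Management
Flink provides rich state management: managed keyed and operator state (for example ValueState), stored in either an in-memory (heap) state backend or a RocksDB state backend, which covers a wide range of scenarios. Spark Streaming also supports state, through updateStateByKey and mapWithState, but its options are more limited.
The corresponding code is shown below: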
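// Flink state management
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val counts = text.flatMap { _.split("\\W+").filter(_.nonEmpty) }
  .map { (_, 1L) } // Long, to match the input type of CountWindowAverage
  .keyBy(_._1)
  .flatMap(new CountWindowAverage())
  .keyBy(_._1)
// Keep state in RocksDB and checkpoint it to HDFS
val stateBackend = new RocksDBStateBackend("hdfs://localhost:9000/flink/rocksdb")
env.setStateBackend(stateBackend)
...
// Emits the average of every two values seen for a key, then clears that key's state
class CountWindowAverage extends RichFlatMapFunction[(String, Long), (String, Long)] {
  // Keyed state holding (count, running sum)
  private var sum: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    val stateDescriptor = new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    sum = getRuntimeContext.getState(stateDescriptor)
  }

  override def flatMap(input: (String, Long), out: Collector[(String, Long)]): Unit = {
    // value() returns null when nothing has been stored yet for the current key
    val tmpCurrentSum = sum.value() match {
      case null => (0L, 0L)
      case (count, total) => (count, total)
    }
    val currentSum = (tmpCurrentSum._1 + 1, tmpCurrentSum._2 + input._2)
    sum.update(currentSum)
    if (currentSum._1 >= 2) {
      out.collect((input._1, currentSum._2 / currentSum._1))
      sum.clear()
    }
  }
}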
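// Spark Streaming state management
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.MapWithStateDStream
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("hdfs://localhost:9000/spark/checkpoints") // stateful operations require checkpointing
...
// Update function with the signature expected by updateStateByKey
val updateState = (newValues: Seq[Int], currentState: Option[Int]) => {
  val currentCount = currentState.getOrElse(0)
  val newCount = newValues.sum + currentCount
  Some(newCount)
}
val totals = pairs.updateStateByKey[Int](updateState)
....
// mapWithState needs a (key, value, state) mapping function instead
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val newCount = one.getOrElse(0) + state.getOption().getOrElse(0)
  if (!state.isTimingOut()) state.update(newCount) // a timing-out state cannot be updated
  (word, newCount)
}
val stateSpec = StateSpec.function(mappingFunc)
  .numPartitions(10)
  .timeout(Seconds(30))
// mapWithState is defined on key-value streams, so it is applied to pairs rather than words
val stateDStream: MapWithStateDStream[String, Int, Int, (String, Int)] = pairs.mapWithState(stateSpec)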
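The behavior of per-key state can also be traced on a small input without a cluster. The sketch below is a plain-Scala model of the "average every n values per key, then clear" logic, using a mutable map in place of framework-managed state. The names KeyedAverage and averageEvery are hypothetical, and the model ignores checkpointing and distribution entirely.

```scala
object KeyedAverage {
  // Emits (key, average) each time a key has accumulated `n` values,
  // then clears the state for that key.
  def averageEvery(n: Int, input: Seq[(String, Long)]): Seq[(String, Long)] = {
    // key -> (count, running sum); stands in for framework-managed keyed state
    val state = scala.collection.mutable.Map.empty[String, (Long, Long)]
    input.flatMap { case (key, value) =>
      val (count, sum) = state.getOrElse(key, (0L, 0L))
      val updated = (count + 1, sum + value)
      if (updated._1 >= n) {
        state.remove(key)
        Some((key, updated._2 / updated._1))
      } else {
        state(key) = updated
        None
      }
    }
  }
}
```

Because the state is looked up by key, values for different keys can interleave in the input without affecting each other's averages.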
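4. Window Processing
Spark Streaming provides a time-based window API: a window is defined by a window length and a slide interval, and results are recomputed each time the window slides. Flink provides a comparable window API with tumbling, sliding, and count windows, and additionally supports session windows and the handling of late-arriving data.
The corresponding code is shown below: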
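// Spark Streaming windowing: a 30 s window that slides every 10 s
// The inverse function (_ - _) enables incremental updates and requires checkpointing to be enabled
val windowedStream = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
windowedStream.print()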
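// Flink windowing: word counts over 5 s tumbling windows
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
val windowedStream = text.flatMap { _.split("\\W+").filter(_.nonEmpty) }
  .map { (_, 1) }
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .sum(1)
windowedStream.print()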
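The window types mentioned above differ mainly in how a timestamp is assigned to windows. The following plain-Scala sketch shows one way to compute that assignment for tumbling, sliding, and session windows. It is a simplified model, not the implementation used by Flink or Spark; the names WindowAssigners, tumbling, sliding, and sessions are hypothetical, and timestamps are assumed to be non-negative milliseconds.

```scala
object WindowAssigners {
  // Tumbling: each timestamp belongs to exactly one window [start, start + size).
  def tumbling(ts: Long, size: Long): (Long, Long) = {
    val start = ts - (ts % size)
    (start, start + size)
  }

  // Sliding: each timestamp belongs to every window of length `size`
  // that starts on a multiple of `slide` and still covers the timestamp.
  def sliding(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - (ts % slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ > ts - size)
      .map(start => (start, start + size))
      .toList
  }

  // Session: a timestamp that falls before the end of the current session
  // extends it to ts + gap; otherwise it opens a new session.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted.foldLeft(List.empty[(Long, Long)]) {
      case ((start, end) :: rest, ts) if ts < end => (start, ts + gap) :: rest
      case (acc, ts) => (ts, ts + gap) :: acc
    }.reverse
}
```

With a window size of 10 and a slide of 5, a timestamp of 12 falls into two sliding windows, which is why sliding windows produce overlapping results while tumbling windows do not.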
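5. Additional Features
Flink has been applied in areas such as computer vision and machine learning, while Spark Streaming integrates easily with other components of the big data ecosystem, such as Kafka, HBase, and Cassandra.
The corresponding code is shown below: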
// Flink computer vision
// VideoStreamSource, Image, and ObjectDetection are user-defined classes, not part of Flink
val inputStream = env.addSource(new VideoStreamSource("rtmp://localhost:1935/live/test"))
val dataStream = inputStream.map(frame => Image(frame))
  .flatMap(new ObjectDetection())
dataStream.print()
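// Spark Streaming integration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.{Job => HadoopJob}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Kafka: topics and kafkaParams are assumed to be defined elsewhere
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
// Read the offsets before any transformation; the cast fails on a mapped RDD
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val records = rdd.map(record => (record.key, record.value))
  ...
}
...
// HBase
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
val job = HadoopJob.getInstance(conf)
...
// Cassandra: cassandraDF is a DataFrame assumed to be defined elsewhere
cassandraDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words", "keyspace" -> "test")).save()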