
Differences Between Flink and Spark Streaming


This article examines the differences between Flink and Spark Streaming from several angles, with code examples for each.

1. Data Processing Model

Spark Streaming uses a micro-batch model: incoming data is collected over a fixed batch interval and then processed as a small batch. Flink uses a true streaming model: each record is processed as soon as it arrives, without waiting for a batch to fill.

The corresponding code is as follows:


// Spark Streaming: micro-batch word count; the 1-second batch interval defines
// how much data is collected before each small batch is processed
val ssc = new StreamingContext(sparkConf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()              // processing only begins once the context is started
ssc.awaitTermination()

// Flink: record-at-a-time word count; each line is processed as soon as it arrives
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val counts = text.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
    .map { (_, 1) }
    .keyBy(0)
    .sum(1)
counts.print()
env.execute("Streaming WordCount")   // submits and runs the job

2. Fault Tolerance

Spark Streaming achieves fault tolerance by recording, on the Driver, checkpoint metadata about the batches that have already been processed; after a failure, Spark Streaming recovers processing from that metadata.

Flink instead uses a distributed snapshot mechanism: the state of every operator is periodically snapshotted and persisted, and after a failure Flink restores processing from the last completed snapshot.

The corresponding code is as follows:


// Spark Streaming fault tolerance: enable checkpointing and recover the context from it
ssc.checkpoint("checkpoint")
...
val ssc = StreamingContext.getOrCreate("checkpoint", creatingFunc)

// Flink fault tolerance: periodic distributed snapshots of operator state
env.enableCheckpointing(5000)    // take a checkpoint every 5 seconds
env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink/checkpoints"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
...
env.execute("Flink Streaming Example")
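Here creatingFunc is referenced but not shown; a minimal sketch of such a factory function, assuming the same checkpoint directory and a 1-second batch interval, might look like this:

// Hypothetical factory used by StreamingContext.getOrCreate: it is only invoked
// when no checkpoint exists; otherwise the context is restored from the checkpoint
def creatingFunc(): StreamingContext = {
    val newSsc = new StreamingContext(sparkConf, Seconds(1))
    newSsc.checkpoint("checkpoint")   // same directory passed to getOrCreate
    // ... build the DStream graph here ...
    newSsc
}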

3. State Management

Flink provides rich state management: keyed state such as ValueState, ListState, and MapState, backed by configurable state backends (in-memory, filesystem, or RocksDB) that the runtime manages and checkpoints. Spark Streaming also supports stateful processing (updateStateByKey and mapWithState), but the options are more limited.

The corresponding code is as follows:


// Flink state management: keyed ValueState with a RocksDB state backend
val counts = text.flatMap { _.split("\\W+").filter(_.nonEmpty) }
    .map { word => (word, 1L) }
    .keyBy(0)
    .flatMap(new CountWindowAverage())
val stateBackend = new RocksDBStateBackend("hdfs://localhost:9000/flink/rocksdb")
env.setStateBackend(stateBackend)
...
// Emits the running average of counts per key once two elements have been seen,
// keeping a (count, sum) pair in keyed ValueState
class CountWindowAverage extends RichFlatMapFunction[(String, Long), (String, Long)] {
    private var sum: ValueState[(Long, Long)] = _

    override def open(parameters: Configuration): Unit = {
        val stateDescriptor = new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
        sum = getRuntimeContext.getState(stateDescriptor)
    }

    override def flatMap(input: (String, Long), out: Collector[(String, Long)]): Unit = {
        // the state is null until the first element for this key arrives
        val tmpCurrentSum = Option(sum.value()).getOrElse((0L, 0L))
        val currentSum = (tmpCurrentSum._1 + 1, tmpCurrentSum._2 + input._2)
        sum.update(currentSum)
        if (currentSum._1 >= 2) {
            out.collect((input._1, currentSum._2 / currentSum._1))
            sum.clear()
        }
    }
}

// Spark Streaming state management (mapWithState)
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("hdfs://localhost:9000/spark/checkpoints")
...
// mapWithState expects a (key, new value, state) => output mapping function
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
    val newCount = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(newCount)
    (word, newCount)
}
...
val stateSpec = StateSpec.function(mappingFunc)
    .numPartitions(10)
    .timeout(Seconds(30))
val stateDStream: MapWithStateDStream[String, Int, Int, (String, Int)] = pairs.mapWithState(stateSpec)
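Spark Streaming's older updateStateByKey API uses a simpler (new values, previous state) update function; a minimal sketch, reusing the pairs stream of (word, 1) tuples from section 1:

// updateStateByKey keeps a running count per key; it also requires checkpointing
val updateState = (newValues: Seq[Int], currentState: Option[Int]) => {
    Some(newValues.sum + currentState.getOrElse(0))
}
val runningCounts = pairs.updateStateByKey[Int](updateState)
runningCounts.print()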

4. Window Processing

Spark Streaming offers window operations on DStreams (window, reduceByKeyAndWindow, and so on); both the window length and the slide interval must be multiples of the batch interval, so its windows are purely time-based. Flink's window API is broader, supporting tumbling, sliding, session, and count windows over processing time or event time.

The corresponding code is as follows:


// Spark Streaming window processing: a 30-second window sliding every 10 seconds
// (the inverse-function form of reduceByKeyAndWindow requires checkpointing)
val windowedStream = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(30), Seconds(10))
windowedStream.print()

// Flink window processing: word counts over a 5-second tumbling window
val windowedStream = text.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
    .map { (_, 1) }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
windowedStream.print()
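The session windows mentioned above group elements by gaps of inactivity; a minimal sketch using processing-time sessions with an assumed 10-second gap (ProcessingTimeSessionWindows comes from org.apache.flink.streaming.api.windowing.assigners):

// Flink session window: a session for a key closes after 10 seconds with no new elements
val sessionCounts = text.flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
    .map { (_, 1) }
    .keyBy(0)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
    .sum(1)
sessionCounts.print()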

5. Additional Capabilities

Flink is also applied to workloads such as computer vision and machine learning, typically through user-defined sources and operators, while Spark Streaming integrates easily with other components of the big-data ecosystem, such as Kafka, HBase, and Cassandra.

The corresponding code is as follows:


// Flink computer vision: VideoStreamSource, Image and ObjectDetection are
// user-defined classes, not part of the Flink API
val inputStream = env.addSource(new VideoStreamSource("rtmp://localhost:1935/live/test"))
val dataStream = inputStream.map(frame => Image(frame))
    .flatMap(new ObjectDetection())
dataStream.print()

// Spark Streaming integration: direct Kafka stream with offset tracking
val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
    .foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        ...
    }
...
// HBase integration: connection settings plus a Hadoop job used for writes
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase")
val job = Job.getInstance(conf)   // org.apache.hadoop.mapreduce.Job
...
// Cassandra integration via the Spark Cassandra connector (DataFrame write)
cassandraDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words", "keyspace" -> "test")).save()
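For the streaming word counts themselves, a sketch of writing each micro-batch to Cassandra with the DataStax spark-cassandra-connector, assuming a keyspace test with a table words(word text primary key, count int) already exists:

// Hypothetical: persist each micro-batch of word counts to Cassandra
import com.datastax.spark.connector._

wordCounts.foreachRDD { rdd =>
    rdd.saveToCassandra("test", "words", SomeColumns("word", "count"))
}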
