上篇:project的使用
rebalancethe output elements are distributed evenly to instances of the next operation in a round-robin fashion
按照round-robin的方式,决定上游算子的某个并发的数据发往下游的哪个并发。该方法可以保证从上游算子到下游算子的数据是绝对均匀发送的。但是不同并发之间的数据交互存在网络传输开销。
适用于
上下游算子并发度不一致存在数据倾斜来一个实例,直接上代码:
package cn._51doit.flink.day03;import org.apache.flink.api.common.functions.RichMapFunction;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSink;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;/** * rebalance的使用:无界流 */public class RebalancecingPartitioning { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> words = env.socketTextStream("Slave01", 8888); SingleOutputStreamOperator<String> mapDataStream = words.map(new RichMapFunction<String, String>() { @Override public String map(String value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); return value + ":" + indexOfThisSubtask; } }).setParallelism(1); //设置并行度为1时 //使用轮询方式将数据发送小下游 DataStream<String> rebalanced = mapDataStream.rebalance(); DataStreamSink<String> stringDataStreamSink = rebalanced.addSink(new RichSinkFunction<String>() { @Override public void invoke(String value, Context context) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); System.out.println(value + " ->" + indexOfThisSubtask); } }); env.execute(); }}运行程序后,再去访问flink web ui页面发现:http://localhost:8081/#/job/bb53e5860410f20d8f6b5d55a86e5acb/overview
若设置并行度为1时,发现出现轮询操作,比如:在nc -lk的8888端口的窗口,发送了8次输入内容为:“abc”【设置并行度的情况下】
以上的结果发现:分区的结果只有一个分区,而发送给小下游的数据的轮询操作
若不设置并行度的情况下,我们再访问flink web ui页面发现:
http://localhost:8081/#/job/d8b79937c9d58aa6f2c022d7bc49b193/overview
发现有3个take,并行度度分别是Source【1】、Map【4】、Sink【4】
若在不设置并行度的情况下,将会在nc -lk的8888端口的窗口,发送了12次输入内容为:“abc”, 控制台打印输出的结果如下:
打印输出信息:
abc:0 ->2abc:1 ->3abc:2 ->2abc:3 ->3abc:0 ->3abc:1 ->0abc:2 ->3abc:3 ->0abc:0 ->0abc:1 ->1abc:2 ->0abc:3 ->1
分享完毕,谢谢!