首页 > 编程知识 正文

rebalancing什么意思,rebalance机制

时间:2023-05-05 03:53:17 阅读:240237 作者:1775

上篇:project的使用

rebalance

the output elements are distributed evenly to instances of the next operation in a round-robin fashion

按照round-robin的方式,决定上游算子的某个并发的数据发往下游的哪个并发。该方法可以保证从上游算子到下游算子的数据是绝对均匀发送的。但是不同并发之间的数据交互存在网络传输开销。

适用于

上下游算子并发度不一致存在数据倾斜

来一个实例,直接上代码:

package cn._51doit.flink.day03;import org.apache.flink.api.common.functions.RichMapFunction;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSink;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;/** * rebalance的使用:无界流 */public class RebalancecingPartitioning { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); DataStreamSource<String> words = env.socketTextStream("Slave01", 8888); SingleOutputStreamOperator<String> mapDataStream = words.map(new RichMapFunction<String, String>() { @Override public String map(String value) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); return value + ":" + indexOfThisSubtask; } }).setParallelism(1); //设置并行度为1时 //使用轮询方式将数据发送小下游 DataStream<String> rebalanced = mapDataStream.rebalance(); DataStreamSink<String> stringDataStreamSink = rebalanced.addSink(new RichSinkFunction<String>() { @Override public void invoke(String value, Context context) throws Exception { int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); System.out.println(value + " ->" + indexOfThisSubtask); } }); env.execute(); }}

运行程序后,再去访问flink web ui页面发现:http://localhost:8081/#/job/bb53e5860410f20d8f6b5d55a86e5acb/overview

若设置并行度为1时,发现出现轮询操作,比如:在nc -lk的8888端口的窗口,发送了8次输入内容为:“abc”【设置并行度的情况下】

以上的结果发现:分区的结果只有一个分区,而发送给小下游的数据的轮询操作

若不设置并行度的情况下,我们再访问flink web ui页面发现:

http://localhost:8081/#/job/d8b79937c9d58aa6f2c022d7bc49b193/overview

发现有3个take,并行度度分别是Source【1】、Map【4】、Sink【4】

若在不设置并行度的情况下,将会在nc -lk的8888端口的窗口,发送了12次输入内容为:“abc”, 控制台打印输出的结果如下:

打印输出信息:

abc:0 ->2abc:1 ->3abc:2 ->2abc:3 ->3abc:0 ->3abc:1 ->0abc:2 ->3abc:3 ->0abc:0 ->0abc:1 ->1abc:2 ->0abc:3 ->1

分享完毕,谢谢! 

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。