首页 > 编程知识 正文

聚合状态:aggregatingstate

时间:2023-11-19 16:08:38 阅读:289788 作者:LJHF

本文将详细介绍aggregatingstate的概念、实现和使用,在需要时对数据进行聚合是现代计算机系统中的一项主要任务,这可以减少计算或存储的需求,并简化和提高系统的性能。

一、什么是aggregatingstate

aggregatingstate是Apache Flink中的一个状态类型,可以将输入的多个数据聚合为一组数据,同时也可以根据所选择的函数,更新当前的聚合结果。相比于其他状态类型,aggregatingstate提供了更为灵活的聚合方式,可以使用Flink提供的内置函数或者自定义函数去实现多种聚合方式,如平均数、最大值、最小值、加和等等。

二、aggregatingstate的实现方式

在Flink中,使用aggregatingstate需要以下步骤:

1.定义状态

定义一个Tuple类型的AggregatableTuple,包含聚合的不同类型值和一个聚合函数,示例代码如下:

public static class AggregatableTuple {
    int sum1;
    int sum2;
    public int getSum1() {
        return sum1;
    }
    public void setSum1(int sum1) {
        this.sum1 = sum1;
    }
    public int getSum2() {
        return sum2;
    }
    public void setSum2(int sum2) {
        this.sum2 = sum2;
    }
    public static final AggregatingStateDescriptor AGGREGATING_STATE_DESCRIPTOR = new AggregatingStateDescriptor<>("aggState", new AggregatorFunction(), TypeInformation.of(AggregatableTuple.class));

    public static final class AggregatorFunction implements AggregateFunction {

        private static final long serialVersionUID = 1L;

        @Override
        public AggregatableTuple createAccumulator() {
            return new AggregatableTuple();
        }
        @Override
        public AggregatableTuple add(AggregatableTuple t, AggregatableTuple acc) {
            acc.setSum1(t.getSum1() + acc.getSum1());
            acc.setSum2(t.getSum2() + acc.getSum2());
            return acc;
        }
        @Override
        public AggregatableTuple getResult(AggregatableTuple acc) {
            return acc;
        }
        @Override
        public AggregatableTuple merge(AggregatableTuple a, AggregatableTuple b) {
            a.setSum1(a.getSum1() + b.getSum1());
            a.setSum2(a.getSum2() + b.getSum2());
            return a;
        }
    }
}

2.创建状态

使用AggregatableTuple.AGGREGATING_STATE_DESCRIPTOR创建aggregatingstate类型状态,示例代码如下:

    private static final AggregatingStateDescriptor AGGREGATING_STATE_DESCRIPTOR = AggregatableTuple.AGGREGATING_STATE_DESCRIPTOR;
    private AggregatingState aggregatingState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        aggregatingState = getRuntimeContext().getAggregatingState(AGGREGATING_STATE_DESCRIPTOR);
    }

3.使用状态

使用aggregatingstate的add方法来将输入数据合并到当前的聚合结果中,最后使用get方法获取聚合结果,示例代码如下:

    @Override
    public void processElement(Integer value, Context context, Collector collector) throws Exception {
        aggregatingState.add(value);
        AggregatableTuple result = aggregatingState.get();
        collector.collect(result.getSum1());
    }

三、如何使用aggregatingstate

下面将说明使用aggregatingstate实现平均值的方式:

1.定义状态

定义AggregatableTuple类型的变量作为状态,包含两个int型变量(sum和count)和一个聚合函数,示例代码如下:

public static class AggregatableTuple {
    int sum;
    int count;
    public int getSum() {
        return sum;
    }
    public void setSum(int sum) {
        this.sum = sum;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }

    public static final AggregatingStateDescriptor AGGREGATING_STATE_DESCRIPTOR = new AggregatingStateDescriptor<>("aggState", new AggregatorFunction(), TypeInformation.of(AggregatableTuple.class));

    public static final class AggregatorFunction implements AggregateFunction {

        private static final long serialVersionUID = 1L;
        @Override
        public AggregatableTuple createAccumulator() {
            return new AggregatableTuple();
        }
        @Override
        public AggregatableTuple add(Integer value, AggregatableTuple accumulator) {
            accumulator.setSum(accumulator.getSum() + value);
            accumulator.setCount(accumulator.getCount() + 1);
            return accumulator;
        }
        @Override
        public AggregatableTuple getResult(AggregatableTuple accumulator) {
            return accumulator;
        }
        @Override
        public AggregatableTuple merge(AggregatableTuple a, AggregatableTuple b) {
            a.setSum(a.getSum() + b.getSum());
            a.setCount(a.getCount() + b.getCount());
            return a;
        }
    }
}

2.创建状态

在Task中使用getAggregatingState方法创建aggregatingstate类型状态,示例代码如下:

    private AggregatingState sumState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        sumState = getRuntimeContext().getAggregatingState(AggregatableTuple.AGGREGATING_STATE_DESCRIPTOR);
    }

3.实现平均值

在processElement方法中添加数据并更新聚合结果,最后使用get方法获取聚合结果,然后计算平均值并返回,示例代码如下:

    @Override
    public void processElement(Integer value, Context ctx, Collector> out) throws Exception {
        sumState.add(value);
        AggregatableTuple res = sumState.get();
        if (res.getCount() != 0) {
            double avg = (double) res.getSum() / res.getCount();
            out.collect(new Tuple2<>(ctx.timerService().currentProcessingTime(), avg));
        }
    }

四、总结

aggregatingstate可以将输入数据聚合成一组数据,同时可以使用Flink提供的内置函数或者自定义函数去实现多种聚合方式。通过本文的介绍,你已经掌握了aggregatingstate的概念、实现和使用方法,可以更加灵活地使用Flink实现各种不同的聚合操作。

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。