Stephan Ewen created FLINK-5582:
-----------------------------------

Summary: Add a general distributive aggregate function
Key: FLINK-5582
URL: https://issues.apache.org/jira/browse/FLINK-5582
Project: Flink
Issue Type: New Feature
Components: Streaming
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.3.0
The {{DataStream}} API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:

- {{ReduceFunction}} supports only a single type, which is both the type that is added and the type that is aggregated/returned.
- {{FoldFunction}} supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into a pre-aggregation and a final aggregation.

I suggest adding a generic and powerful aggregation function that supports:

- Different types to add, accumulate, and return
- The ability to merge partial aggregates by merging the accumulated type

The proposed interface is below. This type of interface is found in many APIs, such as those of various databases, and also in Apache Beam:

- The accumulator is the state of the running aggregate
- Accumulators can be merged
- Values are added to the accumulator
- Getting the result from the accumulator performs an optional finalizing operation

{code}
import org.apache.flink.api.common.functions.Function;

public interface AggregateFunction<IN, ACC, OUT> extends Function {

    // Creates a new, empty accumulator (the state of a running aggregate).
    ACC createAccumulator();

    // Adds a single input value to the accumulator.
    void add(IN value, ACC accumulator);

    // Computes the result from the accumulator (optional finalizing step).
    OUT getResult(ACC accumulator);

    // Merges two partial accumulators; this is what makes the function distributive.
    ACC merge(ACC a, ACC b);
}
{code}

Example use:

{code}
public class AverageAccumulator {
    long count;
    long sum;
}

// implementation of a simple average
public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {

    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    @Override
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public void add(Integer value, AverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }

    @Override
    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}

// implementation of a weighted average
// this reuses the same accumulator type as the aggregate function for 'average'
// (Datum is assumed to expose long getValue() and long getWeight())
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {

    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    @Override
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    @Override
    public void add(Datum value, AverageAccumulator acc) {
        // count accumulates the total weight, sum accumulates the weighted values
        acc.count += value.getWeight();
        acc.sum += value.getValue() * value.getWeight();
    }

    @Override
    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
}
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
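The distributive property (pre-aggregate, then merge partial accumulators) can be illustrated with a minimal, Flink-free sketch. It assumes a standalone copy of the proposed interface that omits {{extends Function}} so it compiles without Flink, plus the {{Average}} function from the proposal. The helpers {{aggregateHierarchically}} and {{aggregateDirectly}} are hypothetical names for illustration only and are not Flink API. Each partition is pre-aggregated into its own accumulator, the partial accumulators are merged in a final step, and the result is compared to a single pass over all values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HierarchicalAverageDemo {

    // Standalone copy of the proposed interface (assumption: "extends Function"
    // is dropped so this sketch compiles without Flink on the classpath).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
        @Override
        public AverageAccumulator createAccumulator() { return new AverageAccumulator(); }

        @Override
        public void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }

        @Override
        public Double getResult(AverageAccumulator acc) { return acc.sum / (double) acc.count; }

        @Override
        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // Hypothetical helper: pre-aggregates each partition into its own accumulator,
    // then merges the partial accumulators and finalizes the merged one.
    static <IN, ACC, OUT> OUT aggregateHierarchically(
            AggregateFunction<IN, ACC, OUT> fn, List<List<IN>> partitions) {
        List<ACC> partials = new ArrayList<>();
        for (List<IN> partition : partitions) {
            ACC acc = fn.createAccumulator();
            for (IN value : partition) {
                fn.add(value, acc);
            }
            partials.add(acc);
        }
        ACC merged = fn.createAccumulator();
        for (ACC partial : partials) {
            merged = fn.merge(merged, partial);
        }
        return fn.getResult(merged);
    }

    // Hypothetical helper: single-pass aggregation over all values, for comparison.
    static <IN, ACC, OUT> OUT aggregateDirectly(AggregateFunction<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN value : values) {
            fn.add(value, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5),
                Arrays.asList(6));
        List<Integer> all = Arrays.asList(1, 2, 3, 4, 5, 6);

        Average average = new Average();
        double hierarchical = aggregateHierarchically(average, partitions);
        double direct = aggregateDirectly(average, all);
        System.out.println(hierarchical + " " + direct); // prints "3.5 3.5"
    }
}
```

The second step relies entirely on {{merge}}. A {{FoldFunction}} has no way to combine two partial fold results, which is why it cannot be split into pre- and final aggregation in this way.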