Gabor Gevay created FLINK-4578:
----------------------------------

             Summary: AggregateOperator incorrectly sets ForwardedField with nested composite types
                 Key: FLINK-4578
                 URL: https://issues.apache.org/jira/browse/FLINK-4578
             Project: Flink
          Issue Type: Bug
          Components: DataSet API
            Reporter: Gabor Gevay


When an aggregation is called on a grouped DataSet,
{{AggregateOperator.translateToDataFlow}} checks whether the field being
aggregated is also a field that the grouping is based on. If it is not, it adds
the ForwardedField property for the key field, since the key passes through the
aggregation unchanged.
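A minimal sketch of the decision described above, in plain Java without any
Flink dependency. The class and method names ({{ForwardDecision}},
{{forwardedFields}}) are hypothetical, and {{keyFieldUsedInAgg}} only borrows
the name of the flag from the report; this is not the actual Flink code. The
sketch assumes that key positions and aggregated fields use the same position
numbering, which is exactly the assumption that fails for nested types.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: forward the key fields only if no key field is also
 * an aggregated field. Both arrays are assumed to use the SAME numbering.
 */
class ForwardDecision {

    /** Returns true if any aggregated field coincides with a key field. */
    static boolean keyFieldUsedInAgg(int[] keyPositions, int[] aggFields) {
        for (int key : keyPositions) {
            for (int agg : aggFields) {
                if (key == agg) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Returns the key positions to declare as forwarded, or an empty array. */
    static int[] forwardedFields(int[] keyPositions, int[] aggFields) {
        return keyFieldUsedInAgg(keyPositions, aggFields)
                ? new int[0]
                : keyPositions.clone();
    }

    public static void main(String[] args) {
        // Tuple2<String, Integer> with groupBy(0).sum(1): the key field 0 is forwarded
        System.out.println(Arrays.toString(forwardedFields(new int[]{0}, new int[]{1}))); // [0]
        // Aggregating the key field itself: nothing is forwarded
        System.out.println(Arrays.toString(forwardedFields(new int[]{0}, new int[]{0}))); // []
    }
}
```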

However, this check breaks when nested composite types are involved. It obtains
the key positions with {{getKeys().computeLogicalKeyPositions()}}, which returns
_flat_ positions (every leaf field of a nested type is counted separately),
whereas the position of the field to aggregate is counted only on the outer
type, so the two sets of positions are not comparable.
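To illustrate the two numberings, here is a small model in plain Java. It is an
assumption for illustration only: each top-level field is described just by its
number of leaf fields (1 for an atomic field such as String, 2 for a nested
Tuple2<Byte,Byte>), and the class {{FlatPositions}} with its methods is
hypothetical, not a Flink API.

```java
import java.util.Arrays;

/**
 * Hypothetical model: outerArities[i] is the number of leaf fields of
 * top-level field i. Flat positions number all leaf fields left to right.
 */
class FlatPositions {

    /** Flat position of the first leaf field of each top-level field. */
    static int[] flatStarts(int[] outerArities) {
        int[] starts = new int[outerArities.length];
        int next = 0;
        for (int i = 0; i < outerArities.length; i++) {
            starts[i] = next;
            next += outerArities[i];
        }
        return starts;
    }

    /** All flat positions covered by top-level field outerIndex. */
    static int[] flatPositionsOf(int[] outerArities, int outerIndex) {
        int start = flatStarts(outerArities)[outerIndex];
        int[] result = new int[outerArities[outerIndex]];
        for (int i = 0; i < result.length; i++) {
            result[i] = start + i;
        }
        return result;
    }

    public static void main(String[] args) {
        // Tuple3<Tuple2<Byte, Byte>, String, Integer>
        int[] arities = {2, 1, 1};
        System.out.println(Arrays.toString(flatStarts(arities)));         // [0, 2, 3]
        System.out.println(Arrays.toString(flatPositionsOf(arities, 0))); // [0, 1]
    }
}
```

So in this model the top-level index of a field and its flat position only
agree for fields that come before the first nested composite field.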

Example code: https://github.com/ggevay/flink/tree/agg-bad-forwarded-fields
In that branch, I changed the WordCount example to use the type
{{Tuple3<Tuple2<Byte,Byte>, String, Integer>}} and to call {{.groupBy(1).sum(2)}}
(which groups by the String field and sums the Integer field). If you set a
breakpoint in {{AggregateOperator.translateToDataFlow}}, you can see that
{{logicalKeyPositions}} contains 2 and {{fields}} also contains 2, which
causes {{keyFieldUsedInAgg}} to be erroneously set to true, even though the key
and the aggregated field are different fields. The problem is that the Tuple2 is
counted as 2 fields in {{logicalKeyPositions}}, but as only 1 field in {{fields}}.
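The following plain-Java sketch reproduces the comparison with the values
observed in the debugger ({{logicalKeyPositions}} = {2}, {{fields}} = {2}) and
contrasts it with one possible correction that first maps each top-level
aggregation index to the flat positions it covers. The class {{KeyFieldCheck}},
both methods, and the "fixed" variant are hypothetical; this is not Flink code
and not necessarily how the issue should be fixed in Flink.

```java
/**
 * Hypothetical reproduction for Tuple3<Tuple2<Byte, Byte>, String, Integer>
 * with groupBy(1).sum(2).
 */
class KeyFieldCheck {

    /** True if any element of a equals any element of b. */
    static boolean overlaps(int[] a, int[] b) {
        for (int x : a) {
            for (int y : b) {
                if (x == y) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Buggy variant: compares flat key positions with top-level indices. */
    static boolean keyFieldUsedInAggBuggy(int[] flatKeyPositions, int[] outerAggFields) {
        return overlaps(flatKeyPositions, outerAggFields);
    }

    /**
     * Hypothetical fix: translate each top-level aggregation index into the
     * flat positions it covers, then compare with the flat key positions.
     * outerArities[i] is the number of leaf fields of top-level field i.
     */
    static boolean keyFieldUsedInAggFixed(int[] flatKeyPositions, int[] outerAggFields,
                                          int[] outerArities) {
        for (int outer : outerAggFields) {
            int start = 0;
            for (int i = 0; i < outer; i++) {
                start += outerArities[i];
            }
            int[] covered = new int[outerArities[outer]];
            for (int j = 0; j < covered.length; j++) {
                covered[j] = start + j;
            }
            if (overlaps(flatKeyPositions, covered)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] outerArities = {2, 1, 1};  // Tuple2<Byte, Byte> = 2 leaves, String = 1, Integer = 1
        int[] logicalKeyPositions = {2}; // groupBy(1): String is at flat position 2
        int[] fields = {2};              // sum(2): Integer is top-level index 2 (flat position 3)
        System.out.println(keyFieldUsedInAggBuggy(logicalKeyPositions, fields));               // true
        System.out.println(keyFieldUsedInAggFixed(logicalKeyPositions, fields, outerArities)); // false
    }
}
```

Converting the aggregation indices to flat positions, rather than the other way
round, leaves the output of {{computeLogicalKeyPositions()}} unchanged, and a
key that is itself aggregated (for example {{.groupBy(1).sum(1)}}) is still
detected.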



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
