lowka commented on code in PR #3413:
URL: https://github.com/apache/ignite-3/pull/3413#discussion_r1533835397
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java:
##########
@@ -158,26 +183,84 @@ public static IgniteRel buildAggregates(LogicalAggregate agg, AggregateRelBuilde
// It consists of columns from agg.groupSet and aggregate expressions.
for (int i = 0; i < groupByColumns; i++) {
+ List<RelDataTypeField> outputRowFields = agg.getRowType().getFieldList();
RelDataType type = outputRowFields.get(i).getType();
reduceType.add("f" + reduceType.getFieldCount(), type);
}
+ RexBuilder rexBuilder = agg.getCluster().getRexBuilder();
+ IgniteTypeFactory typeFactory = (IgniteTypeFactory) agg.getCluster().getTypeFactory();
+
+ List<RexNode> reduceInputExprs = new ArrayList<>();
+
+ for (int i = 0; i < map.getRowType().getFieldList().size(); i++) {
+ RelDataType type = map.getRowType().getFieldList().get(i).getType();
+ RexInputRef ref = new RexInputRef(i, type);
+ reduceInputExprs.add(ref);
+ }
+
+ // Build a list of projections for reduce operator,
+ // if all projections are identity, it is not necessary
+ // to create a projection between MAP and REDUCE operators.
+
+ boolean additionalProjectionsForReduce = false;
+
+ for (int i = 0, argOffset = 0; i < mapReduceAggs.size(); i++) {
+ MapReduceAgg mapReduceAgg = mapReduceAggs.get(i);
+ int argIdx = groupByColumns + argOffset;
+
+ for (int j = 0; j < mapReduceAgg.reduceCalls.size(); j++) {
+ RexNode projExpr = mapReduceAgg.makeReduceInputExpr.makeExpr(rexBuilder, map, List.of(argIdx), typeFactory);
+ reduceInputExprs.set(argIdx, projExpr);
+
+ if (mapReduceAgg.makeReduceInputExpr != USE_INPUT_FIELD) {
+ additionalProjectionsForReduce = true;
+ }
+
+ argIdx += 1;
+ }
+
+ argOffset += mapReduceAgg.reduceCalls.size();
+ }
+
+ RelNode reduceInputNode;
+ if (additionalProjectionsForReduce) {
+ RelDataTypeFactory.Builder projectRow = new Builder(agg.getCluster().getTypeFactory());
+
+ for (int i = 0; i < reduceInputExprs.size(); i++) {
+ RexNode rexNode = reduceInputExprs.get(i);
+ projectRow.add(String.valueOf(i), rexNode.getType());
+ }
+
+ RelDataType projectRowType = projectRow.build();
+
+ reduceInputNode = builder.makeProject(agg.getCluster(), map, reduceInputExprs, projectRowType);
Review Comment:
We need an additional projection because COUNT returns a BIGINT, but SUM0
requires a DECIMAL type. Otherwise we get an error:
> java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.math.BigDecimal (java.lang.Long and java.math.BigDecimal are in module java.base of loader 'bootstrap')
>     at org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators$DecimalSumEmptyIsZero.add(Accumulators.java:538) ~[main/:?]
Plan without the projection:
> Query: SELECT AVG(dec4_2_col) FROM numbers
> Plan :
Project (EXPR$0=[CASE(=($0, 0:DECIMAL(32767, 0)), null:DECIMAL(4, 2), DECIMAL_DIVIDE($0, $1, 4, 2))]) row=(DECIMAL(4, 2) EXPR$0)
  ReduceHashAggregate (group=[{}], AVG_SUM0=[SUM($0)], AVG_SUM00=[$SUM0($1)]) row=(DECIMAL(32767, 2) f0_0, DECIMAL(32767, 0) f1_1)
    Exchange row=(DECIMAL(32767, 2) _ACC0, BIGINT _ACC1, TINYINT _GROUP_ID)]
      MapHashAggregate (group=[{}], AVG_SUM0=[SUM($0)], AVG_COUNT0=[COUNT($0)]) row=(DECIMAL(32767, 2) _ACC0, BIGINT _ACC1, TINYINT _GROUP_ID)
        TableScan traits=IGNITE.random.[], row=(DECIMAL(4, 2) DEC4_2_COL)
Alternatively, we can introduce an accumulator that accepts
BIGINT and returns DECIMAL.
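
To make the two options concrete, here is a minimal, self-contained sketch in plain Java. It does not use the Ignite or Calcite APIs: `ReduceCastSketch`, the nested accumulator classes, and the three `reduce...` methods are hypothetical stand-ins written for illustration only, and the real `Accumulators$DecimalSumEmptyIsZero` may differ in detail. The sketch assumes only what the stack trace shows, namely that the reduce-side accumulator casts its argument to `BigDecimal`.

```java
import java.math.BigDecimal;
import java.util.List;

public class ReduceCastSketch {
    /** Hypothetical stand-in for a DECIMAL $SUM0 accumulator: casts every input to BigDecimal. */
    static final class DecimalSumEmptyIsZero {
        private BigDecimal sum = BigDecimal.ZERO;

        void add(Object arg) {
            if (arg != null) {
                // Fails with ClassCastException when arg is a Long (a BIGINT from COUNT).
                sum = sum.add((BigDecimal) arg);
            }
        }

        BigDecimal end() {
            return sum;
        }
    }

    /** Hypothetical alternative accumulator: accepts BIGINT (Long) and returns DECIMAL. */
    static final class LongToDecimalSumEmptyIsZero {
        private BigDecimal sum = BigDecimal.ZERO;

        void add(Object arg) {
            if (arg != null) {
                sum = sum.add(BigDecimal.valueOf(((Long) arg).longValue()));
            }
        }

        BigDecimal end() {
            return sum;
        }
    }

    /** No projection: BIGINT partial counts go straight into the DECIMAL accumulator. */
    static BigDecimal reduceWithoutProjection(List<Long> mapCounts) {
        DecimalSumEmptyIsZero acc = new DecimalSumEmptyIsZero();
        for (Long count : mapCounts) {
            acc.add(count);
        }
        return acc.end();
    }

    /** Option 1: a projection between MAP and REDUCE converts BIGINT to DECIMAL first. */
    static BigDecimal reduceWithProjection(List<Long> mapCounts) {
        DecimalSumEmptyIsZero acc = new DecimalSumEmptyIsZero();
        for (Long count : mapCounts) {
            BigDecimal projected = BigDecimal.valueOf(count.longValue());
            acc.add(projected);
        }
        return acc.end();
    }

    /** Option 2: no projection, but the accumulator itself accepts BIGINT. */
    static BigDecimal reduceWithBigintAccumulator(List<Long> mapCounts) {
        LongToDecimalSumEmptyIsZero acc = new LongToDecimalSumEmptyIsZero();
        for (Long count : mapCounts) {
            acc.add(count);
        }
        return acc.end();
    }

    public static void main(String[] args) {
        List<Long> partialCounts = List.of(3L, 4L);

        System.out.println("with projection: " + reduceWithProjection(partialCounts));
        System.out.println("BIGINT accumulator: " + reduceWithBigintAccumulator(partialCounts));

        try {
            reduceWithoutProjection(partialCounts);
        } catch (ClassCastException e) {
            System.out.println("without projection: " + e.getMessage());
        }
    }
}
```

Both options produce the same sum. The projection keeps the existing accumulator unchanged at the cost of an extra operator in the plan, while the dedicated accumulator avoids the extra operator but adds one more accumulator implementation to maintain.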