korlov42 commented on code in PR #3413:
URL: https://github.com/apache/ignite-3/pull/3413#discussion_r1533745061


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java:
##########
@@ -158,26 +183,84 @@ public static IgniteRel buildAggregates(LogicalAggregate 
agg, AggregateRelBuilde
         // It consists of columns from agg.groupSet and aggregate expressions.
 
         for (int i = 0; i < groupByColumns; i++) {
+            List<RelDataTypeField> outputRowFields = 
agg.getRowType().getFieldList();
             RelDataType type = outputRowFields.get(i).getType();
             reduceType.add("f" + reduceType.getFieldCount(), type);
         }
 
+        RexBuilder rexBuilder = agg.getCluster().getRexBuilder();
+        IgniteTypeFactory typeFactory = (IgniteTypeFactory) 
agg.getCluster().getTypeFactory();
+
+        List<RexNode> reduceInputExprs = new ArrayList<>();
+
+        for (int i = 0; i < map.getRowType().getFieldList().size(); i++) {
+            RelDataType type = 
map.getRowType().getFieldList().get(i).getType();
+            RexInputRef ref = new RexInputRef(i, type);
+            reduceInputExprs.add(ref);
+        }
+
+        // Build a list of projections for reduce operator,
+        // if all projections are identity, it is not necessary
+        // to create a projection between MAP and REDUCE operators.
+
+        boolean additionalProjectionsForReduce = false;
+
+        for (int i = 0, argOffset = 0; i < mapReduceAggs.size(); i++) {
+            MapReduceAgg mapReduceAgg = mapReduceAggs.get(i);
+            int argIdx = groupByColumns + argOffset;
+
+            for (int j = 0; j < mapReduceAgg.reduceCalls.size(); j++) {
+                RexNode projExpr = 
mapReduceAgg.makeReduceInputExpr.makeExpr(rexBuilder, map, List.of(argIdx), 
typeFactory);
+                reduceInputExprs.set(argIdx, projExpr);
+
+                if (mapReduceAgg.makeReduceInputExpr != USE_INPUT_FIELD) {
+                    additionalProjectionsForReduce = true;
+                }
+
+                argIdx += 1;
+            }
+
+            argOffset += mapReduceAgg.reduceCalls.size();
+        }
+
+        RelNode reduceInputNode;
+        if (additionalProjectionsForReduce) {
+            RelDataTypeFactory.Builder projectRow = new 
Builder(agg.getCluster().getTypeFactory());
+
+            for (int i = 0; i < reduceInputExprs.size(); i++) {
+                RexNode rexNode = reduceInputExprs.get(i);
+                projectRow.add(String.valueOf(i), rexNode.getType());
+            }
+
+            RelDataType projectRowType = projectRow.build();
+
+            reduceInputNode = builder.makeProject(agg.getCluster(), map, 
reduceInputExprs, projectRowType);

Review Comment:
   Why do we need an additional projection between the map and reduce nodes?
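   
   For reference, the code comment in the hunk says the projection is only created when at least one reduce input expression is not an identity. A minimal sketch of that check follows, assuming a simplified expression model; `Expr`, `InputRef`, `Cast` and `needsProjection` are hypothetical names for illustration, not Calcite or Ignite classes:
   
   ```java
   import java.util.List;
   
   class ProjectionSketch {
       /** Hypothetical, simplified expression model. */
       interface Expr {}
   
       /** Forwards the input field at the given index unchanged. */
       static final class InputRef implements Expr {
           final int index;
   
           InputRef(int index) {
               this.index = index;
           }
       }
   
       /** Wraps another expression in a cast to a target type. */
       static final class Cast implements Expr {
           final Expr operand;
           final String targetType;
   
           Cast(Expr operand, String targetType) {
               this.operand = operand;
               this.targetType = targetType;
           }
       }
   
       /** Returns true if any expression is something other than "forward field i at position i". */
       static boolean needsProjection(List<Expr> exprs) {
           for (int i = 0; i < exprs.size(); i++) {
               Expr e = exprs.get(i);
               boolean identity = e instanceof InputRef && ((InputRef) e).index == i;
               if (!identity) {
                   return true;
               }
           }
           return false;
       }
   
       public static void main(String[] args) {
           List<Expr> identity = List.of(new InputRef(0), new InputRef(1));
           List<Expr> withCast = List.of(new InputRef(0), new Cast(new InputRef(1), "DECIMAL"));
           System.out.println(needsProjection(identity)); // prints false
           System.out.println(needsProjection(withCast)); // prints true
       }
   }
   ```
   
   Under this reading, a plan whose aggregates all use `USE_INPUT_FIELD` would get no extra node between MAP and REDUCE.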



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/PlanUtils.java:
##########
@@ -127,9 +127,17 @@ private static void addAccumulatorFields(IgniteTypeFactory 
typeFactory, List<Agg
 
         for (int i = 0; i < aggregateCalls.size(); i++) {
             AggregateCall call = aggregateCalls.get(i);
-
             Accumulator acc = accumulators.accumulatorFactory(call).get();
-            RelDataType fieldType = acc.returnType(typeFactory);
+            RelDataType fieldType;
+            // For a decimal type Accumulator::returnType returns a type with 
default precision and scale,
+            // that can cause precision loss when a tuple is sent over the 
wire by an exchanger/outbox.
+            // Outbox uses its input type as wire format, so if a scale is 0, 
then the scale is lost
+            // (see Outbox::sendBatch -> RowHandler::toBinaryTuple -> 
BinaryTupleBuilder::appendDecimalNotNull).

Review Comment:
   I'm not sure I fully understand the reasoning. If the scale is 0, why is 
there anything after the decimal point?
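   
   One possible reading, as a minimal sketch: the computed value may carry fractional digits (for example, a sum over values with scale 2) while the declared type has scale 0, so a writer that coerces the value to the declared scale would drop them. The helper `writeWithDeclaredScale` below is hypothetical and only mimics that assumed behaviour with `java.math.BigDecimal`; it is not the Ignite code path named in the comment.
   
   ```java
   import java.math.BigDecimal;
   import java.math.RoundingMode;
   
   class DecimalScaleSketch {
       /** Hypothetical stand-in for a writer that coerces a value to the declared column scale. */
       static BigDecimal writeWithDeclaredScale(BigDecimal value, int declaredScale) {
           // Rescale only when the value has more fractional digits than the declared type allows.
           return value.scale() > declaredScale
                   ? value.setScale(declaredScale, RoundingMode.HALF_UP)
                   : value;
       }
   
       public static void main(String[] args) {
           BigDecimal sum = new BigDecimal("10.25");
           System.out.println(writeWithDeclaredScale(sum, 0)); // prints 10
           System.out.println(writeWithDeclaredScale(sum, 2)); // prints 10.25
       }
   }
   ```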



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/agg/MapReduceAggregates.java:
##########
@@ -356,10 +486,190 @@ private static MapReduceAgg 
createSimpleAgg(AggregateCall call, int reduceArgume
         return new MapReduceAgg(argList, call, reduceCall, USE_INPUT_FIELD);
     }
 
+    /**
+     * TODO: Rename to MakeExpr.
+     * Produces intermediate expressions that modify results of MAP/REDUCE 
aggregate.
+     * For example: after splitting a function into a MAP aggregate and REDUCE 
aggregate it is necessary to add casts to
+     * output of a REDUCE phase aggregate.
+     *
+     * <p>In order to avoid creating unnecessary projections, use {@link 
MapReduceAggregates#USE_INPUT_FIELD}.
+     */
     @FunctionalInterface
     private interface MakeReduceExpr {
 
-        /** Creates an expression that produces result of REDUCE phase of an 
aggregate. */
+        /**
+         * Creates an expression that applies a performs computation (e.g. 
applies some function, adds a cast)
+         * on {@code args} fields of input relation.
+         *
+         * @param rexBuilder Expression builder.
+         * @param input Input relation.
+         * @param args Arguments.
+         * @param typeFactory Type factory.
+         *
+         * @return Expression.
+         */
         RexNode makeExpr(RexBuilder rexBuilder, RelNode input, List<Integer> 
args, IgniteTypeFactory typeFactory);
     }
+
+    private static MapReduceAgg createAvgAgg(
+            RelOptCluster cluster,
+            AggregateCall call,
+            int reduceArgumentOffset,
+            RelDataType inputType,
+            boolean canBeNull
+    ) {
+        RelDataTypeFactory tf = cluster.getTypeFactory();
+        RelDataTypeSystem typeSystem = tf.getTypeSystem();
+
+        RelDataType fieldType = 
inputType.getFieldList().get(call.getArgList().get(0)).getType();
+
+        // In case of AVG(NULL) return a simple version of an aggregate, 
because result is always NULL.
+        if (fieldType.getSqlTypeName() == SqlTypeName.NULL) {
+            return createSimpleAgg(call, reduceArgumentOffset);
+        }
+
+        // AVG(x) : SUM(x)/COUNT0(x)
+        // MAP    : SUM(x) / COUNT(x)
+
+        // SUM(x) as s
+        RelDataType mapSumType = typeSystem.deriveSumType(tf, fieldType);
+        if (canBeNull) {
+            mapSumType = tf.createTypeWithNullability(mapSumType, true);
+        }
+
+        AggregateCall mapSum0 = AggregateCall.create(
+                SqlStdOperatorTable.SUM,
+                call.isDistinct(),
+                call.isApproximate(),
+                call.ignoreNulls(),
+                ImmutableList.of(),
+                call.getArgList(),
+                call.filterArg,
+                null,
+                call.collation,
+                mapSumType,
+                "AVG" + reduceArgumentOffset + "_MAP_SUM");
+
+        // COUNT(x) as c
+        RelDataType mapCountType = tf.createSqlType(SqlTypeName.BIGINT);
+
+        AggregateCall mapCount0 = AggregateCall.create(
+                SqlStdOperatorTable.COUNT,
+                call.isDistinct(),
+                call.isApproximate(),
+                call.ignoreNulls(),
+                ImmutableList.of(),
+                call.getArgList(),
+                call.filterArg,
+                null,
+                call.collation,
+                mapCountType,
+                "AVG" + reduceArgumentOffset + "_MAP_COUNT");
+
+        // REDUCE : SUM(s) as reduce_sum, SUM0(c) as reduce_count
+        List<Integer> reduceSumArgs = List.of(reduceArgumentOffset);
+
+        // SUM0(s)
+        RelDataType reduceSumType = typeSystem.deriveSumType(tf, mapSumType);
+        if (canBeNull) {
+            reduceSumType = tf.createTypeWithNullability(reduceSumType, true);
+        }
+
+        AggregateCall reduceSum0 = AggregateCall.create(
+                SqlStdOperatorTable.SUM,
+                call.isDistinct(),
+                call.isApproximate(),
+                call.ignoreNulls(),
+                ImmutableList.of(),
+                reduceSumArgs,
+                // there is no filtering on REDUCE phase
+                -1,
+                null,
+                call.collation,
+                reduceSumType,
+                "AVG" + reduceArgumentOffset + "_RED_VAL_SUM");

Review Comment:
   This naming pattern seems too verbose. Let's have a look at the output of 
EXPLAIN:
   
   ```
       ReduceHashAggregate(group=[{0, 1}], SUM_QTY=[SUM($2)], 
SUM_BASE_PRICE=[SUM($3)], SUM_DISC_PRICE=[SUM($4)], SUM_CHARGE=[SUM($5)], 
AVG6_RED_VAL_SUM=[SUM($6)], AVG6_RED_COUNT_SUM=[$SUM0($7)], 
AVG8_RED_VAL_SUM=[SUM($8)], AVG8_RED_COUNT_SUM=[$SUM0($9)], 
AVG10_RED_VAL_SUM=[SUM($10)], AVG10_RED_COUNT_SUM=[$SUM0($11)], 
COUNT_12_MAP_SUM=[$SUM0($12)]), id = 211
   ```
   * AVG -- this is an aggregate node, so everything is an aggregate call.
   * RED -- again, I can derive this from the node itself.
   * VAL -- I have no idea why we need this.
   * SUM -- this seems to relate to the aggregating function.
   
   Do we really need all these parts, or can we shorten this to something like 
`"_SUM" + reduceArgumentOffset`? WDYT?
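   
   For context, the split these names describe (MAP: SUM(x) and COUNT(x); REDUCE: SUM of the partial sums and SUM0 of the partial counts, then a division) can be sketched in plain Java. This is an illustration under assumptions, not the planner code: `Partial`, `map`, `reduce` and the explicit `scale` parameter are hypothetical.
   
   ```java
   import java.math.BigDecimal;
   import java.math.RoundingMode;
   import java.util.Arrays;
   import java.util.List;
   
   class AvgMapReduceSketch {
       /** Partial result produced by one MAP node: SUM(x) and COUNT(x). */
       static final class Partial {
           final BigDecimal sum; // null when no non-NULL rows were seen
           final long count;
   
           Partial(BigDecimal sum, long count) {
               this.sum = sum;
               this.count = count;
           }
       }
   
       /** MAP phase: SUM(x) and COUNT(x) over local rows, skipping NULLs. */
       static Partial map(List<BigDecimal> rows) {
           BigDecimal sum = null;
           long count = 0;
           for (BigDecimal row : rows) {
               if (row == null) {
                   continue;
               }
               sum = sum == null ? row : sum.add(row);
               count++;
           }
           return new Partial(sum, count);
       }
   
       /** REDUCE phase: SUM of partial sums, SUM0 of partial counts, then the division. */
       static BigDecimal reduce(List<Partial> partials, int scale) {
           BigDecimal sum = null;
           long count = 0; // SUM0 semantics: 0 rather than NULL for empty input
           for (Partial p : partials) {
               if (p.sum != null) {
                   sum = sum == null ? p.sum : sum.add(p.sum);
               }
               count += p.count;
           }
           if (sum == null || count == 0) {
               return null; // AVG over no rows is NULL
           }
           return sum.divide(BigDecimal.valueOf(count), scale, RoundingMode.HALF_UP);
       }
   
       public static void main(String[] args) {
           Partial a = map(Arrays.asList(new BigDecimal("1"), null, new BigDecimal("2")));
           Partial b = map(List.of(new BigDecimal("3")));
           System.out.println(reduce(List.of(a, b), 2)); // prints 2.00
       }
   }
   ```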



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java:
##########
@@ -526,6 +536,8 @@ private static BigDecimal convertToBigDecimal(Number value) 
{
         return dec;
     }
 
+

Review Comment:
   Unnecessary change.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/HashAggregateConverterRule.java:
##########
@@ -121,13 +124,26 @@ public IgniteRel makeMapAgg(RelOptCluster cluster, 
RelNode input, ImmutableBitSe
                 }
 
                 @Override
-                public IgniteRel makeReduceAgg(RelOptCluster cluster, RelNode 
map, ImmutableBitSet groupSet,
+                public IgniteRel makeProject(RelOptCluster cluster, RelNode 
input, List<RexNode> reduceInputExprs,
+                        RelDataType projectRowType) {
+
+                    return new IgniteProject(
+                            agg.getCluster(),
+                            outTrait.replace(IgniteDistributions.single()),
+                            convert(input, 
inTrait.replace(IgniteDistributions.single())),

Review Comment:
   Does it make sense to reuse the traits of the reduce node?


