alex-plekhanov commented on code in PR #10023:
URL: https://github.com/apache/ignite/pull/10023#discussion_r881800535


##########
modules/calcite/src/main/codegen/config.fmpp:
##########
@@ -519,6 +519,8 @@ data: {
       "SYSTEM"
       "SYSTEM_TIME"
       "SYSTEM_USER"
+      "STRING_AGG"
+      "GROUP_CONCAT"

Review Comment:
   Can you please also remove `UUID` from the `keywords` array? It's not related to 
this ticket, but it was added by mistake, and it causes too many changes in the 
generated parser for such a minor change.
   



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) 
{
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> 
accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, 
ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> 
accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new 
ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;
+        if (mapColl != null) {
+            cmp = ctx.expressionFactory().comparator(getMappedCollation(call));
+
+            return () -> new SortingAccumulator<>(accSup, cmp);
+        }
+
+        return accSup;
+    }
+
+    /** */
+    private static RelCollation getMappedCollation(AggregateCall call) {
+        if (call.getCollation() == null || 
call.getCollation().getFieldCollations().isEmpty())
+            return null;
+
+        List<RelFieldCollation> collations = 
call.getCollation().getFieldCollations();
+        List<Integer> argList = call.getArgList();
+
+        // The target value will be accessed by field index in mapping array 
(targets[fieldIndex]),
+        // so srcCnt should be "max_field_index + 1" to prevent 
IndexOutOfBoundsException.
+        int srcCnt = Collections.max(collations, 
Comparator.comparingInt(RelFieldCollation::getFieldIndex))
+            .getFieldIndex() + 1;
+
+        Map<Integer, Integer> mapping = new HashMap<>();
+
+        int collOff = 0;
+        for (int i = 0; i < collations.size(); i++) {
+            int idx = collations.get(i).getFieldIndex();
+
+            int mapIdx = argList.indexOf(idx);
+            if (mapIdx == -1) { //collation not found in arglist
+                mapIdx = argList.size() + collOff;
+
+                collOff++;
+            }
+
+            mapping.put(idx, mapIdx);
+        }
+
+        return mapping.isEmpty() ? call.getCollation() : call.getCollation()
+            .apply(Mappings.target(mapping, srcCnt, srcCnt));
+    }
+
+    /** */
+    private static <Row> Supplier<Accumulator<Row>> avgFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("AVG() is not 
supported for type '" + call.type + "'.");
             case BIGINT:
             case DECIMAL:
-                return DecimalAvg.FACTORY;
+                return () -> new DecimalAvg<>(hnd);
             case DOUBLE:
             case REAL:
             case FLOAT:
             case INTEGER:
             default:
-                return DoubleAvg.FACTORY;
+                return () -> new DoubleAvg<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> sumFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not 
supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return () -> new Sum(new DecimalSumEmptyIsZero());
+                return () -> new Sum<>(new DecimalSumEmptyIsZero<>(hnd), hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return () -> new Sum(new DoubleSumEmptyIsZero());
+                return () -> new Sum<>(new DoubleSumEmptyIsZero<>(hnd), hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return () -> new Sum(new LongSumEmptyIsZero());
+                return () -> new Sum<>(new LongSumEmptyIsZero<>(hnd), hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumEmptyIsZeroFactory(AggregateCall 
call) {
+    private static <Row> Supplier<Accumulator<Row>> 
sumEmptyIsZeroFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not 
supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return DecimalSumEmptyIsZero.FACTORY;
+                return () -> new DecimalSumEmptyIsZero<>(hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleSumEmptyIsZero.FACTORY;
+                return () -> new DoubleSumEmptyIsZero<>(hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return LongSumEmptyIsZero.FACTORY;
+                return () -> new LongSumEmptyIsZero<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> minFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> minFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MIN_FACTORY;
+                return () -> new DoubleMinMax<>(true, hnd);
             case DECIMAL:
-                return DecimalMinMax.MIN_FACTORY;
+                return () -> new DecimalMinMax<>(true, hnd);
             case INTEGER:
-                return IntMinMax.MIN_FACTORY;
+                return () -> new IntMinMax<>(true, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MIN_FACTORY;
+                return () -> new VarCharMinMax<>(true, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MIN_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(true,
+                    tf -> 
tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MIN_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(true,
+                        tf -> 
tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MIN() is not 
supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MIN_FACTORY;
+                return () -> new LongMinMax<>(true, hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> maxFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> maxFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MAX_FACTORY;
+                return () -> new DoubleMinMax<>(false, hnd);
             case DECIMAL:
-                return DecimalMinMax.MAX_FACTORY;
+                return () -> new DecimalMinMax<>(false, hnd);
             case INTEGER:
-                return IntMinMax.MAX_FACTORY;
+                return () -> new IntMinMax<>(false, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MAX_FACTORY;
+                return () -> new VarCharMinMax<>(false, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MAX_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(false,
+                    tf -> 
tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MAX_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(false,
+                        tf -> 
tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MAX() is not 
supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MAX_FACTORY;
+                return () -> new LongMinMax<>(false, hnd);
         }
     }
 
     /** */
-    private static class SingleVal extends AnyVal {
+    private abstract static class AbstractAccumulator<Row> implements 
Accumulator<Row> {
+        /** */
+        private final RowHandler<Row> hnd;
+
+        /** */
+        AbstractAccumulator(RowHandler<Row> hnd) {
+            this.hnd = hnd;
+        }
+
+        /** */
+        <T> T get(int idx, Row row) {
+            return (T)hnd.get(idx, row);
+        }
+
+        /** */
+        <T> void set(int idx, Row row, T val) {

Review Comment:
   Looks like this method is not used.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) 
{
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> 
accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, 
ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> 
accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new 
ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;
+        if (mapColl != null) {
+            cmp = ctx.expressionFactory().comparator(getMappedCollation(call));

Review Comment:
   Replace the second `getMappedCollation(call)` call with the already computed `mapColl`.



##########
modules/calcite/src/test/sql/aggregate/aggregates/test_string_agg.test:
##########
@@ -0,0 +1,153 @@
+# name: test/sql/aggregate/aggregates/test_string_agg.test
+# description: Test STRING_AGG operator
+# group: [aggregates]
+
+# test incorrect usage of STRING_AGG function
+
+statement error
+SELECT STRING_AGG()
+
+statement error
+SELECT STRING_AGG(1, 2, 3)
+
+statement error
+SELECT STRING_AGG(STRING_AGG('a', ','))
+
+# test string aggregation on scalar values
+query T
+SELECT STRING_AGG('a', ',')
+----
+a
+
+# test string aggregation on scalar values
+query TTTT
+SELECT STRING_AGG('a', ','), STRING_AGG(NULL, ','), STRING_AGG('a', NULL), 
STRING_AGG(NULL, NULL)
+----
+a
+NULL
+a

Review Comment:
   Why is the result not NULL here? Do other databases return the same 
result when the separator is NULL?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) 
{
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> 
accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, 
ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> 
accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new 
ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;
+        if (mapColl != null) {
+            cmp = ctx.expressionFactory().comparator(getMappedCollation(call));
+
+            return () -> new SortingAccumulator<>(accSup, cmp);
+        }
+
+        return accSup;
+    }
+
+    /** */
+    private static RelCollation getMappedCollation(AggregateCall call) {
+        if (call.getCollation() == null || 
call.getCollation().getFieldCollations().isEmpty())
+            return null;
+
+        List<RelFieldCollation> collations = 
call.getCollation().getFieldCollations();
+        List<Integer> argList = call.getArgList();
+
+        // The target value will be accessed by field index in mapping array 
(targets[fieldIndex]),
+        // so srcCnt should be "max_field_index + 1" to prevent 
IndexOutOfBoundsException.
+        int srcCnt = Collections.max(collations, 
Comparator.comparingInt(RelFieldCollation::getFieldIndex))
+            .getFieldIndex() + 1;
+
+        Map<Integer, Integer> mapping = new HashMap<>();
+
+        int collOff = 0;
+        for (int i = 0; i < collations.size(); i++) {
+            int idx = collations.get(i).getFieldIndex();
+
+            int mapIdx = argList.indexOf(idx);
+            if (mapIdx == -1) { //collation not found in arglist
+                mapIdx = argList.size() + collOff;
+
+                collOff++;
+            }
+
+            mapping.put(idx, mapIdx);
+        }
+
+        return mapping.isEmpty() ? call.getCollation() : call.getCollation()
+            .apply(Mappings.target(mapping, srcCnt, srcCnt));
+    }
+
+    /** */
+    private static <Row> Supplier<Accumulator<Row>> avgFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("AVG() is not 
supported for type '" + call.type + "'.");
             case BIGINT:
             case DECIMAL:
-                return DecimalAvg.FACTORY;
+                return () -> new DecimalAvg<>(hnd);
             case DOUBLE:
             case REAL:
             case FLOAT:
             case INTEGER:
             default:
-                return DoubleAvg.FACTORY;
+                return () -> new DoubleAvg<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> sumFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not 
supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return () -> new Sum(new DecimalSumEmptyIsZero());
+                return () -> new Sum<>(new DecimalSumEmptyIsZero<>(hnd), hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return () -> new Sum(new DoubleSumEmptyIsZero());
+                return () -> new Sum<>(new DoubleSumEmptyIsZero<>(hnd), hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return () -> new Sum(new LongSumEmptyIsZero());
+                return () -> new Sum<>(new LongSumEmptyIsZero<>(hnd), hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> sumEmptyIsZeroFactory(AggregateCall 
call) {
+    private static <Row> Supplier<Accumulator<Row>> 
sumEmptyIsZeroFactory(AggregateCall call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case ANY:
                 throw new UnsupportedOperationException("SUM() is not 
supported for type '" + call.type + "'.");
 
             case BIGINT:
             case DECIMAL:
-                return DecimalSumEmptyIsZero.FACTORY;
+                return () -> new DecimalSumEmptyIsZero<>(hnd);
 
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleSumEmptyIsZero.FACTORY;
+                return () -> new DoubleSumEmptyIsZero<>(hnd);
 
             case TINYINT:
             case SMALLINT:
             case INTEGER:
             default:
-                return LongSumEmptyIsZero.FACTORY;
+                return () -> new LongSumEmptyIsZero<>(hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> minFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> minFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MIN_FACTORY;
+                return () -> new DoubleMinMax<>(true, hnd);
             case DECIMAL:
-                return DecimalMinMax.MIN_FACTORY;
+                return () -> new DecimalMinMax<>(true, hnd);
             case INTEGER:
-                return IntMinMax.MIN_FACTORY;
+                return () -> new IntMinMax<>(true, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MIN_FACTORY;
+                return () -> new VarCharMinMax<>(true, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MIN_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(true,
+                    tf -> 
tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MIN_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(true,
+                        tf -> 
tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MIN() is not 
supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MIN_FACTORY;
+                return () -> new LongMinMax<>(true, hnd);
         }
     }
 
     /** */
-    private static Supplier<Accumulator> maxFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> maxFactory(AggregateCall 
call, RowHandler<Row> hnd) {
         switch (call.type.getSqlTypeName()) {
             case DOUBLE:
             case REAL:
             case FLOAT:
-                return DoubleMinMax.MAX_FACTORY;
+                return () -> new DoubleMinMax<>(false, hnd);
             case DECIMAL:
-                return DecimalMinMax.MAX_FACTORY;
+                return () -> new DecimalMinMax<>(false, hnd);
             case INTEGER:
-                return IntMinMax.MAX_FACTORY;
+                return () -> new IntMinMax<>(false, hnd);
             case CHAR:
             case VARCHAR:
-                return VarCharMinMax.MAX_FACTORY;
+                return () -> new VarCharMinMax<>(false, hnd);
             case BINARY:
             case VARBINARY:
-                return ComparableMinMax.VARBINARY_MAX_FACTORY;
+                return () -> new ComparableMinMax<Row, ByteString>(false,
+                    tf -> 
tf.createTypeWithNullability(tf.createSqlType(VARBINARY), true), hnd);
             case ANY:
-                if (call.type instanceof UuidType)
-                    return ComparableMinMax.UUID_MAX_FACTORY;
+                if (call.type instanceof UuidType) {
+                    return () -> new ComparableMinMax<Row, UUID>(false,
+                        tf -> 
tf.createTypeWithNullability(tf.createCustomType(UUID.class), true), hnd);
+                }
                 throw new UnsupportedOperationException("MAX() is not 
supported for type '" + call.type + "'.");
             case BIGINT:
             default:
-                return LongMinMax.MAX_FACTORY;
+                return () -> new LongMinMax<>(false, hnd);
         }
     }
 
     /** */
-    private static class SingleVal extends AnyVal {
+    private abstract static class AbstractAccumulator<Row> implements 
Accumulator<Row> {
+        /** */
+        private final RowHandler<Row> hnd;
+
+        /** */
+        AbstractAccumulator(RowHandler<Row> hnd) {
+            this.hnd = hnd;
+        }
+
+        /** */
+        <T> T get(int idx, Row row) {
+            return (T)hnd.get(idx, row);
+        }
+
+        /** */
+        <T> void set(int idx, Row row, T val) {
+            hnd.set(idx, row, val);
+        }
+
+        /** */
+        int columnCount(Row row) {
+            return hnd.columnCount(row);
+        }
+
+        /** */
+        Row createRow(IgniteTypeFactory typeFactory, List<RelDataType> 
fieldTypes) {

Review Comment:
   Looks like this method is not used.



##########
modules/calcite/src/test/sql/aggregate/aggregates/test_perfect_ht.test_ignore:
##########
@@ -25,15 +24,6 @@ NULL 1       1       1
 1997   12      1       1
 2001   30      1       1
 
-# use aggregates with destructors

Review Comment:
   Test cases should not be removed from the test_ignore file until all of them 
pass and the test_ignore file can be removed entirely.
   
   For this test, it looks like we could also implement ARRAY_AGG (in a separate ticket) 
as a replacement for the LIST aggregate function.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/fun/IgniteStdSqlOperatorTable.java:
##########
@@ -83,6 +83,9 @@ public IgniteStdSqlOperatorTable() {
         register(SqlStdOperatorTable.ANY_VALUE);
         register(SqlStdOperatorTable.SINGLE_VALUE);
         register(SqlStdOperatorTable.FILTER);
+        register(SqlLibraryOperators.GROUP_CONCAT);

Review Comment:
   Let's also add simple tests for these functions to `StdSqlOperatorsTest` 
(one test per function), and add the newly supported functions to the `sql-calcite.adoc` 
file.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -972,39 +1062,170 @@ private ComparableMinMax(boolean min, 
Function<IgniteTypeFactory, RelDataType> t
     }
 
     /** */
-    private static class DistinctAccumulator implements Accumulator {
+    private static class SortingAccumulator<Row> implements Accumulator<Row> {
         /** */
-        private final Accumulator acc;
+        private final transient Comparator<Row> cmp;
 
         /** */
-        private final Set<Object> set = new HashSet<>();
+        private final List<Row> list;
 
         /** */
-        private DistinctAccumulator(Supplier<Accumulator> accSup) {
-            this.acc = accSup.get();
+        private final Accumulator<Row> acc;
+
+        /**
+         * @param accSup Acc support.
+         * @param cmp Comparator.
+         */
+        private SortingAccumulator(Supplier<Accumulator<Row>> accSup, 
Comparator<Row> cmp) {
+            this.cmp = cmp;
+
+            list = new ArrayList<>();
+            acc = accSup.get();
         }
 
         /** {@inheritDoc} */
-        @Override public void add(Object... args) {
-            Object in = args[0];
+        @Override public void add(Row row) {
+            list.add(row);
+        }
 
-            if (in == null)
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator<Row> other) {
+            SortingAccumulator<Row> other1 = (SortingAccumulator<Row>)other;
+
+            list.addAll(other1.list);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            list.sort(cmp);
+
+            for (Row row : list)
+                acc.add(row);
+
+            return acc.end();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return acc.argumentTypes(typeFactory);
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return acc.returnType(typeFactory);
+        }
+    }
+
+    /** */
+    private static class ListAggAccumulator<Row> extends 
AbstractAccumulator<Row> {
+        /** Default separator. */
+        private static final String DEFAULT_SEPARATOR = ",";
+
+        /** */
+        private final List<Row> list;
+
+        /** */
+        private final int sepIdx;
+
+        /** */
+        public ListAggAccumulator(AggregateCall call, RowHandler<Row> hnd) {
+            super(hnd);
+
+            sepIdx = call.getArgList().size() > 1 ? 1 : 0;
+
+            list = new ArrayList<>();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void add(Row row) {
+            if (row == null || get(0, row) == null)
+                return;
+
+            list.add(row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator<Row> other) {
+            ListAggAccumulator<Row> other0 = (ListAggAccumulator<Row>)other;
+
+            list.addAll(other0.list);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            if (list.isEmpty())
+                return null;
+
+            StringBuilder builder = new StringBuilder();
+
+            for (Row row: list) {
+                if (builder.length() != 0)
+                    builder.append(extractSeparator(row));
+
+                builder.append(Objects.toString(get(0, row)));
+            }
+
+            return builder.toString();
+        }
+
+        /** */
+        private String extractSeparator(Row row) {
+            if (sepIdx < 1 || columnCount(row) <= sepIdx)
+                return DEFAULT_SEPARATOR;
+
+            Object rawSep = get(sepIdx, row);
+
+            if (rawSep == null)
+                return DEFAULT_SEPARATOR;
+
+            return rawSep.toString();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return 
F.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARCHAR),
 true),
+                
typeFactory.createTypeWithNullability(typeFactory.createSqlType(CHAR), true));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARCHAR), true);
+        }
+    }
+
+    /** */
+    private static class DistinctAccumulator<Row> extends 
AbstractAccumulator<Row> {
+        /** */
+        private final Accumulator<Row> acc;
+
+        /** */
+        private final Map<Object, Row> rows = new LinkedHashMap<>();
+
+        /** */
+        private DistinctAccumulator(Supplier<Accumulator<Row>> accSup, 
RowHandler<Row> hnd) {
+            super(hnd);
+            acc = accSup.get();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void add(Row row) {
+            if (row == null || columnCount(row) == 0)

Review Comment:
   Shouldn't we also check the first value for `null` (`|| get(0, row) == null`)?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -46,177 +56,277 @@
  */
 public class Accumulators {
     /** */
-    public static Supplier<Accumulator> accumulatorFactory(AggregateCall call) 
{
-        if (!call.isDistinct())
-            return accumulatorFunctionFactory(call);
+    public static <Row> Supplier<Accumulator<Row>> 
accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
+        Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, 
ctx);
 
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
+        if (call.isDistinct())
+            return () -> new DistinctAccumulator<>(supplier, ctx.rowHandler());
 
-        return () -> new DistinctAccumulator(fac);
+        return supplier;
     }
 
     /** */
-    public static Supplier<Accumulator> 
accumulatorFunctionFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> accumulatorFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
         switch (call.getAggregation().getName()) {
             case "COUNT":
-                return LongCount.FACTORY;
+                return () -> new LongCount<>(hnd);
             case "AVG":
-                return avgFactory(call);
+                return avgFactory(call, hnd);
             case "SUM":
-                return sumFactory(call);
+                return sumFactory(call, hnd);
             case "$SUM0":
-                return sumEmptyIsZeroFactory(call);
+                return sumEmptyIsZeroFactory(call, hnd);
             case "MIN":
-                return minFactory(call);
+                return minFactory(call, hnd);
             case "MAX":
-                return maxFactory(call);
+                return maxFactory(call, hnd);
             case "SINGLE_VALUE":
-                return SingleVal.FACTORY;
+                return () -> new SingleVal<>(hnd);
             case "ANY_VALUE":
-                return AnyVal.FACTORY;
+                return () -> new AnyVal<>(hnd);
+            case "LISTAGG":
+                return listAggregateSupplier(call, ctx);
             default:
                 throw new AssertionError(call.getAggregation().getName());
         }
     }
 
     /** */
-    private static Supplier<Accumulator> avgFactory(AggregateCall call) {
+    private static <Row> Supplier<Accumulator<Row>> listAggregateSupplier(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+
+        Supplier<Accumulator<Row>> accSup = () -> new 
ListAggAccumulator<>(call, hnd);
+
+        RelCollation mapColl = getMappedCollation(call);
+
+        Comparator<Row> cmp;

Review Comment:
   Move the `cmp` declaration inside the `if` block.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -972,39 +1062,170 @@ private ComparableMinMax(boolean min, 
Function<IgniteTypeFactory, RelDataType> t
     }
 
     /** */
-    private static class DistinctAccumulator implements Accumulator {
+    private static class SortingAccumulator<Row> implements Accumulator<Row> {
         /** */
-        private final Accumulator acc;
+        private final transient Comparator<Row> cmp;
 
         /** */
-        private final Set<Object> set = new HashSet<>();
+        private final List<Row> list;
 
         /** */
-        private DistinctAccumulator(Supplier<Accumulator> accSup) {
-            this.acc = accSup.get();
+        private final Accumulator<Row> acc;
+
+        /**
+         * @param accSup Acc support.

Review Comment:
   `Acc support` -> `Accumulator supplier`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java:
##########
@@ -972,39 +1062,170 @@ private ComparableMinMax(boolean min, 
Function<IgniteTypeFactory, RelDataType> t
     }
 
     /** */
-    private static class DistinctAccumulator implements Accumulator {
+    private static class SortingAccumulator<Row> implements Accumulator<Row> {
         /** */
-        private final Accumulator acc;
+        private final transient Comparator<Row> cmp;
 
         /** */
-        private final Set<Object> set = new HashSet<>();
+        private final List<Row> list;
 
         /** */
-        private DistinctAccumulator(Supplier<Accumulator> accSup) {
-            this.acc = accSup.get();
+        private final Accumulator<Row> acc;
+
+        /**
+         * @param accSup Acc support.
+         * @param cmp Comparator.
+         */
+        private SortingAccumulator(Supplier<Accumulator<Row>> accSup, 
Comparator<Row> cmp) {
+            this.cmp = cmp;
+
+            list = new ArrayList<>();
+            acc = accSup.get();
         }
 
         /** {@inheritDoc} */
-        @Override public void add(Object... args) {
-            Object in = args[0];
+        @Override public void add(Row row) {
+            list.add(row);
+        }
 
-            if (in == null)
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator<Row> other) {
+            SortingAccumulator<Row> other1 = (SortingAccumulator<Row>)other;
+
+            list.addAll(other1.list);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            list.sort(cmp);
+
+            for (Row row : list)
+                acc.add(row);
+
+            return acc.end();
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return acc.argumentTypes(typeFactory);
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return acc.returnType(typeFactory);
+        }
+    }
+
+    /** */
+    private static class ListAggAccumulator<Row> extends 
AbstractAccumulator<Row> {
+        /** Default separator. */
+        private static final String DEFAULT_SEPARATOR = ",";
+
+        /** */
+        private final List<Row> list;
+
+        /** */
+        private final int sepIdx;
+
+        /** */
+        public ListAggAccumulator(AggregateCall call, RowHandler<Row> hnd) {
+            super(hnd);
+
+            sepIdx = call.getArgList().size() > 1 ? 1 : 0;

Review Comment:
   It looks like there should be a boolean flag here: currently we encode a 
condition as an integer and later decode it to determine whether the default 
separator should be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to