vldpyatkov commented on code in PR #12096:
URL: https://github.com/apache/ignite/pull/12096#discussion_r3000422560


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/WindowFunctions.java:
##########
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.exp.window;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.calcite.sql.type.SqlTypeName.ANY;
+import static org.apache.calcite.sql.type.SqlTypeName.BIGINT;
+import static org.apache.calcite.sql.type.SqlTypeName.DOUBLE;
+import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
+
+/** */
+public final class WindowFunctions {
+
+    /** Check window group can be processed with streaming partition. */
+    public static boolean streamable(Window.Group group) {
+        // Can execute window streaming in case:
+        // - group aggs do not contain operators that can access the whole partition.
+        if (group.aggCalls.stream().anyMatch(it -> 
BUFFERING_FUNCTIONS.contains(it.op)))
+            return false;
+
+        // - group aggs contains only ROW_NUMBER, RANK, DENSE_RANK operators
+        if (group.aggCalls.stream().allMatch(it -> 
STREAMING_FUNCTIONS.contains(it.op)))
+            return true;
+
+        // - group frame is 'ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW'
+        //noinspection RedundantIfStatement
+        if (group.isRows && group.lowerBound.isUnbounded() && 
group.upperBound.isCurrentRow())
+            return true;
+
+        return false;
+    }
+
+    /** Determines if the specified {@link SqlOperator} supports streaming 
execution. */
+    static boolean isStreamingFunction(SqlOperator op) {
+        return STREAMING_FUNCTIONS.contains(op);
+    }
+
+    /** Window functions that definitely support streaming execution. */
+    private static final Set<SqlOperator> STREAMING_FUNCTIONS = Set.of(
+        SqlStdOperatorTable.ROW_NUMBER,
+        SqlStdOperatorTable.RANK,
+        SqlStdOperatorTable.DENSE_RANK
+    );
+
+    /** Window functions that definitely require buffering. */
+    private static final Set<SqlOperator> BUFFERING_FUNCTIONS = Set.of(
+        SqlStdOperatorTable.PERCENT_RANK,
+        SqlStdOperatorTable.CUME_DIST,
+        SqlStdOperatorTable.FIRST_VALUE,
+        SqlStdOperatorTable.LAST_VALUE,
+        SqlStdOperatorTable.LAG,
+        SqlStdOperatorTable.LEAD,
+        SqlStdOperatorTable.NTILE,
+        SqlStdOperatorTable.NTH_VALUE
+    );
+
+    /** Determines if the specified {@link AggregateCall} is a window function call. */
+    static boolean isWindowFunction(AggregateCall call) {
+        return STREAMING_FUNCTIONS.contains(call.getAggregation())
+            || BUFFERING_FUNCTIONS.contains(call.getAggregation());
+    }
+
+    /** */
+    static <Row> Supplier<WindowFunction<Row>> windowFunctionFactory(
+        AggregateCall call,
+        ExecutionContext<Row> ctx
+    ) {
+        RowHandler<Row> hnd = ctx.rowHandler();
+        switch (call.getAggregation().getName()) {
+            case "ROW_NUMBER":
+                return () -> new RowNumber<>(hnd, call);
+            case "RANK":
+                return () -> new Rank<>(hnd, call);
+            case "DENSE_RANK":
+                return () -> new DenseRank<>(hnd, call);
+            case "PERCENT_RANK":
+                return () -> new PercentRank<>(hnd, call);
+            case "CUME_DIST":
+                return () -> new CumeDist<>(hnd, call);
+            case "LAG":
+                return () -> new Lag<>(hnd, call);
+            case "LEAD":
+                return () -> new Lead<>(hnd, call);
+            case "FIRST_VALUE":
+                return () -> new FirstValue<>(hnd, call);
+            case "LAST_VALUE":
+                return () -> new LastValue<>(hnd, call);
+            case "NTILE":
+                return () -> new NTile<>(hnd, call);
+            case "NTH_VALUE":
+                return () -> new NthValue<>(hnd, call);
+            default:
+                throw new AssertionError(call.getAggregation().getName());
+        }
+    }
+
+    /** */
+    private abstract static class AbstractWindowFunction<Row> {
+
+        /** */
+        private final RowHandler<Row> hnd;
+
+        /** */
+        private final AggregateCall aggCall;
+
+        /** */
+        private AbstractWindowFunction(RowHandler<Row> hnd, AggregateCall 
aggCall) {
+            this.hnd = hnd;
+            this.aggCall = aggCall;
+        }
+
+        /** */
+        <T> T get(int idx, Row row) {
+            assert idx < arguments().size() : "idx=" + idx + "; arguments=" + 
arguments();
+
+            return (T)hnd.get(arguments().get(idx), row);
+        }
+
+        /** */
+        protected AggregateCall aggregateCall() {
+            return aggCall;
+        }
+
+        /** */
+        protected List<Integer> arguments() {
+            return aggCall.getArgList();
+        }
+
+        /** */
+        int columnCount(Row row) {
+            return hnd.columnCount(row);
+        }
+    }
+
+    /** */
+    private abstract static class AbstractLagLeadWindowFunction<Row> extends 
AbstractWindowFunction<Row> implements WindowFunction<Row> {
+        /** */
+        private AbstractLagLeadWindowFunction(RowHandler<Row> hnd, 
AggregateCall aggCall) {
+            super(hnd, aggCall);
+        }
+
+        /** */
+        protected int getOffset(Row row) {
+            if (arguments().size() > 1)
+                return get(1, row);
+            else
+                return 1;
+        }
+
+        /** */
+        protected Object getDefault(Row row) {
+            if (arguments().size() > 2)
+                return get(2, row);
+            else
+                return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call(Row row, int rowIdx, int peerIdx, 
WindowFunctionFrame<Row> frame) {
+            int offset = getOffset(row);
+            int idx = applyOffset(rowIdx, offset);
+            if (idx < 0 || idx >= frame.partitionSize())
+                return getDefault(row);
+            else {
+                Row offsetRow = frame.get(idx);
+                Object val = get(0, offsetRow);
+                if (val == null) {

Review Comment:
   Based on the description of the LAG/LEAD functions (and the behavior of other 
databases), we have to return the value of the offset row even if it is NULL. 
The default value is returned only when the offset row does not exist.
   ```
   statement ok
   CREATE TABLE t_lag_lead(id INTEGER, val INTEGER);
   
   statement ok
   INSERT INTO t_lag_lead VALUES (1, 10), (2, NULL), (3, 30);
   
   query IIII
   SELECT id, val,
       LAG(val, 1, 999) OVER (ORDER BY id),
       LEAD(val, 1, 999) OVER (ORDER BY id)
   FROM t_lag_lead
   ORDER BY id;
   ----
   1    10      999     NULL
   2    NULL    10      30
   3    30      NULL    999
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to