alex-plekhanov commented on code in PR #12096:
URL: https://github.com/apache/ignite/pull/12096#discussion_r3434248275


##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowExecutionTest.java:
##########
@@ -508,13 +492,40 @@ public void testSumRowsAndRowNumberToUnboundedFollowing() 
{
             new Object[][] {{2, 1}, {2, 2}, {6, 1}, {6, 2}, {6, 3}, {3, 1}});
     }
 
+    /** row_number() over (partition by {0}). */
+    @Test
+    public void testStreamLargeInput() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        Object[][] exp = IntStream.range(0, 10000)
+            .mapToObj(i -> row(i + 1))
+            .toArray(Object[][]::new);
+
+        checkWindow(ctx, rowNumber(), true, createSeqInputNode(ctx, 10000), 
exp);
+    }
+
+    /** count(*) over (partition by {0} rows between unbounded prescending and 
unbounded following). */
+    @Test
+    public void testBufLargeInput() {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        Object[][] exp = IntStream.range(0, 10000)
+            .mapToObj(i -> row(10000))
+            .toArray(Object[][]::new);
+
+        checkWindow(ctx, count(true, UNBOUNDED_PRECEDING, 
UNBOUNDED_FOLLOWING), false,
+            createSeqInputNode(ctx, 10000), exp);
+    }
+
     /** */
     private void checkWindow(Window.Group grp, boolean streaming, Object[][] 
expRes) {
-        Assert.assertEquals(streaming, WindowFunctions.streamable(grp));
-
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
+        Node<Object[]> input = createSmallInputNode(ctx);
+        checkWindow(ctx, grp, streaming, input, expRes);
+    }
 
-        Node<Object[]> input = createInputNode(ctx);
+    /** */
+    private void checkWindow(ExecutionContext<Object[]> ctx, Window.Group grp, 
boolean streaming,

Review Comment:
   Checkstyle: Each parameter should be on it's own line (in case declaration 
doesn't fit into one line)



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -115,11 +112,10 @@ else if (prevRow != null && partCmp != null && 
partCmp.compare(prevRow, row) !=
 
         prevRow = row;
 
-        if (waiting == 0 && requested > 0) {
-            waiting = IN_BUFFER_SIZE;
-
-            context().execute(() -> source().request(IN_BUFFER_SIZE), 
this::onError);
-        }
+        if (waiting == 0 && outBuf.isEmpty())
+            source().request(waiting = IN_BUFFER_SIZE);

Review Comment:
   There was async call to source().request:
   ```
   context().execute(() -> source().request(IN_BUFFER_SIZE), this::onError);
   ```
   Why it's changed to sync now?
   In case of non streaming partition I think it can lead to too deep recursion 
in some cases (when node push rows in the same call to request method).   



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/WindowFunctionFrame.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.exp.window;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Rows frame for window function. */
+abstract class WindowFunctionFrame<Row> {

Review Comment:
   Not fixed:
   ```
   RangeWindowPartitionFrame<Row> extends WindowFunctionFrame<Row>
   RowWindowPartitionFrame<Row> extends WindowFunctionFrame<Row>
   ```
   



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ * A relational expression representing a set of window aggregates.
+ *
+ * <p>A Window can handle several window aggregate functions, over several
+ * partitions, with pre- and post-expressions, and an optional post-filter.
+ * Each of the partitions is defined by a partition key (zero or more columns)
+ * and a range (logical or physical). The partitions expect the data to be
+ * sorted correctly on input to the relational expression.
+ *
+ * <p>Each {@link Window.Group} has a set of
+ * {@link org.apache.calcite.rex.RexOver} objects.
+ */
+public class IgniteWindow extends Window implements IgniteRel {
+
+    private final Group group;
+    private final boolean streaming;
+
+    /**  */
+    public IgniteWindow(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        RelDataType rowType,
+        Group group,
+        boolean streaming
+    ) {
+        super(cluster, traitSet, input, ImmutableList.of(), rowType, 
ImmutableList.of(group));
+        this.group = group;
+        this.streaming = streaming;
+        assert !group.aggCalls.isEmpty();
+    }
+
+    /**  */
+    public IgniteWindow(RelInput input) {
+        this(input.getCluster(),
+            changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
+            input.getInput(),
+            input.getRowType("rowType"),
+            ((RelInputEx)input).getWindowGroup("group"),
+            input.getBoolean("streaming", false));
+    }
+
+    /**  */
+    public Group getGroup() {
+        return group;
+    }
+
+    /**  */
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteWindow(getCluster(), traitSet, sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Window copy(List<RexLiteral> constants) {
+        assert constants.isEmpty();
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteWindow(cluster, getTraitSet(), sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .input("input", getInput())
+            .item("rowType", getRowType())
+            .item("group", group)
+            .item("streaming", streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
+
+        int aggCnt = group.aggCalls.size();
+
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
+
+        RelOptCost cost =  costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 
0);
+
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost.plus(costFactory.makeTinyCost());
+
+        return cost;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        RelTraitSet traits = passThroughOrDerivedTraits(required);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
+        assert childId == 0;
+
+        RelTraitSet traits = passThroughOrDerivedTraits(childTraits);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+
+    /**
+     * Propagates the trait set from the parent to the child, or derives it 
from the child node.
+     *
+     * <p>The Window node cannot independently satisfy any traits. Therefore:
+     * - Validate that collation and distribution traits are compatible with 
the Window node.
+     * - If they are not, replace them with suitable traits.
+     * - Request a new trait set from the input accordingly.
+     */
+    private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) {
+        if (tgt.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        RelTraitSet traits = tgt;
+        RelCollation requiredCollation = TraitUtils.collation(tgt);
+        if (!satisfiesCollationSansGroupFields(requiredCollation)) {
+            traits = traits.replace(collation());

Review Comment:
   I've looked at the new implementation. Now it's more complicated and 
confusing (for example, it's confusing for me, why 
`satisfiesCollationSansGroupFields` is called twice, for `relCollation, 
requiredCollation` and reversed, for `requiredCollation, relCollation`). But 
I've tried to write some new tests to check it and looks like it works. 
   
   One note: maybe it's better to check grp.keys and grp.collation() instead of 
node collation (in case of some mess with node traits).
   
   I've tried to simplify it, but looks like it not simplified a lot. But, just 
in case, here is my solution, maybe you find some ideas useful:
   
   ```
       /** {@inheritDoc} */
       @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
           if (required.getConvention() != IgniteConvention.INSTANCE)
               return null;
   
           if (!satisfiesDistribution(TraitUtils.distribution(required)))
               return null;
   
           RelCollation collation = 
passThroughCollation(required.getCollation());
   
           if (collation == null)
               return null;
   
           required.replace(collation);
   
           return Pair.of(required, ImmutableList.of(required));
       }
   
       /** {@inheritDoc} */
       @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
           assert childId == 0;
   
           if (childTraits.getConvention() != IgniteConvention.INSTANCE)
               return null;
   
           if (!satisfiesDistribution(TraitUtils.distribution(childTraits)))
               childTraits = childTraits.replace(distribution());
   
           if (!satisfiesCollation(childTraits.getCollation()))
               childTraits = childTraits.replace(collation());
   
           return Pair.of(childTraits, ImmutableList.of(childTraits));
       }
   
       /** */
       private boolean satisfiesCollation(RelCollation derived) {
           int grpKeysSize = grp.keys.cardinality();
   
           if (derived.getFieldCollations().size() < grpKeysSize)
               return false;
   
           if 
(!grp.keys.equals(ImmutableBitSet.of(Util.first(derived.getKeys(), 
grpKeysSize))))
               return false;
   
           List<RelFieldCollation> notKeyFields = 
Util.skip(derived.getFieldCollations(), grpKeysSize);
   
           return Util.startsWith(notKeyFields, 
grp.collation().getFieldCollations());
       }
   
       /** */
       private RelCollation passThroughCollation(RelCollation required) {
           int grpKeysSize = grp.keys.cardinality();
   
           List<RelFieldCollation> requiredCollationFields = 
required.getFieldCollations();
           for (int i = 0; i < requiredCollationFields.size(); i++) {
               if (requiredCollationFields.get(i).getFieldIndex() >= 
input.getRowType().getFieldCount())
                   return null;
           }
   
           if (required.getFieldCollations().size() < grpKeysSize) {
               if (!grp.keys.contains(ImmutableBitSet.of(required.getKeys())))
                   return null;
   
               List<RelFieldCollation> fields = new 
ArrayList<>(required.getFieldCollations());
               ImmutableBitSet retainedKeys = 
grp.keys.rebuild().removeAll(ImmutableBitSet.of(required.getKeys())).build();
               for (int i : retainedKeys)
                   fields.add(TraitUtils.createFieldCollation(i));
   
               fields.addAll(grp.collation().getFieldCollations());
   
               return RelCollations.of(fields);
           }
           else {
               if 
(!grp.keys.contains(ImmutableBitSet.of(Util.first(required.getKeys(), 
grpKeysSize))))
                   return null;
   
               List<RelFieldCollation> notKeyFields = 
Util.skip(required.getFieldCollations(), grpKeysSize);
   
               if (Util.startsWith(notKeyFields, 
grp.collation().getFieldCollations()))
                   return required;
               else if (Util.startsWith(grp.collation().getFieldCollations(), 
notKeyFields)) {
                   return 
RelCollations.of(Commons.combine(required.getFieldCollations(),
                       Util.skip(grp.collation().getFieldCollations(), 
required.getFieldCollations().size() - grpKeysSize)));
               }
   
               return null;
           }
       }
   ```



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -135,123 +137,159 @@ public boolean isStreaming() {
     }
 
     /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
-        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
-
-        int aggCnt = grp.aggCalls.size();
-
-        double rowCnt = mq.getRowCount(getInput());
-        double cpuCost = rowCnt * ROW_COMPARISON_COST;
-        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
-
-        RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0);
-
-        // Distributed processing is more preferable than processing on the 
single node.
-        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
-            cost = cost.plus(costFactory.makeTinyCost());
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        if (required.getConvention() != IgniteConvention.INSTANCE)
+            return null;
 
-        return cost;
-    }
+        if (!satisfiesDistribution(TraitUtils.distribution(required)))
+            return null;
 
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
-        RelTraitSet traits = passThroughOrDerivedTraits(required, true);
-        if (traits == null)
+        RelCollation requiredCollation = TraitUtils.collation(required);
+        RelCollation relCollation = TraitUtils.collation(traitSet);
+        if (satisfiesCollationSansGroupFields(relCollation, requiredCollation))
+            required = 
required.replace(adjustGroupFieldsCollation(relCollation, requiredCollation));
+        else if (satisfiesCollationSansGroupFields(requiredCollation, 
relCollation))
+            required = required.replace(truncateCollation(requiredCollation));
+        else
             return null;
 
-        return Pair.of(traits, ImmutableList.of(traits));
+        return Pair.of(required, ImmutableList.of(required));
     }
 
     /** {@inheritDoc} */
     @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
         assert childId == 0;
 
-        RelTraitSet traits = passThroughOrDerivedTraits(childTraits, false);
-        if (traits == null)
+        if (childTraits.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        RelCollation childCollation = TraitUtils.collation(childTraits);
+        RelCollation relCollation = TraitUtils.collation(traitSet);
+        if (satisfiesCollationSansGroupFields(relCollation, childCollation))
+            childTraits = 
childTraits.replace(adjustGroupFieldsCollation(relCollation, childCollation));
+        else if (!satisfiesCollationSansGroupFields(childCollation, 
relCollation))
             return null;
 
-        return Pair.of(traits, ImmutableList.of(traits));
+        // If a child node has the appropriate collation but not the required 
distribution
+        // (e.g., a randomly distributed table that has the index we need),
+        // we can use its collation to eliminate extra sorting.
+        // However, we must replace the distribution with current window one.
+        if (!satisfiesDistribution(TraitUtils.distribution(childTraits)))
+            childTraits = childTraits.replace(distribution());
+
+        return Pair.of(childTraits, ImmutableList.of(childTraits));
     }
 
-    /**
-     * Propagates the trait set from the parent to the child, or derives it 
from the child node.
-     *
-     * <p>The Window node cannot independently satisfy any traits. Therefore:
-     * - Validate that collation and distribution traits are compatible with 
the Window node.
-     * - If they are not, replace them with suitable traits.
-     * - Request a new trait set from the input accordingly.
-     */
-    private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet 
target, boolean passThrough) {
-        if (target.getConvention() != IgniteConvention.INSTANCE)
-            return null;
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
 
-        RelTraitSet traits = target;
-        RelCollation requiredCollation = TraitUtils.collation(target);
-        if (!satisfiesCollationSansGroupFields(requiredCollation))
-            traits = traits.replace(collation());
-        else if (passThrough) {
-            // In case of pass through, required collation can use fields 
outside of input row type.
-            // So, we should truncate required collation to input row type.
-            // We do not need any additional range checks, since current 
collation keys is a prefix to required collation keys.
-            // Therefore, only additional keys in suffix can be removed here.
-            List<RelFieldCollation> requiredCollationFields = 
requiredCollation.getFieldCollations();
-            for (int i = 0; i < requiredCollationFields.size(); i++) {
-                if (requiredCollationFields.get(i).getFieldIndex() >= 
input.getRowType().getFieldCount()) {
-                    traits = 
traits.replace(RelCollations.of(requiredCollationFields.subList(0, i)));
-                    break;
-                }
-            }
-        }
+        int aggCnt = grp.aggCalls.size();
 
-        IgniteDistribution distribution = TraitUtils.distribution(target);
-        if (!satisfiesDistribution(distribution))
-            traits = traits.replace(distribution());
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
 
-        if (traits == traitSet)
-            // New traits equal to current traits of window.
-            // No need to pass throught or derive any.
-            return null;
+        RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0);
 
-        return traits;
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost = cost.plus(costFactory.makeTinyCost());
+
+        return cost;
     }
 
-    /** Check input distribution satisfies collation of this window. */
-    private boolean satisfiesDistribution(IgniteDistribution 
desiredDistribution) {
-        if (desiredDistribution.satisfies(IgniteDistributions.single()) || 
desiredDistribution.function().correlated())
+    /** Check input distribution satisfies distribution of this window. */
+    private boolean satisfiesDistribution(IgniteDistribution required) {
+        if (required.satisfies(IgniteDistributions.single()) || 
required.function().correlated())
             return true;
 
-        if (desiredDistribution.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED)
-            return 
grp.keys.contains(ImmutableBitSet.of(desiredDistribution.getKeys()));
+        if (required.getType() == RelDistribution.Type.HASH_DISTRIBUTED)
+            return grp.keys.contains(ImmutableBitSet.of(required.getKeys()));
 
         return false;
     }
 
     /**
-     * Check input collation satisfies collation of this window.
-     * - Collations field indicies of the window should be a prefix for 
desired collation.
+     * Check left collation satisfies right one.
+     * - Collations field indicies of the right should be a prefix for left 
collation.
      * - Group fields sort direction can be changed to desired collation.
      * - Order fields sort direction should be the same as in desired 
collation.
      */
-    private boolean satisfiesCollationSansGroupFields(RelCollation 
desiredCollation) {
-        RelCollation collation = collation();
-        if (desiredCollation.satisfies(collation))
+    private boolean satisfiesCollationSansGroupFields(RelCollation left, 
RelCollation right) {
+        if (left.satisfies(right))
             return true;
 
-        if (desiredCollation.getFieldCollations().size() < 
collation.getFieldCollations().size())
+        int leftFldCnt = left.getFieldCollations().size();
+        int rightFldCnt = right.getFieldCollations().size();
+        if (leftFldCnt < rightFldCnt)
             return false;
 
         int grpKeysSize = grp.keys.cardinality();
 
+        assert leftFldCnt >= grpKeysSize || rightFldCnt >= grpKeysSize;
+
         // Check group keys (collation field order and direction meaningless).
         // Since window collation starts with group keys with 'default' 
sorting direction,
         // desired collation should start with same fields in any order / with 
any direction.
-        ImmutableBitSet desiredGrpFields = 
ImmutableBitSet.of(Util.first(desiredCollation.getKeys(), grpKeysSize));
-        if (!desiredGrpFields.equals(grp.keys))
+        int rightGrpFldsCnt = Math.min(grpKeysSize, rightFldCnt);
+        ImmutableBitSet leftGrpFlds = 
ImmutableBitSet.of(Util.first(left.getKeys(), grpKeysSize));
+        ImmutableBitSet rightGrpFlds = 
ImmutableBitSet.of(Util.first(right.getKeys(), rightGrpFldsCnt));
+        if (!leftGrpFlds.contains(rightGrpFlds))
             return false;
+        else if (grpKeysSize >= rightFldCnt)
+            // Right collation fiels in group keys only
+            return true;
 
         // Check remaining collation (collation field order and direction 
meaningfull).
-        List<RelFieldCollation> desiredFieldCollations = 
Util.skip(desiredCollation.getFieldCollations(), grpKeysSize);
-        List<RelFieldCollation> fieldCollations = 
Util.skip(collation.getFieldCollations(), grpKeysSize);
-        return Util.startsWith(desiredFieldCollations, fieldCollations);
+        List<RelFieldCollation> leftFldCollations = 
Util.skip(left.getFieldCollations(), grpKeysSize);
+        List<RelFieldCollation> rightFldCollations = 
Util.skip(right.getFieldCollations(), grpKeysSize);
+        return Util.startsWith(rightFldCollations, leftFldCollations);
+    }
+
+    /** */
+    private RelCollation adjustGroupFieldsCollation(RelCollation relCollation, 
RelCollation requiredCollation) {
+        // Current collation satisfies required collation, but group fields in 
prefix may have invalid field collation.
+        // So, we should replace it with field collation from required.
+        List<RelFieldCollation> fldCollations = new 
ArrayList<>(relCollation.getFieldCollations().size());
+
+        int grpFldCnt = grp.keys.cardinality();
+        IntMap<RelFieldCollation> currGrpFldCollations = new 
IntHashMap<>(grpFldCnt);
+        for (int i = 0; i < grpFldCnt; i++) {
+            RelFieldCollation grpFldCollation = 
relCollation.getFieldCollations().get(i);
+            currGrpFldCollations.put(grpFldCollation.getFieldIndex(), 
grpFldCollation);
+        }
+
+        int requiredGrpFldCnt = 
Math.min(requiredCollation.getFieldCollations().size(), grpFldCnt);
+        for (int i = 0; i < requiredGrpFldCnt; i++) {
+            RelFieldCollation requiredGrpFldCollation = 
requiredCollation.getFieldCollations().get(i);
+            fldCollations.add(requiredGrpFldCollation);
+
+            assert 
currGrpFldCollations.containsKey(requiredGrpFldCollation.getFieldIndex());
+            
currGrpFldCollations.remove(requiredGrpFldCollation.getFieldIndex());
+        }
+
+        // Add remaining group fields collation.
+        fldCollations.addAll(currGrpFldCollations.values());
+
+        // Add remaining fields collation.
+        fldCollations.addAll(Util.skip(relCollation.getFieldCollations(), 
grpFldCnt));
+
+        return RelCollations.of(fldCollations);
+    }
+
+    /** */
+    private RelCollation truncateCollation(RelCollation requiredCollation) {
+        // In case of pass through, required collation can use fields outside 
of input row type.
+        // So, we should truncate required collation to input row type.
+        // We do not need any additional range checks, since current collation 
keys is a prefix to required collation keys.
+        // Therefore, only additional keys in suffix can be removed here.
+        List<RelFieldCollation> requiredCollationFields = 
requiredCollation.getFieldCollations();
+        for (int i = 0; i < requiredCollationFields.size(); i++) {
+            if (requiredCollationFields.get(i).getFieldIndex() >= 
input.getRowType().getFieldCount()) {

Review Comment:
   Redundant braces for one line statement



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/WindowPlannerTest.java:
##########
@@ -1,4 +1,5 @@
 
+

Review Comment:
   Redundant lines



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/fun/IgniteStdSqlOperatorTable.java:
##########
@@ -317,5 +317,18 @@ public IgniteStdSqlOperatorTable() {
         register(SqlStdOperatorTable.BIT_AND);
         register(SqlStdOperatorTable.BIT_OR);
         register(SqlStdOperatorTable.BIT_XOR);
+
+        // Window specific operations
+        register(SqlStdOperatorTable.ROW_NUMBER);

Review Comment:
   Smoke tests still not added. We need one test for each window function 
operand in StdSqlOperatorsTest. This test mostly used to check that all 
necessary libraries are in classpath on ignite release stage.
   Something like
   ```
       public void testWindowFunctions() {
           assertExpression("ROW_NUMBER() OVER()").returns(1).check();
           // other functions
       }
   ```
   Will be enough.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java:
##########
@@ -122,18 +122,18 @@
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 class RelJson {
-    /** */
+    /**  */

Review Comment:
   Still not fixed



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -135,123 +137,159 @@ public boolean isStreaming() {
     }
 
     /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
-        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
-
-        int aggCnt = grp.aggCalls.size();
-
-        double rowCnt = mq.getRowCount(getInput());
-        double cpuCost = rowCnt * ROW_COMPARISON_COST;
-        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
-
-        RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0);
-
-        // Distributed processing is more preferable than processing on the 
single node.
-        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
-            cost = cost.plus(costFactory.makeTinyCost());
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        if (required.getConvention() != IgniteConvention.INSTANCE)
+            return null;
 
-        return cost;
-    }
+        if (!satisfiesDistribution(TraitUtils.distribution(required)))
+            return null;
 
-    /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
-        RelTraitSet traits = passThroughOrDerivedTraits(required, true);
-        if (traits == null)
+        RelCollation requiredCollation = TraitUtils.collation(required);
+        RelCollation relCollation = TraitUtils.collation(traitSet);
+        if (satisfiesCollationSansGroupFields(relCollation, requiredCollation))
+            required = 
required.replace(adjustGroupFieldsCollation(relCollation, requiredCollation));
+        else if (satisfiesCollationSansGroupFields(requiredCollation, 
relCollation))
+            required = required.replace(truncateCollation(requiredCollation));
+        else
             return null;
 
-        return Pair.of(traits, ImmutableList.of(traits));
+        return Pair.of(required, ImmutableList.of(required));
     }
 
     /** {@inheritDoc} */
     @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
         assert childId == 0;
 
-        RelTraitSet traits = passThroughOrDerivedTraits(childTraits, false);
-        if (traits == null)
+        if (childTraits.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        RelCollation childCollation = TraitUtils.collation(childTraits);
+        RelCollation relCollation = TraitUtils.collation(traitSet);
+        if (satisfiesCollationSansGroupFields(relCollation, childCollation))
+            childTraits = 
childTraits.replace(adjustGroupFieldsCollation(relCollation, childCollation));
+        else if (!satisfiesCollationSansGroupFields(childCollation, 
relCollation))
             return null;
 
-        return Pair.of(traits, ImmutableList.of(traits));
+        // If a child node has the appropriate collation but not the required 
distribution
+        // (e.g., a randomly distributed table that has the index we need),
+        // we can use its collation to eliminate extra sorting.
+        // However, we must replace the distribution with current window one.
+        if (!satisfiesDistribution(TraitUtils.distribution(childTraits)))
+            childTraits = childTraits.replace(distribution());
+
+        return Pair.of(childTraits, ImmutableList.of(childTraits));
     }
 
-    /**
-     * Propagates the trait set from the parent to the child, or derives it 
from the child node.
-     *
-     * <p>The Window node cannot independently satisfy any traits. Therefore:
-     * - Validate that collation and distribution traits are compatible with 
the Window node.
-     * - If they are not, replace them with suitable traits.
-     * - Request a new trait set from the input accordingly.
-     */
-    private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet 
target, boolean passThrough) {
-        if (target.getConvention() != IgniteConvention.INSTANCE)
-            return null;
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
 
-        RelTraitSet traits = target;
-        RelCollation requiredCollation = TraitUtils.collation(target);
-        if (!satisfiesCollationSansGroupFields(requiredCollation))
-            traits = traits.replace(collation());
-        else if (passThrough) {
-            // In case of pass through, required collation can use fields 
outside of input row type.
-            // So, we should truncate required collation to input row type.
-            // We do not need any additional range checks, since current 
collation keys is a prefix to required collation keys.
-            // Therefore, only additional keys in suffix can be removed here.
-            List<RelFieldCollation> requiredCollationFields = 
requiredCollation.getFieldCollations();
-            for (int i = 0; i < requiredCollationFields.size(); i++) {
-                if (requiredCollationFields.get(i).getFieldIndex() >= 
input.getRowType().getFieldCount()) {
-                    traits = 
traits.replace(RelCollations.of(requiredCollationFields.subList(0, i)));
-                    break;
-                }
-            }
-        }
+        int aggCnt = grp.aggCalls.size();
 
-        IgniteDistribution distribution = TraitUtils.distribution(target);
-        if (!satisfiesDistribution(distribution))
-            traits = traits.replace(distribution());
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
 
-        if (traits == traitSet)
-            // New traits equal to current traits of window.
-            // No need to pass throught or derive any.
-            return null;
+        RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0);
 
-        return traits;
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost = cost.plus(costFactory.makeTinyCost());
+
+        return cost;
     }
 
-    /** Check input distribution satisfies collation of this window. */
-    private boolean satisfiesDistribution(IgniteDistribution 
desiredDistribution) {
-        if (desiredDistribution.satisfies(IgniteDistributions.single()) || 
desiredDistribution.function().correlated())
+    /** Check input distribution satisfies distribution of this window. */
+    private boolean satisfiesDistribution(IgniteDistribution required) {
+        if (required.satisfies(IgniteDistributions.single()) || 
required.function().correlated())
             return true;
 
-        if (desiredDistribution.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED)
-            return 
grp.keys.contains(ImmutableBitSet.of(desiredDistribution.getKeys()));
+        if (required.getType() == RelDistribution.Type.HASH_DISTRIBUTED)
+            return grp.keys.contains(ImmutableBitSet.of(required.getKeys()));
 
         return false;
     }
 
     /**
-     * Check input collation satisfies collation of this window.
-     * - Collations field indicies of the window should be a prefix for 
desired collation.
+     * Check left collation satisfies right one.
+     * - Collations field indicies of the right should be a prefix for left 
collation.
      * - Group fields sort direction can be changed to desired collation.
      * - Order fields sort direction should be the same as in desired 
collation.
      */
-    private boolean satisfiesCollationSansGroupFields(RelCollation 
desiredCollation) {
-        RelCollation collation = collation();
-        if (desiredCollation.satisfies(collation))
+    private boolean satisfiesCollationSansGroupFields(RelCollation left, 
RelCollation right) {
+        if (left.satisfies(right))
             return true;
 
-        if (desiredCollation.getFieldCollations().size() < 
collation.getFieldCollations().size())
+        int leftFldCnt = left.getFieldCollations().size();
+        int rightFldCnt = right.getFieldCollations().size();
+        if (leftFldCnt < rightFldCnt)
             return false;
 
         int grpKeysSize = grp.keys.cardinality();
 
+        assert leftFldCnt >= grpKeysSize || rightFldCnt >= grpKeysSize;
+
         // Check group keys (collation field order and direction meaningless).
         // Since window collation starts with group keys with 'default' 
sorting direction,
         // desired collation should start with same fields in any order / with 
any direction.
-        ImmutableBitSet desiredGrpFields = 
ImmutableBitSet.of(Util.first(desiredCollation.getKeys(), grpKeysSize));
-        if (!desiredGrpFields.equals(grp.keys))
+        int rightGrpFldsCnt = Math.min(grpKeysSize, rightFldCnt);
+        ImmutableBitSet leftGrpFlds = 
ImmutableBitSet.of(Util.first(left.getKeys(), grpKeysSize));
+        ImmutableBitSet rightGrpFlds = 
ImmutableBitSet.of(Util.first(right.getKeys(), rightGrpFldsCnt));
+        if (!leftGrpFlds.contains(rightGrpFlds))
             return false;
+        else if (grpKeysSize >= rightFldCnt)
+            // Right collation fiels in group keys only

Review Comment:
   fiels -> fields
   Point at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to