alex-plekhanov commented on code in PR #12096:
URL: https://github.com/apache/ignite/pull/12096#discussion_r2225170086


##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectWindowTransposeRule.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.util.BitSets;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+/**
+ * Fixed copy of {@link 
org.apache.calcite.rel.rules.ProjectWindowTransposeRule}.
+ * The original rule logic is broken: it skips input refs in window boundaries.
+ * This issue fixed in calcite 1.39.0, so this rule can be replaced with the 
original after an update.

Review Comment:
   We are on 1.40 now and this rule can be replaced to original.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */
+public class WindowConverterRule extends 
AbstractIgniteConverterRule<LogicalWindow> {
+    /**  */
+    public static final RelOptRule INSTANCE = new WindowConverterRule();
+
+    /**  */
+    private WindowConverterRule() {
+        super(LogicalWindow.class, "WindowConverterRule");
+    }
+
+    /**
+     * {@inheritDoc}
+     */

Review Comment:
   Use one line comment: `/** {@inheritDoc} */`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */
+public class WindowConverterRule extends 
AbstractIgniteConverterRule<LogicalWindow> {
+    /**  */
+    public static final RelOptRule INSTANCE = new WindowConverterRule();
+
+    /**  */

Review Comment:
   Redundant space in comment, use `/** */`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */
+public class WindowConverterRule extends 
AbstractIgniteConverterRule<LogicalWindow> {
+    /**  */
+    public static final RelOptRule INSTANCE = new WindowConverterRule();
+
+    /**  */
+    private WindowConverterRule() {
+        super(LogicalWindow.class, "WindowConverterRule");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq, LogicalWindow window) {
+        RelOptCluster cluster = window.getCluster();
+
+        RelNode result = window.getInput();
+
+        assert window.constants.isEmpty();
+
+        for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) {
+            Window.Group group = window.groups.get(grpIdx);
+
+            RelCollation collation = TraitUtils.mergeCollations(

Review Comment:
   mergeCollations used only once. Let's move this function to the current 
class and change signature (merge partition columns and collation). 



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */
+public class WindowConverterRule extends 
AbstractIgniteConverterRule<LogicalWindow> {
+    /**  */
+    public static final RelOptRule INSTANCE = new WindowConverterRule();
+
+    /**  */
+    private WindowConverterRule() {
+        super(LogicalWindow.class, "WindowConverterRule");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq, LogicalWindow window) {
+        RelOptCluster cluster = window.getCluster();
+
+        RelNode result = window.getInput();
+
+        assert window.constants.isEmpty();
+
+        for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) {
+            Window.Group group = window.groups.get(grpIdx);

Review Comment:
   Abbreviation should be used for `group`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectWindowConstantsRule.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.tools.RelBuilder;
+import org.immutables.value.Value;
+
+/**
+ * A rule to split window rel with constants to:
+ * - project with constants
+ * - window without constants
+ * - project removing constants
+ */
+@Value.Enclosing
+public class ProjectWindowConstantsRule extends 
RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule {

Review Comment:
   It's a goal of AccumulatorsFactory in/out adapters to provide rows suitable 
for accumulator. Let's use this logic instead of creating addional rel ops.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */
+public class WindowConverterRule extends 
AbstractIgniteConverterRule<LogicalWindow> {
+    /**  */
+    public static final RelOptRule INSTANCE = new WindowConverterRule();
+
+    /**  */
+    private WindowConverterRule() {
+        super(LogicalWindow.class, "WindowConverterRule");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq, LogicalWindow window) {
+        RelOptCluster cluster = window.getCluster();
+
+        RelNode result = window.getInput();
+
+        assert window.constants.isEmpty();
+
+        for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) {
+            Window.Group group = window.groups.get(grpIdx);
+
+            RelCollation collation = TraitUtils.mergeCollations(
+                TraitUtils.createCollation(group.keys.asList()),
+                group.collation()
+            );
+
+            RelTraitSet inTraits = cluster
+                .traitSetOf(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(collation);
+
+            RelTraitSet outTraits = cluster
+                .traitSetOf(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(collation);
+
+            result = convert(result, inTraits);
+
+            // add fields added by current group.
+            // see org.apache.calcite.rel.logical.LogicalWindow#create
+            String groupFieldPrefix = "w" + grpIdx + "$";
+            List<RelDataTypeField> fieldsAddedByCurrentGroup = 
U.arrayList(window.getRowType().getFieldList(),
+                it -> it.getName().startsWith(groupFieldPrefix));
+            List<RelDataTypeField> groupFields = new 
ArrayList<>(result.getRowType().getFieldList());
+            groupFields.addAll(fieldsAddedByCurrentGroup);
+
+            RelRecordType rowType = new RelRecordType(groupFields);
+
+            Window.Group newGroup = convertGroup(group);
+
+            result = new IgniteWindow(
+                window.getCluster(),
+                window.getTraitSet().merge(outTraits),
+                result,
+                rowType,
+                newGroup,
+                WindowFunctions.streamable(newGroup)
+            );
+        }
+
+        return (PhysicalNode)result;
+    }
+
+    private static Window.Group convertGroup(Window.Group group) {
+        List<Window.RexWinAggCall> newAggCalls = new 
ArrayList<>(group.aggCalls.size());
+        ImmutableList<Window.RexWinAggCall> calls = group.aggCalls;
+        for (int i = 0; i < calls.size(); i++) {
+            Window.RexWinAggCall aggCall = calls.get(i);
+            Window.RexWinAggCall newCall = new Window.RexWinAggCall(
+                (SqlAggFunction)aggCall.op,
+                aggCall.type,
+                aggCall.operands,
+                i,
+                aggCall.distinct,
+                aggCall.ignoreNulls
+            );
+            newAggCalls.add(newCall);
+        }
+
+        return new Window.Group(
+            group.keys,
+            group.isRows,
+            group.lowerBound,
+            group.upperBound,
+            group.orderKeys,

Review Comment:
   Calcite now requires exclusion clause here. We need to support exclusions or 
throw an error if exclusion is specified (on SqlValidator phase)



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */
+public class WindowConverterRule extends 
AbstractIgniteConverterRule<LogicalWindow> {
+    /**  */
+    public static final RelOptRule INSTANCE = new WindowConverterRule();
+
+    /**  */
+    private WindowConverterRule() {
+        super(LogicalWindow.class, "WindowConverterRule");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override protected PhysicalNode convert(RelOptPlanner planner, 
RelMetadataQuery mq, LogicalWindow window) {
+        RelOptCluster cluster = window.getCluster();
+
+        RelNode result = window.getInput();
+
+        assert window.constants.isEmpty();
+
+        for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) {
+            Window.Group group = window.groups.get(grpIdx);
+
+            RelCollation collation = TraitUtils.mergeCollations(
+                TraitUtils.createCollation(group.keys.asList()),
+                group.collation()
+            );
+
+            RelTraitSet inTraits = cluster
+                .traitSetOf(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(collation);
+
+            RelTraitSet outTraits = cluster
+                .traitSetOf(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(collation);
+
+            result = convert(result, inTraits);
+
+            // add fields added by current group.
+            // see org.apache.calcite.rel.logical.LogicalWindow#create
+            String groupFieldPrefix = "w" + grpIdx + "$";
+            List<RelDataTypeField> fieldsAddedByCurrentGroup = 
U.arrayList(window.getRowType().getFieldList(),
+                it -> it.getName().startsWith(groupFieldPrefix));
+            List<RelDataTypeField> groupFields = new 
ArrayList<>(result.getRowType().getFieldList());
+            groupFields.addAll(fieldsAddedByCurrentGroup);
+
+            RelRecordType rowType = new RelRecordType(groupFields);
+
+            Window.Group newGroup = convertGroup(group);
+
+            result = new IgniteWindow(
+                window.getCluster(),
+                window.getTraitSet().merge(outTraits),
+                result,
+                rowType,
+                newGroup,
+                WindowFunctions.streamable(newGroup)
+            );
+        }
+
+        return (PhysicalNode)result;
+    }
+
+    private static Window.Group convertGroup(Window.Group group) {

Review Comment:
   I can't see any convertion here. Group is copied as is. And call is copied 
as is. Why do we need this copies?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/BufWindowPartition.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.exp.window;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+
+/** Buffering implementation of the ROWS / RANGE window partition */
+final class BufWindowPartition<Row> extends WindowPartitionBase<Row> {
+    private final List<Row> buffer;

Review Comment:
   `BufWindowPartition` -> `BufferingWindowPartition`
   According to Ignite code style we don't use abbreviation in class names and 
method names, but it's mandatory for field names and variables (for example 
`buffer` field should be renamed to `buf`)



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */
+public class WindowConverterRule extends 
AbstractIgniteConverterRule<LogicalWindow> {
+    /**  */

Review Comment:
   Redundant space in comment, use `/** */`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import java.util.ArrayList;
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.PhysicalNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.sql.SqlAggFunction;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**  */

Review Comment:
   Redundant space in comment, use `/** */`, or add class description



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java:
##########
@@ -882,6 +886,31 @@ else if (rel instanceof Intersect)
         return node;
     }
 
+    /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteWindow rel) {
+        RelDataType outType = rel.getRowType();
+        RelDataType inputType = rel.getInput().getRowType();
+
+        List<Integer> grpKeys = rel.getGroup().keys.toList();
+        RelCollation collation = rel.collation();
+
+        assert collation.getFieldCollations().size() >= grpKeys.size();
+        Comparator<Row> partCmp = 
expressionFactory.comparator(TraitUtils.createCollation(grpKeys));
+
+        List<AggregateCall> aggCalls = rel.getGroup().getAggregateCalls(rel);
+        Supplier<WindowPartition<Row>> frameFactory = 
expressionFactory.windowFrameFactory(rel.getGroup(), aggCalls, inputType, 
false);

Review Comment:
   Streaming is always `false` for production code. Inside factory it used only 
for assert. Do we need this parameter at all?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ * A relational expression representing a set of window aggregates.
+ *
+ * <p>A Window can handle several window aggregate functions, over several
+ * partitions, with pre- and post-expressions, and an optional post-filter.
+ * Each of the partitions is defined by a partition key (zero or more columns)
+ * and a range (logical or physical). The partitions expect the data to be
+ * sorted correctly on input to the relational expression.
+ *
+ * <p>Each {@link Window.Group} has a set of
+ * {@link org.apache.calcite.rex.RexOver} objects.
+ */
+public class IgniteWindow extends Window implements IgniteRel {
+
+    private final Group group;
+    private final boolean streaming;
+
+    /**  */
+    public IgniteWindow(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        RelDataType rowType,
+        Group group,
+        boolean streaming
+    ) {
+        super(cluster, traitSet, input, ImmutableList.of(), rowType, 
ImmutableList.of(group));
+        this.group = group;
+        this.streaming = streaming;
+        assert !group.aggCalls.isEmpty();
+    }
+
+    /**  */
+    public IgniteWindow(RelInput input) {
+        this(input.getCluster(),
+            changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
+            input.getInput(),
+            input.getRowType("rowType"),
+            ((RelInputEx)input).getWindowGroup("group"),
+            input.getBoolean("streaming", false));
+    }
+
+    /**  */
+    public Group getGroup() {
+        return group;
+    }
+
+    /**  */
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteWindow(getCluster(), traitSet, sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Window copy(List<RexLiteral> constants) {
+        assert constants.isEmpty();
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteWindow(cluster, getTraitSet(), sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .input("input", getInput())
+            .item("rowType", getRowType())
+            .item("group", group)
+            .item("streaming", streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
+
+        int aggCnt = group.aggCalls.size();
+
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
+
+        RelOptCost cost =  costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 
0);
+
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost.plus(costFactory.makeTinyCost());
+
+        return cost;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        RelTraitSet traits = passThroughOrDerivedTraits(required);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
+        assert childId == 0;
+
+        RelTraitSet traits = passThroughOrDerivedTraits(childTraits);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+
+    /**
+     * Propagates the trait set from the parent to the child, or derives it 
from the child node.
+     *
+     * <p>The Window node cannot independently satisfy any traits. Therefore:
+     * - Validate that collation and distribution traits are compatible with 
the Window node.
+     * - If they are not, replace them with suitable traits.
+     * - Request a new trait set from the input accordingly.
+     */
+    private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) {
+        if (tgt.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        RelTraitSet traits = tgt;
+        RelCollation requiredCollation = TraitUtils.collation(tgt);
+        if (!satisfiesCollationSansGroupFields(requiredCollation)) {
+            traits = traits.replace(collation());
+        }
+
+        IgniteDistribution distribution = TraitUtils.distribution(tgt);
+        if (!satisfiesDistribution(distribution))
+            traits = traits.replace(distribution());
+        else if (distribution.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED) {
+            // Group set contains all distribution keys, shift distribution 
keys according to used columns.
+            IgniteDistribution outDistribution = 
distribution.apply(Commons.mapping(group.keys, rowType.getFieldCount()));
+            traits = traits.replace(outDistribution);
+        }
+
+        if (traits == traitSet) {
+            // new traits equal to current traits of window.
+            // no need to pass throught or derive any.
+            return null;
+        }
+
+        return traits;
+    }
+
+    /** Check input distribution satisfies collation of this window. */
+    private boolean satisfiesDistribution(IgniteDistribution distribution) {
+        if (distribution.satisfies(IgniteDistributions.single()) || 
distribution.function().correlated()) {
+            return true;
+        }
+
+        if (distribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) {
+            for (Integer key : distribution.getKeys()) {
+                if (!group.keys.get(key))
+                    // can't derive distribution with fields unmatched to 
group keys
+                    return false;
+            }
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Check input collation satisfies collation of this window.
+     * - Collations field indicies of the window should be a prefix for 
desired collation.
+     * - Group fields sort direction can be changed to desired collation.
+     * - Order fields sort direction should be the same as in desired 
collation.
+     */
+    private boolean satisfiesCollationSansGroupFields(RelCollation 
desiredCollation) {
+        RelCollation collation = collation();
+        if (desiredCollation.satisfies(collation)) {
+            return true;
+        }
+
+        if (!Util.startsWith(desiredCollation.getKeys(), collation.getKeys())) 
{

Review Comment:
   Maybe it's better to check sets, not lists? For example {0, 1} and {1, 0} 
are correct columns sets for partitioning, but current check will return `false`



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ * A relational expression representing a set of window aggregates.
+ *
+ * <p>A Window can handle several window aggregate functions, over several
+ * partitions, with pre- and post-expressions, and an optional post-filter.
+ * Each of the partitions is defined by a partition key (zero or more columns)
+ * and a range (logical or physical). The partitions expect the data to be
+ * sorted correctly on input to the relational expression.
+ *
+ * <p>Each {@link Window.Group} has a set of
+ * {@link org.apache.calcite.rex.RexOver} objects.
+ */
+public class IgniteWindow extends Window implements IgniteRel {
+
+    private final Group group;
+    private final boolean streaming;
+
+    /**  */
+    public IgniteWindow(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        RelDataType rowType,
+        Group group,
+        boolean streaming
+    ) {
+        super(cluster, traitSet, input, ImmutableList.of(), rowType, 
ImmutableList.of(group));
+        this.group = group;
+        this.streaming = streaming;
+        assert !group.aggCalls.isEmpty();
+    }
+
+    /**  */
+    public IgniteWindow(RelInput input) {
+        this(input.getCluster(),
+            changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
+            input.getInput(),
+            input.getRowType("rowType"),
+            ((RelInputEx)input).getWindowGroup("group"),
+            input.getBoolean("streaming", false));
+    }
+
+    /**  */
+    public Group getGroup() {
+        return group;
+    }
+
+    /**  */
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteWindow(getCluster(), traitSet, sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Window copy(List<RexLiteral> constants) {
+        assert constants.isEmpty();
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteWindow(cluster, getTraitSet(), sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .input("input", getInput())
+            .item("rowType", getRowType())
+            .item("group", group)
+            .item("streaming", streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
+
+        int aggCnt = group.aggCalls.size();
+
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
+
+        RelOptCost cost =  costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 
0);
+
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost.plus(costFactory.makeTinyCost());
+
+        return cost;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        RelTraitSet traits = passThroughOrDerivedTraits(required);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
+        assert childId == 0;
+
+        RelTraitSet traits = passThroughOrDerivedTraits(childTraits);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+
+    /**
+     * Propagates the trait set from the parent to the child, or derives it 
from the child node.
+     *
+     * <p>The Window node cannot independently satisfy any traits. Therefore:
+     * - Validate that collation and distribution traits are compatible with 
the Window node.
+     * - If they are not, replace them with suitable traits.
+     * - Request a new trait set from the input accordingly.
+     */
+    private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) {

Review Comment:
   `tgt` is not a common abbreviation in Ignite, please use the full word.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java:
##########
@@ -98,6 +101,22 @@ public enum PlannerPhase {
         }
     },
 
+    /** */
+    HEP_WINDOW_SPLIT("Heuristic phase to split project to project and window") 
{
+        @Override public RuleSet getRules(PlanningContext ctx) {
+            return ctx.rules(
+                RuleSets.ofList(
+                    CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW,

Review Comment:
   Do we really need dedicated phase for this rule? Can it be moved to the 
HEP_PROJECT_PUSH_DOWN?
   In my opinion all required filters can be pushed down by 
HEP_FILTER_PUSH_DOWN phase, when window rel op is not yet extracted. All 
required projects can be pushed down then by HEP_PROJECT_PUSH_DOWN (but 
projects with windows should not be merged to table scans), in the end we can 
extract windows from project. Is it correct workflow?



##########
modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java:
##########
@@ -86,8 +87,9 @@
     RexSimplificationPlannerTest.class,
     SerializationPlannerTest.class,
     UncollectPlannerTest.class,
+    WindowPlannerTest.class,
 
-    HintsTestSuite.class,
+    HintsTestSuite.class

Review Comment:
   Please add comma to the end of list



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java:
##########
@@ -45,6 +47,14 @@ Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory(
         RelDataType rowType
     );
 
+    /** */
+    Supplier<WindowPartition<Row>> windowFrameFactory(

Review Comment:
   This factory produces partition, not frame, so should be called accordingly



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ * A relational expression representing a set of window aggregates.
+ *
+ * <p>A Window can handle several window aggregate functions, over several
+ * partitions, with pre- and post-expressions, and an optional post-filter.
+ * Each of the partitions is defined by a partition key (zero or more columns)
+ * and a range (logical or physical). The partitions expect the data to be
+ * sorted correctly on input to the relational expression.
+ *
+ * <p>Each {@link Window.Group} has a set of
+ * {@link org.apache.calcite.rex.RexOver} objects.
+ */
+public class IgniteWindow extends Window implements IgniteRel {
+
+    private final Group group;
+    private final boolean streaming;
+
+    /**  */
+    public IgniteWindow(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        RelDataType rowType,
+        Group group,
+        boolean streaming
+    ) {
+        super(cluster, traitSet, input, ImmutableList.of(), rowType, 
ImmutableList.of(group));
+        this.group = group;
+        this.streaming = streaming;
+        assert !group.aggCalls.isEmpty();
+    }
+
+    /**  */
+    public IgniteWindow(RelInput input) {
+        this(input.getCluster(),
+            changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
+            input.getInput(),
+            input.getRowType("rowType"),
+            ((RelInputEx)input).getWindowGroup("group"),
+            input.getBoolean("streaming", false));
+    }
+
+    /**  */
+    public Group getGroup() {
+        return group;
+    }
+
+    /**  */
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteWindow(getCluster(), traitSet, sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Window copy(List<RexLiteral> constants) {
+        assert constants.isEmpty();
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteWindow(cluster, getTraitSet(), sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .input("input", getInput())
+            .item("rowType", getRowType())
+            .item("group", group)
+            .item("streaming", streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
+
+        int aggCnt = group.aggCalls.size();
+
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
+
+        RelOptCost cost =  costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 
0);
+
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost.plus(costFactory.makeTinyCost());

Review Comment:
   `cost = cost.plus...`



##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/WindowPlannerTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableIntList;
+import 
org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.function.Predicate.not;
+
+/**
+ *
+ */
+public class WindowPlannerTest extends AbstractPlannerTest {

Review Comment:
   More tests should be added.
   At least:
   - Test with several partition by columns
   - Test with several order by columns
   - Test with different agg calls but with the same window definition (there 
should be only one IgniteWindow rel op)
   - Test for collation derivation (table index cover more columns than 
partition/order by, table index has different directions than partition fields)
   - Test for collation pass through (order by for select and order by in 
window, extend testOrderByWithWindow)
   - Test for both collation and distribution derivation
   - Test with correlated distribution (see 
CorrelatedSubqueryPlannerTest#testCorrelatedDistribution)



##########
modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java:
##########
@@ -57,6 +58,7 @@
     LimitExecutionTest.class,
     TimeCalculationExecutionTest.class,
     UncollectExecutionTest.class,
+    WindowExecutionTest.class

Review Comment:
   Please add comma to the end of list



##########
modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java:
##########
@@ -163,6 +164,7 @@
     TpchTest.class,
     UnnestIntegrationTest.class,
     CalcitePlanningDumpTest.class,
+    WindowIntegrationTest.class

Review Comment:
   Please add comma to the end of list



##########
modules/calcite/src/test/sql/subquery/scalar/test_window_function_subquery.test:
##########


Review Comment:
   Please remove ignore reason at the header of the file



##########
modules/calcite/src/test/sql/subquery/scalar/test_complex_correlated_subquery.test_ignore:
##########
@@ -111,10 +111,6 @@ SELECT i, (SELECT SUM(s1.i) FROM integers s1 FULL OUTER 
JOIN integers s2 ON s1.i
 # REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3}));
 # REQUIRE(CHECK_COLUMN(result, 1, {6, 6, 9, 12}));
 
-# correlated expression inside window function not supported
-statement error
-SELECT i, (SELECT row_number() OVER (ORDER BY i)) FROM integers i1 ORDER BY i;
-

Review Comment:
   Test cases should not be deleted from ignore file. If test case is wrongly 
described in ignore file and was fixed in test file, it should be fixed in 
ignore file too. Ignore file is deleted after it becomes identical to test file.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ * A relational expression representing a set of window aggregates.
+ *
+ * <p>A Window can handle several window aggregate functions, over several
+ * partitions, with pre- and post-expressions, and an optional post-filter.
+ * Each of the partitions is defined by a partition key (zero or more columns)
+ * and a range (logical or physical). The partitions expect the data to be
+ * sorted correctly on input to the relational expression.
+ *
+ * <p>Each {@link Window.Group} has a set of
+ * {@link org.apache.calcite.rex.RexOver} objects.
+ */
+public class IgniteWindow extends Window implements IgniteRel {
+
+    private final Group group;
+    private final boolean streaming;
+
+    /**  */
+    public IgniteWindow(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        RelDataType rowType,
+        Group group,
+        boolean streaming
+    ) {
+        super(cluster, traitSet, input, ImmutableList.of(), rowType, 
ImmutableList.of(group));
+        this.group = group;
+        this.streaming = streaming;
+        assert !group.aggCalls.isEmpty();
+    }
+
+    /**  */
+    public IgniteWindow(RelInput input) {
+        this(input.getCluster(),
+            changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
+            input.getInput(),
+            input.getRowType("rowType"),
+            ((RelInputEx)input).getWindowGroup("group"),
+            input.getBoolean("streaming", false));
+    }
+
+    /**  */
+    public Group getGroup() {
+        return group;
+    }
+
+    /**  */
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteWindow(getCluster(), traitSet, sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Window copy(List<RexLiteral> constants) {
+        assert constants.isEmpty();
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteWindow(cluster, getTraitSet(), sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .input("input", getInput())
+            .item("rowType", getRowType())
+            .item("group", group)
+            .item("streaming", streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
+
+        int aggCnt = group.aggCalls.size();
+
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
+
+        RelOptCost cost =  costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 
0);
+
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost.plus(costFactory.makeTinyCost());
+
+        return cost;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        RelTraitSet traits = passThroughOrDerivedTraits(required);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
+        assert childId == 0;
+
+        RelTraitSet traits = passThroughOrDerivedTraits(childTraits);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+
+    /**
+     * Propagates the trait set from the parent to the child, or derives it 
from the child node.
+     *
+     * <p>The Window node cannot independently satisfy any traits. Therefore:
+     * - Validate that collation and distribution traits are compatible with 
the Window node.
+     * - If they are not, replace them with suitable traits.
+     * - Request a new trait set from the input accordingly.
+     */
+    private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) {
+        if (tgt.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        RelTraitSet traits = tgt;
+        RelCollation requiredCollation = TraitUtils.collation(tgt);
+        if (!satisfiesCollationSansGroupFields(requiredCollation)) {
+            traits = traits.replace(collation());

Review Comment:
   Is it correct to return current trait as derived/passed trough? Maybe here 
must be something like `return null`?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java:
##########
@@ -889,27 +913,28 @@ private Object toJson(RexNode node) {
     }
 
     /** */
-    private Object toJson(RexWindow window) {
+    private Object toJson(Window.Group group) {
         Map<String, Object> map = map();
-        if (!window.partitionKeys.isEmpty())
-            map.put("partition", toJson(window.partitionKeys));
-        if (!window.orderKeys.isEmpty())
-            map.put("order", toJson(window.orderKeys));
-        if (window.getLowerBound() == null) {
+        map.put("calls", toJson(group.aggCalls));
+        if (!group.keys.isEmpty())
+            map.put("partition", toJson(group.keys));
+        if (!group.orderKeys.getKeys().isEmpty())
+            map.put("order", toJson(group.orderKeys));
+        if (group.lowerBound == null) {

Review Comment:
   Group constructor requires non-null lower and upper bounds. How nulls is 
possible here?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Util;
+import 
org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx;
+import 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE;
+import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST;
+import static 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+
+/**
+ * A relational expression representing a set of window aggregates.
+ *
+ * <p>A Window can handle several window aggregate functions, over several
+ * partitions, with pre- and post-expressions, and an optional post-filter.
+ * Each of the partitions is defined by a partition key (zero or more columns)
+ * and a range (logical or physical). The partitions expect the data to be
+ * sorted correctly on input to the relational expression.
+ *
+ * <p>Each {@link Window.Group} has a set of
+ * {@link org.apache.calcite.rex.RexOver} objects.
+ */
+public class IgniteWindow extends Window implements IgniteRel {
+
+    private final Group group;
+    private final boolean streaming;
+
+    /**  */
+    public IgniteWindow(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        RelDataType rowType,
+        Group group,
+        boolean streaming
+    ) {
+        super(cluster, traitSet, input, ImmutableList.of(), rowType, 
ImmutableList.of(group));
+        this.group = group;
+        this.streaming = streaming;
+        assert !group.aggCalls.isEmpty();
+    }
+
+    /**  */
+    public IgniteWindow(RelInput input) {
+        this(input.getCluster(),
+            changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(),
+            input.getInput(),
+            input.getRowType("rowType"),
+            ((RelInputEx)input).getWindowGroup("group"),
+            input.getBoolean("streaming", false));
+    }
+
+    /**  */
+    public Group getGroup() {
+        return group;
+    }
+
+    /**  */
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteWindow(getCluster(), traitSet, sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Window copy(List<RexLiteral> constants) {
+        assert constants.isEmpty();
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> 
inputs) {
+        return new IgniteWindow(cluster, getTraitSet(), sole(inputs), 
getRowType(), group, streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .input("input", getInput())
+            .item("rowType", getRowType())
+            .item("group", group)
+            .item("streaming", streaming);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, 
RelMetadataQuery mq) {
+        IgniteCostFactory costFactory = 
(IgniteCostFactory)planner.getCostFactory();
+
+        int aggCnt = group.aggCalls.size();
+
+        double rowCnt = mq.getRowCount(getInput());
+        double cpuCost = rowCnt * ROW_COMPARISON_COST;
+        double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + 
aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt);
+
+        RelOptCost cost =  costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 
0);
+
+        // Distributed processing is more preferable than processing on the 
single node.
+        if 
(TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
+            cost.plus(costFactory.makeTinyCost());
+
+        return cost;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
passThroughTraits(RelTraitSet required) {
+        RelTraitSet traits = passThroughOrDerivedTraits(required);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> 
deriveTraits(RelTraitSet childTraits, int childId) {
+        assert childId == 0;
+
+        RelTraitSet traits = passThroughOrDerivedTraits(childTraits);
+        if (traits == null)
+            return null;
+
+        return Pair.of(traits, ImmutableList.of(traits));
+    }
+
+
+    /**
+     * Propagates the trait set from the parent to the child, or derives it 
from the child node.
+     *
+     * <p>The Window node cannot independently satisfy any traits. Therefore:
+     * - Validate that collation and distribution traits are compatible with 
the Window node.
+     * - If they are not, replace them with suitable traits.
+     * - Request a new trait set from the input accordingly.
+     */
+    private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) {
+        if (tgt.getConvention() != IgniteConvention.INSTANCE)
+            return null;
+
+        RelTraitSet traits = tgt;
+        RelCollation requiredCollation = TraitUtils.collation(tgt);
+        if (!satisfiesCollationSansGroupFields(requiredCollation)) {
+            traits = traits.replace(collation());
+        }
+
+        IgniteDistribution distribution = TraitUtils.distribution(tgt);
+        if (!satisfiesDistribution(distribution))
+            traits = traits.replace(distribution());
+        else if (distribution.getType() == 
RelDistribution.Type.HASH_DISTRIBUTED) {
+            // Group set contains all distribution keys, shift distribution 
keys according to used columns.
+            IgniteDistribution outDistribution = 
distribution.apply(Commons.mapping(group.keys, rowType.getFieldCount()));

Review Comment:
   Why do we need to change keys order in distribution (also for derived or 
passed through distribution)? Columns order is not changed by this rel op.



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** Window support node */
+public class WindowNode<Row> extends MemoryTrackingNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /**  */
+    private final Comparator<Row> partCmp;
+
+    /**  */
+    private final Supplier<WindowPartition<Row>> partitionFactory;
+
+    /**  */
+    private final RowHandler.RowFactory<Row> rowFactory;
+
+    /**  */
+    private WindowPartition<Row> partition;
+
+    /**  */
+    private int requested;
+
+    /**  */
+    private int waiting;
+
+    /**  */
+    private Row prevRow;
+
+    /**  */
+    private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /**  */
+    public WindowNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Comparator<Row> partCmp,
+        Supplier<WindowPartition<Row>> partitionFactory,
+        RowHandler.RowFactory<Row> rowFactory
+    ) {
+        super(ctx, rowType, DFLT_ROW_OVERHEAD);
+        this.partCmp = partCmp;
+        this.partitionFactory = partitionFactory;
+        this.rowFactory = rowFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        doPush();
+
+        if (waiting == 0) {
+            waiting = IN_BUFFER_SIZE;
+            source().request(IN_BUFFER_SIZE);
+        }
+        else if (waiting < 0)
+            downstream().end();

Review Comment:
   Downstream will be prematurely ended if there are more rows in the last 
batch than was requested



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** Window support node */
+public class WindowNode<Row> extends MemoryTrackingNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /**  */
+    private final Comparator<Row> partCmp;
+
+    /**  */
+    private final Supplier<WindowPartition<Row>> partitionFactory;
+
+    /**  */
+    private final RowHandler.RowFactory<Row> rowFactory;
+
+    /**  */
+    private WindowPartition<Row> partition;
+
+    /**  */
+    private int requested;
+
+    /**  */
+    private int waiting;
+
+    /**  */
+    private Row prevRow;
+
+    /**  */
+    private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /**  */
+    public WindowNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Comparator<Row> partCmp,
+        Supplier<WindowPartition<Row>> partitionFactory,
+        RowHandler.RowFactory<Row> rowFactory
+    ) {
+        super(ctx, rowType, DFLT_ROW_OVERHEAD);
+        this.partCmp = partCmp;
+        this.partitionFactory = partitionFactory;
+        this.rowFactory = rowFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        doPush();
+
+        if (waiting == 0) {
+            waiting = IN_BUFFER_SIZE;
+            source().request(IN_BUFFER_SIZE);
+        }
+        else if (waiting < 0)
+            downstream().end();
+    }
+
+    @Override public void push(Row row) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        if (partition == null) {
+            partition = partitionFactory.get();
+        }
+        else if (prevRow != null && partCmp != null && 
partCmp.compare(prevRow, row) != 0) {
+            partition.drainTo(rowFactory, outBuf);
+            partition.reset();
+            doPush();
+        }
+
+        if (partition.add(row)) {
+            partition.drainTo(rowFactory, outBuf);
+            doPush();
+        }
+        else
+            nodeMemoryTracker.onRowAdded(row);
+
+        prevRow = row;
+
+        if (waiting == 0 && requested > 0) {
+            waiting = IN_BUFFER_SIZE;
+
+            context().execute(() -> source().request(IN_BUFFER_SIZE), 
this::onError);
+        }
+    }
+
+    @Override public void end() throws Exception {
+        assert downstream() != null;
+        if (waiting < 0) {
+            return;
+        }
+
+        waiting = -1;
+
+        checkState();
+
+        if (partition != null) {
+            partition.drainTo(rowFactory, outBuf);
+            partition.reset();
+        }
+
+        doPush();
+
+        downstream().end();

Review Comment:
   Downstream will be prematurely ended if there are more rows in the last 
batch than was requested



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** Window support node */
+public class WindowNode<Row> extends MemoryTrackingNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /**  */
+    private final Comparator<Row> partCmp;
+
+    /**  */
+    private final Supplier<WindowPartition<Row>> partitionFactory;
+
+    /**  */
+    private final RowHandler.RowFactory<Row> rowFactory;
+
+    /**  */
+    private WindowPartition<Row> partition;
+
+    /**  */
+    private int requested;
+
+    /**  */
+    private int waiting;
+
+    /**  */
+    private Row prevRow;
+
+    /**  */
+    private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /**  */
+    public WindowNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Comparator<Row> partCmp,
+        Supplier<WindowPartition<Row>> partitionFactory,
+        RowHandler.RowFactory<Row> rowFactory
+    ) {
+        super(ctx, rowType, DFLT_ROW_OVERHEAD);
+        this.partCmp = partCmp;
+        this.partitionFactory = partitionFactory;
+        this.rowFactory = rowFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        doPush();
+
+        if (waiting == 0) {
+            waiting = IN_BUFFER_SIZE;
+            source().request(IN_BUFFER_SIZE);
+        }
+        else if (waiting < 0)
+            downstream().end();
+    }
+
+    @Override public void push(Row row) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        if (partition == null) {
+            partition = partitionFactory.get();
+        }
+        else if (prevRow != null && partCmp != null && 
partCmp.compare(prevRow, row) != 0) {
+            partition.drainTo(rowFactory, outBuf);
+            partition.reset();
+            doPush();
+        }
+
+        if (partition.add(row)) {

Review Comment:
   Strange API (true to drain). Maybe rework it somehow?



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** Window support node */
+public class WindowNode<Row> extends MemoryTrackingNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /**  */
+    private final Comparator<Row> partCmp;
+
+    /**  */
+    private final Supplier<WindowPartition<Row>> partitionFactory;
+
+    /**  */
+    private final RowHandler.RowFactory<Row> rowFactory;
+
+    /**  */
+    private WindowPartition<Row> partition;
+
+    /**  */
+    private int requested;
+
+    /**  */
+    private int waiting;
+
+    /**  */
+    private Row prevRow;
+
+    /**  */
+    private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /**  */
+    public WindowNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Comparator<Row> partCmp,
+        Supplier<WindowPartition<Row>> partitionFactory,
+        RowHandler.RowFactory<Row> rowFactory
+    ) {
+        super(ctx, rowType, DFLT_ROW_OVERHEAD);
+        this.partCmp = partCmp;
+        this.partitionFactory = partitionFactory;
+        this.rowFactory = rowFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        doPush();
+
+        if (waiting == 0) {
+            waiting = IN_BUFFER_SIZE;
+            source().request(IN_BUFFER_SIZE);
+        }
+        else if (waiting < 0)
+            downstream().end();
+    }
+
+    @Override public void push(Row row) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        if (partition == null) {
+            partition = partitionFactory.get();
+        }
+        else if (prevRow != null && partCmp != null && 
partCmp.compare(prevRow, row) != 0) {
+            partition.drainTo(rowFactory, outBuf);
+            partition.reset();
+            doPush();
+        }
+
+        if (partition.add(row)) {
+            partition.drainTo(rowFactory, outBuf);
+            doPush();
+        }
+        else
+            nodeMemoryTracker.onRowAdded(row);

Review Comment:
   Can be false positive memory limit exceed error. Tracker need to be reset 
not only on rewindInternal, but also when partition is fully drained.
   Test required for new node (see MemoryQuotasIntegrationTest) 



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/fun/IgniteStdSqlOperatorTable.java:
##########
@@ -317,5 +317,18 @@ public IgniteStdSqlOperatorTable() {
         register(SqlStdOperatorTable.BIT_AND);
         register(SqlStdOperatorTable.BIT_OR);
         register(SqlStdOperatorTable.BIT_XOR);
+
+        // Window specific operations
+        register(SqlStdOperatorTable.ROW_NUMBER);

Review Comment:
   Smoke test for each operand required in StdSqlOperatorsTest



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** Window support node */
+public class WindowNode<Row> extends MemoryTrackingNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /**  */
+    private final Comparator<Row> partCmp;
+
+    /**  */
+    private final Supplier<WindowPartition<Row>> partitionFactory;
+
+    /**  */
+    private final RowHandler.RowFactory<Row> rowFactory;
+
+    /**  */
+    private WindowPartition<Row> partition;
+
+    /**  */
+    private int requested;
+
+    /**  */
+    private int waiting;
+
+    /**  */
+    private Row prevRow;
+
+    /**  */
+    private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /**  */
+    public WindowNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Comparator<Row> partCmp,
+        Supplier<WindowPartition<Row>> partitionFactory,
+        RowHandler.RowFactory<Row> rowFactory
+    ) {
+        super(ctx, rowType, DFLT_ROW_OVERHEAD);

Review Comment:
   Memory tracker row overhead depends at least on count of aggregates (maybe 
also kinds of aggregates)



##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.function.Supplier;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition;
+import org.apache.ignite.internal.util.typedef.F;
+
+/** Window support node */
+public class WindowNode<Row> extends MemoryTrackingNode<Row> implements 
SingleNode<Row>, Downstream<Row> {
+    /**  */
+    private final Comparator<Row> partCmp;
+
+    /**  */
+    private final Supplier<WindowPartition<Row>> partitionFactory;
+
+    /**  */
+    private final RowHandler.RowFactory<Row> rowFactory;
+
+    /**  */
+    private WindowPartition<Row> partition;
+
+    /**  */
+    private int requested;
+
+    /**  */
+    private int waiting;
+
+    /**  */
+    private Row prevRow;
+
+    /**  */
+    private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE);
+
+    /**  */
+    public WindowNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        Comparator<Row> partCmp,
+        Supplier<WindowPartition<Row>> partitionFactory,
+        RowHandler.RowFactory<Row> rowFactory
+    ) {
+        super(ctx, rowType, DFLT_ROW_OVERHEAD);
+        this.partCmp = partCmp;
+        this.partitionFactory = partitionFactory;
+        this.rowFactory = rowFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources()) && sources().size() == 1;
+        assert rowsCnt > 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        doPush();
+
+        if (waiting == 0) {
+            waiting = IN_BUFFER_SIZE;
+            source().request(IN_BUFFER_SIZE);
+        }
+        else if (waiting < 0)
+            downstream().end();
+    }
+
+    @Override public void push(Row row) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        if (partition == null) {
+            partition = partitionFactory.get();
+        }
+        else if (prevRow != null && partCmp != null && 
partCmp.compare(prevRow, row) != 0) {
+            partition.drainTo(rowFactory, outBuf);

Review Comment:
   - This operation can block the thread for a long time.
   - Large amount of rows can be stored in outBuf, in worth case there will be 
2x input rows count (in partition and in outBuf)
   
   Consider pushing directly to downstream. Drain (and push) only requested 
amount and postpone next pushes until next request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to