alex-plekhanov commented on code in PR #12096: URL: https://github.com/apache/ignite/pull/12096#discussion_r2225170086
########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectWindowTransposeRule.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule.logical; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.rules.TransformationRule; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import 
org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.util.BitSets; +import org.apache.calcite.util.ImmutableBitSet; +import org.immutables.value.Value; + +/** + * Fixed copy of {@link org.apache.calcite.rel.rules.ProjectWindowTransposeRule}. + * The original rule logic is broken: it skips input refs in window boundaries. + * This issue fixed in calcite 1.39.0, so this rule can be replaced with the original after an update. Review Comment: We are on Calcite 1.40 now, so this rule can be replaced with the original. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class WindowConverterRule extends AbstractIgniteConverterRule<LogicalWindow> { + /** */ + public static final RelOptRule INSTANCE = new WindowConverterRule(); + + /** */ + private WindowConverterRule() { + super(LogicalWindow.class, "WindowConverterRule"); + } + + /** + * {@inheritDoc} + */ Review Comment: Use one line comment: `/** {@inheritDoc} */` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class 
WindowConverterRule extends AbstractIgniteConverterRule<LogicalWindow> { + /** */ + public static final RelOptRule INSTANCE = new WindowConverterRule(); + + /** */ Review Comment: Redundant space in comment, use `/** */` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class WindowConverterRule extends AbstractIgniteConverterRule<LogicalWindow> { + /** */ + public static final RelOptRule INSTANCE = new WindowConverterRule(); + + /** */ + private WindowConverterRule() { + super(LogicalWindow.class, "WindowConverterRule"); + } + + /** + * {@inheritDoc} + */ + @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalWindow window) { + RelOptCluster cluster = window.getCluster(); + + RelNode result = window.getInput(); + + assert window.constants.isEmpty(); + + for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) { + Window.Group group = window.groups.get(grpIdx); + + RelCollation collation = TraitUtils.mergeCollations( 
Review Comment: mergeCollations used only once. Let's move this function to the current class and change signature (merge partition columns and collation). ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class WindowConverterRule extends AbstractIgniteConverterRule<LogicalWindow> { + /** */ + public static final RelOptRule INSTANCE = new WindowConverterRule(); + + /** */ + private WindowConverterRule() { + super(LogicalWindow.class, "WindowConverterRule"); + } + + /** + * {@inheritDoc} + */ + @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalWindow window) { + RelOptCluster cluster = window.getCluster(); + + RelNode result = window.getInput(); + + assert window.constants.isEmpty(); + + for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) { + Window.Group group = window.groups.get(grpIdx); Review Comment: Abbreviation should be used for `group` 
########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectWindowConstantsRule.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule.logical; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelRule; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.rules.TransformationRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexWindowBound; +import org.apache.calcite.rex.RexWindowBounds; +import org.apache.calcite.tools.RelBuilder; +import org.immutables.value.Value; + +/** + * A rule to split window rel with 
constants to: + * - project with constants + * - window without constants + * - project removing constants + */ +@Value.Enclosing +public class ProjectWindowConstantsRule extends RelRule<ProjectWindowConstantsRule.Config> implements TransformationRule { Review Comment: It's the goal of the AccumulatorsFactory in/out adapters to provide rows suitable for the accumulator. Let's use this logic instead of creating additional rel ops. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class WindowConverterRule extends AbstractIgniteConverterRule<LogicalWindow> { + /** */ + public static final RelOptRule INSTANCE = new WindowConverterRule(); + + /** */ + private WindowConverterRule() { + super(LogicalWindow.class, "WindowConverterRule"); + } + + /** + * {@inheritDoc} + */ + @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalWindow window) { + RelOptCluster cluster = window.getCluster(); + + RelNode result = window.getInput(); + + assert window.constants.isEmpty(); + + for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) { + Window.Group group = window.groups.get(grpIdx); + + RelCollation collation = TraitUtils.mergeCollations( 
+ TraitUtils.createCollation(group.keys.asList()), + group.collation() + ); + + RelTraitSet inTraits = cluster + .traitSetOf(IgniteConvention.INSTANCE) + .replace(IgniteDistributions.single()) + .replace(collation); + + RelTraitSet outTraits = cluster + .traitSetOf(IgniteConvention.INSTANCE) + .replace(IgniteDistributions.single()) + .replace(collation); + + result = convert(result, inTraits); + + // add fields added by current group. + // see org.apache.calcite.rel.logical.LogicalWindow#create + String groupFieldPrefix = "w" + grpIdx + "$"; + List<RelDataTypeField> fieldsAddedByCurrentGroup = U.arrayList(window.getRowType().getFieldList(), + it -> it.getName().startsWith(groupFieldPrefix)); + List<RelDataTypeField> groupFields = new ArrayList<>(result.getRowType().getFieldList()); + groupFields.addAll(fieldsAddedByCurrentGroup); + + RelRecordType rowType = new RelRecordType(groupFields); + + Window.Group newGroup = convertGroup(group); + + result = new IgniteWindow( + window.getCluster(), + window.getTraitSet().merge(outTraits), + result, + rowType, + newGroup, + WindowFunctions.streamable(newGroup) + ); + } + + return (PhysicalNode)result; + } + + private static Window.Group convertGroup(Window.Group group) { + List<Window.RexWinAggCall> newAggCalls = new ArrayList<>(group.aggCalls.size()); + ImmutableList<Window.RexWinAggCall> calls = group.aggCalls; + for (int i = 0; i < calls.size(); i++) { + Window.RexWinAggCall aggCall = calls.get(i); + Window.RexWinAggCall newCall = new Window.RexWinAggCall( + (SqlAggFunction)aggCall.op, + aggCall.type, + aggCall.operands, + i, + aggCall.distinct, + aggCall.ignoreNulls + ); + newAggCalls.add(newCall); + } + + return new Window.Group( + group.keys, + group.isRows, + group.lowerBound, + group.upperBound, + group.orderKeys, Review Comment: Calcite now requires exclusion clause here. 
We need to support exclusions or throw an error if exclusion is specified (on SqlValidator phase) ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class WindowConverterRule extends AbstractIgniteConverterRule<LogicalWindow> { + /** */ + public static final RelOptRule INSTANCE = new WindowConverterRule(); + + /** */ + private WindowConverterRule() { + super(LogicalWindow.class, "WindowConverterRule"); + } + + /** + * {@inheritDoc} + */ + @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalWindow window) { + RelOptCluster cluster = window.getCluster(); + + RelNode result = window.getInput(); + + assert window.constants.isEmpty(); + + for (int grpIdx = 0; grpIdx < window.groups.size(); grpIdx++) { + Window.Group group = window.groups.get(grpIdx); + + RelCollation collation = TraitUtils.mergeCollations( 
+ TraitUtils.createCollation(group.keys.asList()), + group.collation() + ); + + RelTraitSet inTraits = cluster + .traitSetOf(IgniteConvention.INSTANCE) + .replace(IgniteDistributions.single()) + .replace(collation); + + RelTraitSet outTraits = cluster + .traitSetOf(IgniteConvention.INSTANCE) + .replace(IgniteDistributions.single()) + .replace(collation); + + result = convert(result, inTraits); + + // add fields added by current group. + // see org.apache.calcite.rel.logical.LogicalWindow#create + String groupFieldPrefix = "w" + grpIdx + "$"; + List<RelDataTypeField> fieldsAddedByCurrentGroup = U.arrayList(window.getRowType().getFieldList(), + it -> it.getName().startsWith(groupFieldPrefix)); + List<RelDataTypeField> groupFields = new ArrayList<>(result.getRowType().getFieldList()); + groupFields.addAll(fieldsAddedByCurrentGroup); + + RelRecordType rowType = new RelRecordType(groupFields); + + Window.Group newGroup = convertGroup(group); + + result = new IgniteWindow( + window.getCluster(), + window.getTraitSet().merge(outTraits), + result, + rowType, + newGroup, + WindowFunctions.streamable(newGroup) + ); + } + + return (PhysicalNode)result; + } + + private static Window.Group convertGroup(Window.Group group) { Review Comment: I can't see any conversion here. The group is copied as is, and each call is copied as is. Why do we need these copies? ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/BufWindowPartition.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.exp.window; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.function.Supplier; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; + +/** Buffering implementation of the ROWS / RANGE window partition */ +final class BufWindowPartition<Row> extends WindowPartitionBase<Row> { + private final List<Row> buffer; Review Comment: `BufWindowPartition` -> `BufferingWindowPartition` According to Ignite code style we don't use abbreviation in class names and method names, but it's mandatory for field names and variables (for example `buffer` field should be renamed to `buf`) ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class WindowConverterRule extends AbstractIgniteConverterRule<LogicalWindow> { + /** */ Review Comment: Redundant space in comment, use `/** */` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java: ########## @@ 
-0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.ArrayList; +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowFunctions; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import 
org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ Review Comment: Redundant space in comment, use `/** */`, or add a class description ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java: ########## @@ -882,6 +886,31 @@ else if (rel instanceof Intersect) return node; } + /** {@inheritDoc} */ + @Override public Node<Row> visit(IgniteWindow rel) { + RelDataType outType = rel.getRowType(); + RelDataType inputType = rel.getInput().getRowType(); + + List<Integer> grpKeys = rel.getGroup().keys.toList(); + RelCollation collation = rel.collation(); + + assert collation.getFieldCollations().size() >= grpKeys.size(); + Comparator<Row> partCmp = expressionFactory.comparator(TraitUtils.createCollation(grpKeys)); + + List<AggregateCall> aggCalls = rel.getGroup().getAggregateCalls(rel); + Supplier<WindowPartition<Row>> frameFactory = expressionFactory.windowFrameFactory(rel.getGroup(), aggCalls, inputType, false); Review Comment: Streaming is always `false` for production code. Inside the factory it is used only for an assert. Do we need this parameter at all? ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST; +import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST; +import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits; + +/** + * A relational expression representing a set of window aggregates. + * + * <p>A Window can handle several window aggregate functions, over several + * partitions, with pre- and post-expressions, and an optional post-filter. + * Each of the partitions is defined by a partition key (zero or more columns) + * and a range (logical or physical). The partitions expect the data to be + * sorted correctly on input to the relational expression. + * + * <p>Each {@link Window.Group} has a set of + * {@link org.apache.calcite.rex.RexOver} objects. + */ +public class IgniteWindow extends Window implements IgniteRel { + + private final Group group; + private final boolean streaming; + + /** */ + public IgniteWindow( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + RelDataType rowType, + Group group, + boolean streaming + ) { + super(cluster, traitSet, input, ImmutableList.of(), rowType, ImmutableList.of(group)); + this.group = group; + this.streaming = streaming; + assert !group.aggCalls.isEmpty(); + } + + /** */ + public IgniteWindow(RelInput input) { + this(input.getCluster(), + changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(), + input.getInput(), + input.getRowType("rowType"), + ((RelInputEx)input).getWindowGroup("group"), + input.getBoolean("streaming", false)); + } + + /** */ + public Group getGroup() { + return group; + } + + /** */ + public boolean isStreaming() { + return streaming; + } + + /** {@inheritDoc} */ + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new IgniteWindow(getCluster(), traitSet, sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public 
Window copy(List<RexLiteral> constants) { + assert constants.isEmpty(); + return this; + } + + /** {@inheritDoc} */ + @Override public <T> T accept(IgniteRelVisitor<T> visitor) { + return visitor.visit(this); + } + + /** {@inheritDoc} */ + @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) { + return new IgniteWindow(cluster, getTraitSet(), sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public RelWriter explainTerms(RelWriter pw) { + return pw + .input("input", getInput()) + .item("rowType", getRowType()) + .item("group", group) + .item("streaming", streaming); + } + + /** {@inheritDoc} */ + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory(); + + int aggCnt = group.aggCalls.size(); + + double rowCnt = mq.getRowCount(getInput()); + double cpuCost = rowCnt * ROW_COMPARISON_COST; + double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt); + + RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0); + + // Distributed processing is more preferable than processing on the single node. 
+ if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single())) + cost.plus(costFactory.makeTinyCost()); + + return cost; + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(RelTraitSet required) { + RelTraitSet traits = passThroughOrDerivedTraits(required); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(RelTraitSet childTraits, int childId) { + assert childId == 0; + + RelTraitSet traits = passThroughOrDerivedTraits(childTraits); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + + /** + * Propagates the trait set from the parent to the child, or derives it from the child node. + * + * <p>The Window node cannot independently satisfy any traits. Therefore: + * - Validate that collation and distribution traits are compatible with the Window node. + * - If they are not, replace them with suitable traits. + * - Request a new trait set from the input accordingly. + */ + private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) { + if (tgt.getConvention() != IgniteConvention.INSTANCE) + return null; + + RelTraitSet traits = tgt; + RelCollation requiredCollation = TraitUtils.collation(tgt); + if (!satisfiesCollationSansGroupFields(requiredCollation)) { + traits = traits.replace(collation()); + } + + IgniteDistribution distribution = TraitUtils.distribution(tgt); + if (!satisfiesDistribution(distribution)) + traits = traits.replace(distribution()); + else if (distribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) { + // Group set contains all distribution keys, shift distribution keys according to used columns. 
+ IgniteDistribution outDistribution = distribution.apply(Commons.mapping(group.keys, rowType.getFieldCount())); + traits = traits.replace(outDistribution); + } + + if (traits == traitSet) { + // new traits equal to current traits of window. + // no need to pass throught or derive any. + return null; + } + + return traits; + } + + /** Check input distribution satisfies collation of this window. */ + private boolean satisfiesDistribution(IgniteDistribution distribution) { + if (distribution.satisfies(IgniteDistributions.single()) || distribution.function().correlated()) { + return true; + } + + if (distribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) { + for (Integer key : distribution.getKeys()) { + if (!group.keys.get(key)) + // can't derive distribution with fields unmatched to group keys + return false; + } + return true; + } + + return false; + } + + /** + * Check input collation satisfies collation of this window. + * - Collations field indicies of the window should be a prefix for desired collation. + * - Group fields sort direction can be changed to desired collation. + * - Order fields sort direction should be the same as in desired collation. + */ + private boolean satisfiesCollationSansGroupFields(RelCollation desiredCollation) { + RelCollation collation = collation(); + if (desiredCollation.satisfies(collation)) { + return true; + } + + if (!Util.startsWith(desiredCollation.getKeys(), collation.getKeys())) { Review Comment: Maybe it's better to check sets, not lists? For example {0, 1} and {1, 0} are correct columns sets for partitioning, but current check will return `false` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.jetbrains.annotations.Nullable; 
+ +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST; +import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits; + +/** + * A relational expression representing a set of window aggregates. + * + * <p>A Window can handle several window aggregate functions, over several + * partitions, with pre- and post-expressions, and an optional post-filter. + * Each of the partitions is defined by a partition key (zero or more columns) + * and a range (logical or physical). The partitions expect the data to be + * sorted correctly on input to the relational expression. + * + * <p>Each {@link Window.Group} has a set of + * {@link org.apache.calcite.rex.RexOver} objects. + */ +public class IgniteWindow extends Window implements IgniteRel { + + private final Group group; + private final boolean streaming; + + /** */ + public IgniteWindow( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + RelDataType rowType, + Group group, + boolean streaming + ) { + super(cluster, traitSet, input, ImmutableList.of(), rowType, ImmutableList.of(group)); + this.group = group; + this.streaming = streaming; + assert !group.aggCalls.isEmpty(); + } + + /** */ + public IgniteWindow(RelInput input) { + this(input.getCluster(), + changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(), + input.getInput(), + input.getRowType("rowType"), + ((RelInputEx)input).getWindowGroup("group"), + input.getBoolean("streaming", false)); + } + + /** */ + public Group getGroup() { + return group; + } + + /** */ + public boolean isStreaming() { + return streaming; + } + + /** {@inheritDoc} */ + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new 
IgniteWindow(getCluster(), traitSet, sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public Window copy(List<RexLiteral> constants) { + assert constants.isEmpty(); + return this; + } + + /** {@inheritDoc} */ + @Override public <T> T accept(IgniteRelVisitor<T> visitor) { + return visitor.visit(this); + } + + /** {@inheritDoc} */ + @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) { + return new IgniteWindow(cluster, getTraitSet(), sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public RelWriter explainTerms(RelWriter pw) { + return pw + .input("input", getInput()) + .item("rowType", getRowType()) + .item("group", group) + .item("streaming", streaming); + } + + /** {@inheritDoc} */ + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory(); + + int aggCnt = group.aggCalls.size(); + + double rowCnt = mq.getRowCount(getInput()); + double cpuCost = rowCnt * ROW_COMPARISON_COST; + double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt); + + RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0); + + // Distributed processing is more preferable than processing on the single node. 
+ if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single())) + cost.plus(costFactory.makeTinyCost()); + + return cost; + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(RelTraitSet required) { + RelTraitSet traits = passThroughOrDerivedTraits(required); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(RelTraitSet childTraits, int childId) { + assert childId == 0; + + RelTraitSet traits = passThroughOrDerivedTraits(childTraits); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + + /** + * Propagates the trait set from the parent to the child, or derives it from the child node. + * + * <p>The Window node cannot independently satisfy any traits. Therefore: + * - Validate that collation and distribution traits are compatible with the Window node. + * - If they are not, replace them with suitable traits. + * - Request a new trait set from the input accordingly. + */ + private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) { Review Comment: `tgt` is not a common abbreviation in Ignite, please use the full word. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java: ########## @@ -98,6 +101,22 @@ public enum PlannerPhase { } }, + /** */ + HEP_WINDOW_SPLIT("Heuristic phase to split project to project and window") { + @Override public RuleSet getRules(PlanningContext ctx) { + return ctx.rules( + RuleSets.ofList( + CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW, Review Comment: Do we really need dedicated phase for this rule? Can it be moved to the HEP_PROJECT_PUSH_DOWN? In my opinion all required filters can be pushed down by HEP_FILTER_PUSH_DOWN phase, when window rel op is not yet extracted. 
All required projects can be pushed down then by HEP_PROJECT_PUSH_DOWN (but projects with windows should not be merged to table scans), in the end we can extract windows from project. Is it correct workflow? ########## modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java: ########## @@ -86,8 +87,9 @@ RexSimplificationPlannerTest.class, SerializationPlannerTest.class, UncollectPlannerTest.class, + WindowPlannerTest.class, - HintsTestSuite.class, + HintsTestSuite.class Review Comment: Please add comma to the end of list ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactory.java: ########## @@ -45,6 +47,14 @@ Supplier<List<AccumulatorWrapper<Row>>> accumulatorsFactory( RelDataType rowType ); + /** */ + Supplier<WindowPartition<Row>> windowFrameFactory( Review Comment: This factory produces partition, not frame, so should be called accordingly ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST; +import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits; + +/** + * A relational expression representing a set of window aggregates. 
+ * + * <p>A Window can handle several window aggregate functions, over several + * partitions, with pre- and post-expressions, and an optional post-filter. + * Each of the partitions is defined by a partition key (zero or more columns) + * and a range (logical or physical). The partitions expect the data to be + * sorted correctly on input to the relational expression. + * + * <p>Each {@link Window.Group} has a set of + * {@link org.apache.calcite.rex.RexOver} objects. + */ +public class IgniteWindow extends Window implements IgniteRel { + + private final Group group; + private final boolean streaming; + + /** */ + public IgniteWindow( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + RelDataType rowType, + Group group, + boolean streaming + ) { + super(cluster, traitSet, input, ImmutableList.of(), rowType, ImmutableList.of(group)); + this.group = group; + this.streaming = streaming; + assert !group.aggCalls.isEmpty(); + } + + /** */ + public IgniteWindow(RelInput input) { + this(input.getCluster(), + changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(), + input.getInput(), + input.getRowType("rowType"), + ((RelInputEx)input).getWindowGroup("group"), + input.getBoolean("streaming", false)); + } + + /** */ + public Group getGroup() { + return group; + } + + /** */ + public boolean isStreaming() { + return streaming; + } + + /** {@inheritDoc} */ + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new IgniteWindow(getCluster(), traitSet, sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public Window copy(List<RexLiteral> constants) { + assert constants.isEmpty(); + return this; + } + + /** {@inheritDoc} */ + @Override public <T> T accept(IgniteRelVisitor<T> visitor) { + return visitor.visit(this); + } + + /** {@inheritDoc} */ + @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) { + return new IgniteWindow(cluster, getTraitSet(), 
sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public RelWriter explainTerms(RelWriter pw) { + return pw + .input("input", getInput()) + .item("rowType", getRowType()) + .item("group", group) + .item("streaming", streaming); + } + + /** {@inheritDoc} */ + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory(); + + int aggCnt = group.aggCalls.size(); + + double rowCnt = mq.getRowCount(getInput()); + double cpuCost = rowCnt * ROW_COMPARISON_COST; + double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt); + + RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0); + + // Distributed processing is more preferable than processing on the single node. + if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single())) + cost.plus(costFactory.makeTinyCost()); Review Comment: `cost = cost.plus...` ########## modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/WindowPlannerTest.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteWindow; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.junit.Before; +import org.junit.Test; + +import static java.util.function.Predicate.not; + +/** + * + */ +public class WindowPlannerTest extends AbstractPlannerTest { Review Comment: More tests should be added. 
At least: - Test with several partition by columns - Test with several order by columns - Test with different agg calls but with the same window definition (there should be only one IgniteWindow rel op) - Test for collation derivation (table index cover more columns than partition/order by, table index has different directions than partition fields) - Test for collation pass through (order by for select and order by in window, extend testOrderByWithWindow) - Test for both collation and distribution derivation - Test with correlated distribution (see CorrelatedSubqueryPlannerTest#testCorrelatedDistribution) ########## modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java: ########## @@ -57,6 +58,7 @@ LimitExecutionTest.class, TimeCalculationExecutionTest.class, UncollectExecutionTest.class, + WindowExecutionTest.class Review Comment: Please add comma to the end of list ########## modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java: ########## @@ -163,6 +164,7 @@ TpchTest.class, UnnestIntegrationTest.class, CalcitePlanningDumpTest.class, + WindowIntegrationTest.class Review Comment: Please add comma to the end of list ########## modules/calcite/src/test/sql/subquery/scalar/test_window_function_subquery.test: ########## Review Comment: Please remove ignore reason at the header of the file ########## modules/calcite/src/test/sql/subquery/scalar/test_complex_correlated_subquery.test_ignore: ########## @@ -111,10 +111,6 @@ SELECT i, (SELECT SUM(s1.i) FROM integers s1 FULL OUTER JOIN integers s2 ON s1.i # REQUIRE(CHECK_COLUMN(result, 0, {Value(), 1, 2, 3})); # REQUIRE(CHECK_COLUMN(result, 1, {6, 6, 9, 12})); -# correlated expression inside window function not supported -statement error -SELECT i, (SELECT row_number() OVER (ORDER BY i)) FROM integers i1 ORDER BY i; - Review Comment: Test cases should not be deleted from ignore file. 
If a test case is wrongly described in the ignore file and was fixed in the test file, it should be fixed in the ignore file too. The ignore file is deleted once it becomes identical to the test file. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST; +import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits; + +/** + * A relational expression representing a set of window aggregates. 
+ * + * <p>A Window can handle several window aggregate functions, over several + * partitions, with pre- and post-expressions, and an optional post-filter. + * Each of the partitions is defined by a partition key (zero or more columns) + * and a range (logical or physical). The partitions expect the data to be + * sorted correctly on input to the relational expression. + * + * <p>Each {@link Window.Group} has a set of + * {@link org.apache.calcite.rex.RexOver} objects. + */ +public class IgniteWindow extends Window implements IgniteRel { + + private final Group group; + private final boolean streaming; + + /** */ + public IgniteWindow( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + RelDataType rowType, + Group group, + boolean streaming + ) { + super(cluster, traitSet, input, ImmutableList.of(), rowType, ImmutableList.of(group)); + this.group = group; + this.streaming = streaming; + assert !group.aggCalls.isEmpty(); + } + + /** */ + public IgniteWindow(RelInput input) { + this(input.getCluster(), + changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(), + input.getInput(), + input.getRowType("rowType"), + ((RelInputEx)input).getWindowGroup("group"), + input.getBoolean("streaming", false)); + } + + /** */ + public Group getGroup() { + return group; + } + + /** */ + public boolean isStreaming() { + return streaming; + } + + /** {@inheritDoc} */ + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new IgniteWindow(getCluster(), traitSet, sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public Window copy(List<RexLiteral> constants) { + assert constants.isEmpty(); + return this; + } + + /** {@inheritDoc} */ + @Override public <T> T accept(IgniteRelVisitor<T> visitor) { + return visitor.visit(this); + } + + /** {@inheritDoc} */ + @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) { + return new IgniteWindow(cluster, getTraitSet(), 
sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public RelWriter explainTerms(RelWriter pw) { + return pw + .input("input", getInput()) + .item("rowType", getRowType()) + .item("group", group) + .item("streaming", streaming); + } + + /** {@inheritDoc} */ + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory(); + + int aggCnt = group.aggCalls.size(); + + double rowCnt = mq.getRowCount(getInput()); + double cpuCost = rowCnt * ROW_COMPARISON_COST; + double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt); + + RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0); + + // Distributed processing is more preferable than processing on the single node. + if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single())) + cost.plus(costFactory.makeTinyCost()); + + return cost; + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(RelTraitSet required) { + RelTraitSet traits = passThroughOrDerivedTraits(required); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(RelTraitSet childTraits, int childId) { + assert childId == 0; + + RelTraitSet traits = passThroughOrDerivedTraits(childTraits); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + + /** + * Propagates the trait set from the parent to the child, or derives it from the child node. + * + * <p>The Window node cannot independently satisfy any traits. Therefore: + * - Validate that collation and distribution traits are compatible with the Window node. + * - If they are not, replace them with suitable traits. 
+ * - Request a new trait set from the input accordingly. + */ + private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) { + if (tgt.getConvention() != IgniteConvention.INSTANCE) + return null; + + RelTraitSet traits = tgt; + RelCollation requiredCollation = TraitUtils.collation(tgt); + if (!satisfiesCollationSansGroupFields(requiredCollation)) { + traits = traits.replace(collation()); Review Comment: Is it correct to return the current trait as derived/passed through? Maybe there should be something like `return null` here? ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java: ########## @@ -889,27 +913,28 @@ private Object toJson(RexNode node) { } /** */ - private Object toJson(RexWindow window) { + private Object toJson(Window.Group group) { Map<String, Object> map = map(); - if (!window.partitionKeys.isEmpty()) - map.put("partition", toJson(window.partitionKeys)); - if (!window.orderKeys.isEmpty()) - map.put("order", toJson(window.orderKeys)); - if (window.getLowerBound() == null) { + map.put("calls", toJson(group.aggCalls)); + if (!group.keys.isEmpty()) + map.put("partition", toJson(group.keys)); + if (!group.orderKeys.getKeys().isEmpty()) + map.put("order", toJson(group.orderKeys)); + if (group.lowerBound == null) { Review Comment: The Group constructor requires non-null lower and upper bounds. How are nulls possible here? ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import java.util.List; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Window; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.apache.ignite.internal.processors.query.calcite.externalize.RelInputEx; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AGG_CALL_MEM_COST; +import static 
org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.AVERAGE_FIELD_SIZE; +import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.ROW_COMPARISON_COST; +import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits; + +/** + * A relational expression representing a set of window aggregates. + * + * <p>A Window can handle several window aggregate functions, over several + * partitions, with pre- and post-expressions, and an optional post-filter. + * Each of the partitions is defined by a partition key (zero or more columns) + * and a range (logical or physical). The partitions expect the data to be + * sorted correctly on input to the relational expression. + * + * <p>Each {@link Window.Group} has a set of + * {@link org.apache.calcite.rex.RexOver} objects. + */ +public class IgniteWindow extends Window implements IgniteRel { + + private final Group group; + private final boolean streaming; + + /** */ + public IgniteWindow( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + RelDataType rowType, + Group group, + boolean streaming + ) { + super(cluster, traitSet, input, ImmutableList.of(), rowType, ImmutableList.of(group)); + this.group = group; + this.streaming = streaming; + assert !group.aggCalls.isEmpty(); + } + + /** */ + public IgniteWindow(RelInput input) { + this(input.getCluster(), + changeTraits(input, IgniteConvention.INSTANCE).getTraitSet(), + input.getInput(), + input.getRowType("rowType"), + ((RelInputEx)input).getWindowGroup("group"), + input.getBoolean("streaming", false)); + } + + /** */ + public Group getGroup() { + return group; + } + + /** */ + public boolean isStreaming() { + return streaming; + } + + /** {@inheritDoc} */ + @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new IgniteWindow(getCluster(), traitSet, sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public 
Window copy(List<RexLiteral> constants) { + assert constants.isEmpty(); + return this; + } + + /** {@inheritDoc} */ + @Override public <T> T accept(IgniteRelVisitor<T> visitor) { + return visitor.visit(this); + } + + /** {@inheritDoc} */ + @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) { + return new IgniteWindow(cluster, getTraitSet(), sole(inputs), getRowType(), group, streaming); + } + + /** {@inheritDoc} */ + @Override public RelWriter explainTerms(RelWriter pw) { + return pw + .input("input", getInput()) + .item("rowType", getRowType()) + .item("group", group) + .item("streaming", streaming); + } + + /** {@inheritDoc} */ + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory(); + + int aggCnt = group.aggCalls.size(); + + double rowCnt = mq.getRowCount(getInput()); + double cpuCost = rowCnt * ROW_COMPARISON_COST; + double memCost = (getRowType().getFieldCount() * AVERAGE_FIELD_SIZE + aggCnt * AGG_CALL_MEM_COST) * (streaming ? 1.0 : rowCnt); + + RelOptCost cost = costFactory.makeCost(rowCnt, cpuCost, 0, memCost, 0); + + // Distributed processing is more preferable than processing on the single node. 
+ if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single())) + cost.plus(costFactory.makeTinyCost()); + + return cost; + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(RelTraitSet required) { + RelTraitSet traits = passThroughOrDerivedTraits(required); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + /** {@inheritDoc} */ + @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(RelTraitSet childTraits, int childId) { + assert childId == 0; + + RelTraitSet traits = passThroughOrDerivedTraits(childTraits); + if (traits == null) + return null; + + return Pair.of(traits, ImmutableList.of(traits)); + } + + + /** + * Propagates the trait set from the parent to the child, or derives it from the child node. + * + * <p>The Window node cannot independently satisfy any traits. Therefore: + * - Validate that collation and distribution traits are compatible with the Window node. + * - If they are not, replace them with suitable traits. + * - Request a new trait set from the input accordingly. + */ + private @Nullable RelTraitSet passThroughOrDerivedTraits(RelTraitSet tgt) { + if (tgt.getConvention() != IgniteConvention.INSTANCE) + return null; + + RelTraitSet traits = tgt; + RelCollation requiredCollation = TraitUtils.collation(tgt); + if (!satisfiesCollationSansGroupFields(requiredCollation)) { + traits = traits.replace(collation()); + } + + IgniteDistribution distribution = TraitUtils.distribution(tgt); + if (!satisfiesDistribution(distribution)) + traits = traits.replace(distribution()); + else if (distribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED) { + // Group set contains all distribution keys, shift distribution keys according to used columns. 
+ IgniteDistribution outDistribution = distribution.apply(Commons.mapping(group.keys, rowType.getFieldCount())); Review Comment: Why do we need to change keys order in distribution (also for derived or passed through distribution)? Columns order is not changed by this rel op. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.function.Supplier; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition; +import org.apache.ignite.internal.util.typedef.F; + +/** Window support node */ +public class WindowNode<Row> extends MemoryTrackingNode<Row> implements SingleNode<Row>, Downstream<Row> { + /** */ + private final Comparator<Row> partCmp; + + /** */ + private final Supplier<WindowPartition<Row>> partitionFactory; + + /** */ + private final RowHandler.RowFactory<Row> rowFactory; + + /** */ + private WindowPartition<Row> partition; + + /** */ + private int requested; + + /** */ + private int waiting; + + /** */ + private Row prevRow; + + /** */ + private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE); + + /** */ + public WindowNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + Comparator<Row> partCmp, + Supplier<WindowPartition<Row>> partitionFactory, + RowHandler.RowFactory<Row> rowFactory + ) { + super(ctx, rowType, DFLT_ROW_OVERHEAD); + this.partCmp = partCmp; + this.partitionFactory = partitionFactory; + this.rowFactory = rowFactory; + } + + /** {@inheritDoc} */ + @Override public void request(int rowsCnt) throws Exception { + assert !F.isEmpty(sources()) && sources().size() == 1; + assert rowsCnt > 0; + + checkState(); + + requested = rowsCnt; + + doPush(); + + if (waiting == 0) { + waiting = IN_BUFFER_SIZE; + source().request(IN_BUFFER_SIZE); + } + else if (waiting < 0) + downstream().end(); Review Comment: Downstream will be prematurely ended if there are more rows in the last batch than was requested ########## 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.function.Supplier; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition; +import org.apache.ignite.internal.util.typedef.F; + +/** Window support node */ +public class WindowNode<Row> extends MemoryTrackingNode<Row> implements SingleNode<Row>, Downstream<Row> { + /** */ + private final Comparator<Row> partCmp; + + /** */ + private final Supplier<WindowPartition<Row>> partitionFactory; + + /** */ + private final RowHandler.RowFactory<Row> rowFactory; + + /** */ + private WindowPartition<Row> partition; + + /** */ + private int requested; + + /** */ + private int waiting; + + /** */ + private Row prevRow; + + /** */ + 
private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE); + + /** */ + public WindowNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + Comparator<Row> partCmp, + Supplier<WindowPartition<Row>> partitionFactory, + RowHandler.RowFactory<Row> rowFactory + ) { + super(ctx, rowType, DFLT_ROW_OVERHEAD); + this.partCmp = partCmp; + this.partitionFactory = partitionFactory; + this.rowFactory = rowFactory; + } + + /** {@inheritDoc} */ + @Override public void request(int rowsCnt) throws Exception { + assert !F.isEmpty(sources()) && sources().size() == 1; + assert rowsCnt > 0; + + checkState(); + + requested = rowsCnt; + + doPush(); + + if (waiting == 0) { + waiting = IN_BUFFER_SIZE; + source().request(IN_BUFFER_SIZE); + } + else if (waiting < 0) + downstream().end(); + } + + @Override public void push(Row row) throws Exception { + assert downstream() != null; + assert waiting > 0; + + checkState(); + + waiting--; + + if (partition == null) { + partition = partitionFactory.get(); + } + else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) { + partition.drainTo(rowFactory, outBuf); + partition.reset(); + doPush(); + } + + if (partition.add(row)) { + partition.drainTo(rowFactory, outBuf); + doPush(); + } + else + nodeMemoryTracker.onRowAdded(row); + + prevRow = row; + + if (waiting == 0 && requested > 0) { + waiting = IN_BUFFER_SIZE; + + context().execute(() -> source().request(IN_BUFFER_SIZE), this::onError); + } + } + + @Override public void end() throws Exception { + assert downstream() != null; + if (waiting < 0) { + return; + } + + waiting = -1; + + checkState(); + + if (partition != null) { + partition.drainTo(rowFactory, outBuf); + partition.reset(); + } + + doPush(); + + downstream().end(); Review Comment: Downstream will be prematurely ended if there are more rows in the last batch than was requested ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java: 
########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.function.Supplier; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition; +import org.apache.ignite.internal.util.typedef.F; + +/** Window support node */ +public class WindowNode<Row> extends MemoryTrackingNode<Row> implements SingleNode<Row>, Downstream<Row> { + /** */ + private final Comparator<Row> partCmp; + + /** */ + private final Supplier<WindowPartition<Row>> partitionFactory; + + /** */ + private final RowHandler.RowFactory<Row> rowFactory; + + /** */ + private WindowPartition<Row> partition; + + /** */ + private int requested; + + /** */ + private int waiting; + + /** */ + private Row prevRow; + + /** */ + private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE); + + /** */ + public WindowNode( + 
ExecutionContext<Row> ctx, + RelDataType rowType, + Comparator<Row> partCmp, + Supplier<WindowPartition<Row>> partitionFactory, + RowHandler.RowFactory<Row> rowFactory + ) { + super(ctx, rowType, DFLT_ROW_OVERHEAD); + this.partCmp = partCmp; + this.partitionFactory = partitionFactory; + this.rowFactory = rowFactory; + } + + /** {@inheritDoc} */ + @Override public void request(int rowsCnt) throws Exception { + assert !F.isEmpty(sources()) && sources().size() == 1; + assert rowsCnt > 0; + + checkState(); + + requested = rowsCnt; + + doPush(); + + if (waiting == 0) { + waiting = IN_BUFFER_SIZE; + source().request(IN_BUFFER_SIZE); + } + else if (waiting < 0) + downstream().end(); + } + + @Override public void push(Row row) throws Exception { + assert downstream() != null; + assert waiting > 0; + + checkState(); + + waiting--; + + if (partition == null) { + partition = partitionFactory.get(); + } + else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) { + partition.drainTo(rowFactory, outBuf); + partition.reset(); + doPush(); + } + + if (partition.add(row)) { Review Comment: Strange API (true to drain). Maybe rework it somehow? ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.function.Supplier; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition; +import org.apache.ignite.internal.util.typedef.F; + +/** Window support node */ +public class WindowNode<Row> extends MemoryTrackingNode<Row> implements SingleNode<Row>, Downstream<Row> { + /** */ + private final Comparator<Row> partCmp; + + /** */ + private final Supplier<WindowPartition<Row>> partitionFactory; + + /** */ + private final RowHandler.RowFactory<Row> rowFactory; + + /** */ + private WindowPartition<Row> partition; + + /** */ + private int requested; + + /** */ + private int waiting; + + /** */ + private Row prevRow; + + /** */ + private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE); + + /** */ + public WindowNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + Comparator<Row> partCmp, + Supplier<WindowPartition<Row>> partitionFactory, + RowHandler.RowFactory<Row> rowFactory + ) { + super(ctx, rowType, DFLT_ROW_OVERHEAD); + this.partCmp = partCmp; + this.partitionFactory = partitionFactory; + this.rowFactory = rowFactory; + } + + /** {@inheritDoc} */ + @Override public void request(int rowsCnt) throws Exception { + assert 
!F.isEmpty(sources()) && sources().size() == 1; + assert rowsCnt > 0; + + checkState(); + + requested = rowsCnt; + + doPush(); + + if (waiting == 0) { + waiting = IN_BUFFER_SIZE; + source().request(IN_BUFFER_SIZE); + } + else if (waiting < 0) + downstream().end(); + } + + @Override public void push(Row row) throws Exception { + assert downstream() != null; + assert waiting > 0; + + checkState(); + + waiting--; + + if (partition == null) { + partition = partitionFactory.get(); + } + else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) { + partition.drainTo(rowFactory, outBuf); + partition.reset(); + doPush(); + } + + if (partition.add(row)) { + partition.drainTo(rowFactory, outBuf); + doPush(); + } + else + nodeMemoryTracker.onRowAdded(row); Review Comment: This can produce a false-positive memory-limit-exceeded error. The tracker needs to be reset not only on rewindInternal, but also when the partition is fully drained. A test is required for the new node (see MemoryQuotasIntegrationTest) ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/fun/IgniteStdSqlOperatorTable.java: ########## @@ -317,5 +317,18 @@ public IgniteStdSqlOperatorTable() { register(SqlStdOperatorTable.BIT_AND); register(SqlStdOperatorTable.BIT_OR); register(SqlStdOperatorTable.BIT_XOR); + + // Window specific operations + register(SqlStdOperatorTable.ROW_NUMBER); Review Comment: A smoke test for each operator is required in StdSqlOperatorsTest ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.function.Supplier; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition; +import org.apache.ignite.internal.util.typedef.F; + +/** Window support node */ +public class WindowNode<Row> extends MemoryTrackingNode<Row> implements SingleNode<Row>, Downstream<Row> { + /** */ + private final Comparator<Row> partCmp; + + /** */ + private final Supplier<WindowPartition<Row>> partitionFactory; + + /** */ + private final RowHandler.RowFactory<Row> rowFactory; + + /** */ + private WindowPartition<Row> partition; + + /** */ + private int requested; + + /** */ + private int waiting; + + /** */ + private Row prevRow; + + /** */ + private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE); + + /** */ + public WindowNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + Comparator<Row> partCmp, + Supplier<WindowPartition<Row>> partitionFactory, + RowHandler.RowFactory<Row> rowFactory + ) { + super(ctx, rowType, DFLT_ROW_OVERHEAD); Review Comment: Memory tracker row 
overhead depends at least on count of aggregates (maybe also kinds of aggregates) ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.function.Supplier; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.window.WindowPartition; +import org.apache.ignite.internal.util.typedef.F; + +/** Window support node */ +public class WindowNode<Row> extends MemoryTrackingNode<Row> implements SingleNode<Row>, Downstream<Row> { + /** */ + private final Comparator<Row> partCmp; + + /** */ + private final Supplier<WindowPartition<Row>> partitionFactory; + + /** */ + private final RowHandler.RowFactory<Row> rowFactory; + + /** */ + private WindowPartition<Row> partition; + + /** */ + private int requested; + + /** */ + private int waiting; + + /** */ + private Row prevRow; + + /** */ + private final Deque<Row> outBuf = new ArrayDeque<>(IN_BUFFER_SIZE); + + /** */ + public WindowNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + Comparator<Row> partCmp, + Supplier<WindowPartition<Row>> partitionFactory, + RowHandler.RowFactory<Row> rowFactory + ) { + super(ctx, rowType, DFLT_ROW_OVERHEAD); + this.partCmp = partCmp; + this.partitionFactory = partitionFactory; + this.rowFactory = rowFactory; + } + + /** {@inheritDoc} */ + @Override public void request(int rowsCnt) throws Exception { + assert !F.isEmpty(sources()) && sources().size() == 1; + assert rowsCnt > 0; + + checkState(); + + requested = rowsCnt; + + doPush(); + + if (waiting == 0) { + waiting = IN_BUFFER_SIZE; + source().request(IN_BUFFER_SIZE); + } + else if (waiting < 0) + downstream().end(); + } + + @Override public void push(Row row) throws Exception { + assert downstream() != null; + assert waiting > 0; + + checkState(); + + waiting--; + + if 
(partition == null) { + partition = partitionFactory.get(); + } + else if (prevRow != null && partCmp != null && partCmp.compare(prevRow, row) != 0) { + partition.drainTo(rowFactory, outBuf); Review Comment: - This operation can block the thread for a long time. - A large number of rows can be stored in outBuf; in the worst case there will be 2x the input row count (in partition and in outBuf). Consider pushing directly to downstream. Drain (and push) only the requested amount and postpone further pushes until the next request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org