Copilot commented on code in PR #16953: URL: https://github.com/apache/iotdb/pull/16953#discussion_r2684749257
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; + +public class RowNumberNode extends SingleChildProcessNode { + private final List<Symbol> 
partitionBy; + /* + * This flag indicates that the node depends on the row order established by the subplan. + * It is taken into account while adding local exchanges to the plan, ensuring that sorted order + * of data will be respected. + * Note: if the subplan doesn't produce sorted output, this flag doesn't change the resulting plan. + * Note: this flag is used for planning of queries involving ORDER BY and OFFSET. + */ + private final boolean orderSensitive; + private final Optional<Integer> maxRowCountPerPartition; + private final Symbol rowNumberSymbol; + + public RowNumberNode( + PlanNodeId id, + List<Symbol> partitionBy, + boolean orderSensitive, + Symbol rowNumberSymbol, + Optional<Integer> maxRowCountPerPartition) { + super(id); + + this.partitionBy = ImmutableList.copyOf(partitionBy); + this.orderSensitive = orderSensitive; + this.rowNumberSymbol = rowNumberSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + public RowNumberNode( + PlanNodeId id, + PlanNode child, + List<Symbol> partitionBy, + boolean orderSensitive, + Symbol rowNumberSymbol, + Optional<Integer> maxRowCountPerPartition) { + super(id, child); + + this.partitionBy = ImmutableList.copyOf(partitionBy); + this.orderSensitive = orderSensitive; + this.rowNumberSymbol = rowNumberSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + public Symbol getRowNumberSymbol() { + return rowNumberSymbol; + } + + public Optional<Integer> getMaxRowCountPerPartition() { + return maxRowCountPerPartition; + } + + public boolean isOrderSensitive() { + return orderSensitive; + } + + @Override + public PlanNode clone() { + return new RowNumberNode( + getPlanNodeId(), partitionBy, orderSensitive, rowNumberSymbol, maxRowCountPerPartition); + } + + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitRowNumber(this, context); + } + + @Override + public List<String> getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + 
@Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_ROW_NUMBER_NODE.serialize(byteBuffer); + ReadWriteIOUtils.write(partitionBy.size(), byteBuffer); + for (Symbol symbol : partitionBy) { + Symbol.serialize(symbol, byteBuffer); + } + ReadWriteIOUtils.write(orderSensitive, byteBuffer); + Symbol.serialize(rowNumberSymbol, byteBuffer); + if (maxRowCountPerPartition.isPresent()) { + ReadWriteIOUtils.write(true, byteBuffer); + ReadWriteIOUtils.write(maxRowCountPerPartition.get(), byteBuffer); + } else { + ReadWriteIOUtils.write(false, byteBuffer); + } + } + + public List<Symbol> getPartitionBy() { + return partitionBy; + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_ROW_NUMBER_NODE.serialize(stream); + ReadWriteIOUtils.write(partitionBy.size(), stream); + for (Symbol symbol : partitionBy) { + Symbol.serialize(symbol, stream); + } + ReadWriteIOUtils.write(orderSensitive, stream); + Symbol.serialize(rowNumberSymbol, stream); + if (maxRowCountPerPartition.isPresent()) { + ReadWriteIOUtils.write(true, stream); + ReadWriteIOUtils.write(maxRowCountPerPartition.get(), stream); + } else { + ReadWriteIOUtils.write(false, stream); + } + } + + public static RowNumberNode deserialize(ByteBuffer buffer) { + int partitionBySize = ReadWriteIOUtils.readInt(buffer); + ImmutableList.Builder<Symbol> partitionBy = ImmutableList.builder(); + for (int i = 0; i < partitionBySize; i++) { + partitionBy.add(Symbol.deserialize(buffer)); + } + boolean orderSensitive = ReadWriteIOUtils.readBoolean(buffer); + Symbol rowNumberSymbol = Symbol.deserialize(buffer); + Optional<Integer> maxRowCountPerPartition; + if (ReadWriteIOUtils.readBoolean(buffer)) { + maxRowCountPerPartition = Optional.of(ReadWriteIOUtils.readInt(buffer)); + } else { + maxRowCountPerPartition = Optional.empty(); + } + + PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); + return new RowNumberNode( + planNodeId, 
partitionBy.build(), orderSensitive, rowNumberSymbol, maxRowCountPerPartition); + } + + @Override + public List<Symbol> getOutputSymbols() { + return Collections.singletonList(rowNumberSymbol); Review Comment: The method getOutputSymbols returns only the row number symbol, but it should return all output symbols including those from the child node. This is inconsistent with how other operators handle output symbols and will cause query planning errors. ```suggestion return ImmutableList.<Symbol>builder() .addAll(getChild().getOutputSymbols()) .add(rowNumberSymbol) .build(); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java: ########## @@ -709,6 +714,12 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { return IntersectNode.deserialize(buffer); case 1036: return ExceptNode.deserialize(buffer); + case 1037: + return TopKNode.deserialize(buffer); Review Comment: The deserialization case for TABLE_TOPK_RANKING_NODE (1037) is calling TopKNode.deserialize(buffer) instead of TopKRankingNode.deserialize(buffer). This will cause runtime errors when deserializing TopKRankingNode instances. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; +import 
static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.createGroupByHash; + +public class RowNumberOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RowNumberOperator.class); + + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final List<Integer> outputChannels; + private final List<Integer> partitionChannels; + private final TsBlockBuilder tsBlockBuilder; + + private final Optional<GroupByHash> groupByHash; + private final Optional<Integer> maxRowsPerPartition; + private final Map<Integer, Long> partitionRowCounts; + + public RowNumberOperator( + OperatorContext operatorContext, + Operator inputOperator, + List<TSDataType> inputDataTypes, + List<Integer> outputChannels, + List<Integer> partitionChannels, + Optional<Integer> maxRowsPerPartition, + int expectedPositions) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.outputChannels = ImmutableList.copyOf(outputChannels); + this.partitionChannels = ImmutableList.copyOf(partitionChannels); + this.maxRowsPerPartition = maxRowsPerPartition; + + // Output data types + // original output channels + row number column + List<TSDataType> outputDataTypes = new ArrayList<>(); + for (int channel : outputChannels) { + outputDataTypes.add(inputDataTypes.get(channel)); + } + outputDataTypes.add(TSDataType.INT64); + this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes); + + if (partitionChannels.isEmpty()) { + this.groupByHash = Optional.empty(); + } else { + // Partition data types + List<Type> partitionDataTypes = new ArrayList<>(); + for (int channel : partitionChannels) { + TSDataType tsDataType = inputDataTypes.get(channel); + Type convertType = InternalTypeManager.fromTSDataType(tsDataType); + partitionDataTypes.add(convertType); + } + this.groupByHash = + Optional.of( + 
createGroupByHash(partitionDataTypes, false, expectedPositions, UpdateMemory.NOOP)); + } + + this.partitionRowCounts = new HashMap<>(expectedPositions); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + TsBlock tsBlock = inputOperator.nextWithTimer(); + if (tsBlock == null) { + return null; + } + + int[] partitionIds = getTsBlockPartitionIds(tsBlock); + for (int position = 0; position < tsBlock.getPositionCount(); position++) { + int partitionId = groupByHash.isPresent() ? partitionIds[position] : 0; + long rowCount = partitionRowCounts.getOrDefault(partitionId, 0L); + processRow(tsBlock, partitionId, rowCount + 1); + partitionRowCounts.put(partitionId, rowCount + 1); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + tsBlockBuilder.reset(); + return result; + } + + private void processRow(TsBlock tsBlock, int position, long rowNumber) { Review Comment: The method processRow accepts three parameters (TsBlock tsBlock, int position, long rowNumber) but is being called with (tsBlock, partitionId, rowCount + 1) at line 121. The second argument should be 'position', not 'partitionId'. This will cause incorrect column access and likely runtime errors. 
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java: ########## @@ -619,6 +621,28 @@ public PlanAndMappings visitWindowFunction(WindowNode node, UnaliasContext conte return new PlanAndMappings(rewrittenWindow, mapping); } + @Override + public PlanAndMappings visitRowNumber(RowNumberNode node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + + RowNumberNode rewrittenRowNumber = mapper.map(node, rewrittenSource.getRoot()); + + return new PlanAndMappings(rewrittenRowNumber, mapping); + } + + @Override + public PlanAndMappings visitTopKRanking(TopKRankingNode node, UnaliasContext context) { + PlanAndMappings rewrittenSource = node.getChild().accept(this, context); + Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings()); + SymbolMapper mapper = symbolMapper(mapping); + + TopKRankingNode rewrittenTopNRanking = mapper.map(node, rewrittenSource.getRoot()); Review Comment: The variable name 'rewrittenTopNRanking' (line 641) is inconsistent with the node type TopKRankingNode. The name should be 'rewrittenTopKRanking' to match the actual class name and maintain naming consistency. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKRankingNode.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.base.Objects; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +public class TopKRankingNode extends SingleChildProcessNode { + public enum RankingType { + ROW_NUMBER, + RANK, + DENSE_RANK + } + + private final DataOrganizationSpecification specification; + private final RankingType rankingType; + private final Symbol rankingSymbol; + private final int maxRankingPerPartition; + private final boolean partial; + + public TopKRankingNode( + PlanNodeId id, + DataOrganizationSpecification specification, + RankingType rankingType, + Symbol rankingSymbol, + int maxRankingPerPartition, + boolean partial) { + super(id); + + this.specification = specification; + this.rankingType = rankingType; + this.rankingSymbol = rankingSymbol; + 
this.maxRankingPerPartition = maxRankingPerPartition; + this.partial = partial; + } + + public TopKRankingNode( + PlanNodeId id, + PlanNode child, + DataOrganizationSpecification specification, + RankingType rankingType, + Symbol rankingSymbol, + int maxRankingPerPartition, + boolean partial) { + super(id, child); + + this.specification = specification; + this.rankingType = rankingType; + this.rankingSymbol = rankingSymbol; + this.maxRankingPerPartition = maxRankingPerPartition; + this.partial = partial; + } + + @Override + public PlanNode clone() { + return new TopKRankingNode( + getPlanNodeId(), + specification, + rankingType, + rankingSymbol, + maxRankingPerPartition, + partial); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitTopKRanking(this, context); + } + + public DataOrganizationSpecification getSpecification() { + return specification; + } + + public boolean isPartial() { + return partial; + } + + public Symbol getRankingSymbol() { + return rankingSymbol; + } + + public int getMaxRankingPerPartition() { + return maxRankingPerPartition; + } + + public RankingType getRankingType() { + return rankingType; + } + + @Override + public List<String> getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_TOPK_RANKING_NODE.serialize(byteBuffer); + specification.serialize(byteBuffer); + ReadWriteIOUtils.write(rankingType.ordinal(), byteBuffer); + Symbol.serialize(rankingSymbol, byteBuffer); + ReadWriteIOUtils.write(maxRankingPerPartition, byteBuffer); + ReadWriteIOUtils.write(partial, byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_TOPK_RANKING_NODE.serialize(stream); + specification.serialize(stream); + ReadWriteIOUtils.write(rankingType.ordinal(), stream); + Symbol.serialize(rankingSymbol, stream); + 
ReadWriteIOUtils.write(maxRankingPerPartition, stream); + ReadWriteIOUtils.write(partial, stream); + } + + public static TopKRankingNode deserialize(ByteBuffer byteBuffer) { + DataOrganizationSpecification specification = + DataOrganizationSpecification.deserialize(byteBuffer); + RankingType rankingType = RankingType.values()[ReadWriteIOUtils.readInt(byteBuffer)]; + Symbol rankingSymbol = Symbol.deserialize(byteBuffer); + int maxRankingPerPartition = ReadWriteIOUtils.readInt(byteBuffer); + boolean partial = ReadWriteIOUtils.readBoolean(byteBuffer); + + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new TopKRankingNode( + planNodeId, specification, rankingType, rankingSymbol, maxRankingPerPartition, partial); + } + + @Override + public List<Symbol> getOutputSymbols() { + return Collections.singletonList(rankingSymbol); + } Review Comment: The method getOutputSymbols returns only the ranking symbol, but it should return all output symbols including those from the child node. This is inconsistent with how other node implementations handle output symbols and will cause incorrect query planning. Note that RowNumberNode.getOutputSymbols in this PR currently has the same problem (it also returns only a singleton list), so both nodes should be fixed together. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ReplaceWindowWithRowNumber.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.window; + +public class ReplaceWindowWithRowNumber implements Rule<WindowNode> { + private final Pattern<WindowNode> pattern; + + public ReplaceWindowWithRowNumber(Metadata metadata) { + this.pattern = + window() + .matching( + window -> { + if (window.getWindowFunctions().size() != 1) { + return false; + } + BoundSignature signature = + getOnlyElement(window.getWindowFunctions().values()) + .getResolvedFunction() + .getSignature(); + return signature.getArgumentTypes().isEmpty() + && signature.getName().equals("row_number"); + }) + .matching(window -> !window.getSpecification().getOrderingScheme().isPresent()); + } + + @Override + public Pattern<WindowNode> getPattern() { + return pattern; + } + + @Override + public Result apply(WindowNode node, Captures captures, Context context) { + return null; Review Comment: The apply method returns 
null unconditionally. This rule will never perform any transformation, making it ineffective. The method should implement the actual transformation logic to replace the WindowNode with a RowNumberNode. ```suggestion return Result.empty(); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/RowNumberNode.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; + +public class RowNumberNode extends SingleChildProcessNode { + private final List<Symbol> partitionBy; + /* + * This flag indicates that the node depends on the row order established by the subplan. + * It is taken into account while adding local exchanges to the plan, ensuring that sorted order + * of data will be respected. + * Note: if the subplan doesn't produce sorted output, this flag doesn't change the resulting plan. + * Note: this flag is used for planning of queries involving ORDER BY and OFFSET. 
+ */ + private final boolean orderSensitive; + private final Optional<Integer> maxRowCountPerPartition; + private final Symbol rowNumberSymbol; + + public RowNumberNode( + PlanNodeId id, + List<Symbol> partitionBy, + boolean orderSensitive, + Symbol rowNumberSymbol, + Optional<Integer> maxRowCountPerPartition) { + super(id); + + this.partitionBy = ImmutableList.copyOf(partitionBy); + this.orderSensitive = orderSensitive; + this.rowNumberSymbol = rowNumberSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + public RowNumberNode( + PlanNodeId id, + PlanNode child, + List<Symbol> partitionBy, + boolean orderSensitive, + Symbol rowNumberSymbol, + Optional<Integer> maxRowCountPerPartition) { + super(id, child); + + this.partitionBy = ImmutableList.copyOf(partitionBy); + this.orderSensitive = orderSensitive; + this.rowNumberSymbol = rowNumberSymbol; + this.maxRowCountPerPartition = maxRowCountPerPartition; + } + + public Symbol getRowNumberSymbol() { + return rowNumberSymbol; + } + + public Optional<Integer> getMaxRowCountPerPartition() { + return maxRowCountPerPartition; + } + + public boolean isOrderSensitive() { + return orderSensitive; + } + + @Override + public PlanNode clone() { + return new RowNumberNode( + getPlanNodeId(), partitionBy, orderSensitive, rowNumberSymbol, maxRowCountPerPartition); + } + Review Comment: This method overrides [PlanNode.accept](1); it is advisable to add an Override annotation. ```suggestion @Override ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperator.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNBuilder; +import org.apache.iotdb.db.queryengine.execution.operator.GroupedTopNRowNumberBuilder; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.SimpleTsBlockWithPositionComparator; +import org.apache.iotdb.db.queryengine.execution.operator.TsBlockWithPositionComparator; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.NoChannelGroupByHash; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; + +import com.google.common.base.Supplier; +import 
com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class TopKRankingOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TopKRankingOperator.class); + + private final OperatorContext operatorContext; + private final Operator inputOperator; + private final TopKRankingNode.RankingType rankingType; + private final List<TSDataType> inputTypes; + + private final List<Integer> outputChannels; + private final List<Integer> partitionChannels; + private final List<TSDataType> partitionTSDataTypes; + private final List<Integer> sortChannels; + private final List<SortOrder> sortOrders; + private final int maxRowCountPerPartition; + private final boolean partial; + private final boolean generateRanking; + private final Optional<Integer> hashChannel; + private final int expectedPositions; + + private final long maxFlushableBytes; + + private final Supplier<GroupByHash> groupByHashSupplier; + private final Supplier<GroupedTopNBuilder> groupedTopNBuilderSupplier; + + private GroupByHash groupByHash; + private GroupedTopNBuilder groupedTopNBuilder; + + private boolean finished = false; + private java.util.Iterator<TsBlock> outputIterator; + + public TopKRankingOperator( + OperatorContext operatorContext, + Operator inputOperator, + TopKRankingNode.RankingType rankingType, + List<TSDataType> inputTypes, + List<Integer> outputChannels, + List<Integer> partitionChannels, + List<TSDataType> partitionTSDataTypes, + List<Integer> sortChannels, + List<SortOrder> sortOrders, + int maxRowCountPerPartition, + boolean generateRanking, + 
Optional<Integer> hashChannel, + int expectedPositions, + Optional<Long> maxPartialMemory) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.rankingType = rankingType; + this.inputTypes = inputTypes; + this.partitionChannels = partitionChannels; + this.partitionTSDataTypes = partitionTSDataTypes; + this.sortChannels = sortChannels; + this.sortOrders = sortOrders; + this.maxRowCountPerPartition = maxRowCountPerPartition; + this.partial = !generateRanking; + this.generateRanking = generateRanking; Review Comment: The TopKRankingOperator constructor parameter 'generateRanking' is used to set 'partial' with inverted logic (partial = !generateRanking at line 105), but then 'generateRanking' is also stored separately. This creates confusing dual state. Additionally, the constructor parameter at line 92 and the field at line 66 are both named 'generateRanking', yet that same parameter is also what the separate field 'partial' is derived from, so the two fields always hold opposite values and are redundant. Consider using a single boolean field with clear semantics. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/RowNumberOperator.java: ########## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.createGroupByHash; + +public class RowNumberOperator implements ProcessOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(RowNumberOperator.class); + + private 
final OperatorContext operatorContext; + private final Operator inputOperator; + private final List<Integer> outputChannels; + private final List<Integer> partitionChannels; + private final TsBlockBuilder tsBlockBuilder; + + private final Optional<GroupByHash> groupByHash; + private final Optional<Integer> maxRowsPerPartition; + private final Map<Integer, Long> partitionRowCounts; + + public RowNumberOperator( + OperatorContext operatorContext, + Operator inputOperator, + List<TSDataType> inputDataTypes, + List<Integer> outputChannels, + List<Integer> partitionChannels, + Optional<Integer> maxRowsPerPartition, + int expectedPositions) { + this.operatorContext = operatorContext; + this.inputOperator = inputOperator; + this.outputChannels = ImmutableList.copyOf(outputChannels); + this.partitionChannels = ImmutableList.copyOf(partitionChannels); + this.maxRowsPerPartition = maxRowsPerPartition; + + // Output data types + // original output channels + row number column + List<TSDataType> outputDataTypes = new ArrayList<>(); + for (int channel : outputChannels) { + outputDataTypes.add(inputDataTypes.get(channel)); + } + outputDataTypes.add(TSDataType.INT64); + this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes); + + if (partitionChannels.isEmpty()) { + this.groupByHash = Optional.empty(); + } else { + // Partition data types + List<Type> partitionDataTypes = new ArrayList<>(); + for (int channel : partitionChannels) { + TSDataType tsDataType = inputDataTypes.get(channel); + Type convertType = InternalTypeManager.fromTSDataType(tsDataType); + partitionDataTypes.add(convertType); + } + this.groupByHash = + Optional.of( + createGroupByHash(partitionDataTypes, false, expectedPositions, UpdateMemory.NOOP)); + } + + this.partitionRowCounts = new HashMap<>(expectedPositions); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() throws Exception { + TsBlock tsBlock = 
inputOperator.nextWithTimer(); + if (tsBlock == null) { + return null; + } + + int[] partitionIds = getTsBlockPartitionIds(tsBlock); + for (int position = 0; position < tsBlock.getPositionCount(); position++) { + int partitionId = groupByHash.isPresent() ? partitionIds[position] : 0; + long rowCount = partitionRowCounts.getOrDefault(partitionId, 0L); + processRow(tsBlock, partitionId, rowCount + 1); + partitionRowCounts.put(partitionId, rowCount + 1); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + tsBlockBuilder.reset(); + return result; + } + + private void processRow(TsBlock tsBlock, int position, long rowNumber) { + // Check max rows per partition limit + if (maxRowsPerPartition.isPresent() && rowNumber >= maxRowsPerPartition.get()) { Review Comment: The condition checks if rowNumber >= maxRowsPerPartition, but it should check rowNumber > maxRowsPerPartition. With the current logic, when rowNumber equals maxRowsPerPartition (which is the maximum allowed), the row is incorrectly skipped. For example, if maxRowsPerPartition is 5, row 5 will be skipped even though rows 1-5 should be included. ```suggestion if (maxRowsPerPartition.isPresent() && rowNumber > maxRowsPerPartition.get()) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
