JackieTien97 commented on code in PR #15136:
URL: https://github.com/apache/iotdb/pull/15136#discussion_r2011794679
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java:
##########
@@ -156,15 +160,39 @@ private PartitionState handleIteratingOrNewPartitionState() {
* all rows have the same partition values, return the position count of the current TsBlock.
*/
private int findNextDifferentRowIndex() {
- SortKey compareKey = new SortKey(currentTsBlock, currentIndex);
- while (compareKey.rowIndex < currentTsBlock.getPositionCount()) {
- if (partitionComparator.compare(partitionKey, compareKey) != 0) {
- partitionKey = compareKey;
- return compareKey.rowIndex;
+ int totalRows = currentTsBlock.getPositionCount();
+
+ // check if all rows have the same partition values
+ SortKey compareKey = new SortKey(currentTsBlock, totalRows - 1);
+ if (partitionComparator.compare(currentPartitionKey, compareKey) == 0) {
+ return totalRows;
+ }
+
+ // check the first row
+ compareKey.rowIndex = currentIndex;
+ if (partitionComparator.compare(currentPartitionKey, compareKey) != 0) {
+ currentPartitionKey = compareKey;
+ return currentIndex;
+ }
+
+ // binary search to find the next different partition values
+ int low = currentIndex;
+ int high = totalRows - 1;
+ int firstDiff = totalRows;
+ while (low <= high) {
+ compareKey.rowIndex = low + (high - low) / 2;
+ int cmp = partitionComparator.compare(currentPartitionKey, compareKey);
+ if (cmp == 0) {
+ low = compareKey.rowIndex + 1;
Review Comment:
Do we need to update `low` here? Otherwise, the loop will never terminate.
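For reference, a minimal, self-contained sketch of the loop shape this comment is about. It is not the code in the PR: `findFirstDifferent` and the `IntPredicate` are hypothetical stand-ins for the `partitionComparator.compare(...)` call, and it assumes that rows equal to the current partition key form a contiguous prefix of the searched range. Both branches move a bound past `mid`, which is what guarantees termination.

```java
import java.util.function.IntPredicate;

public class PartitionBoundarySketch {

  /**
   * Returns the first index in [from, total) for which sameAsCurrent is false, or total if
   * every row matches. Assumes the matching rows form a contiguous prefix of the range.
   */
  static int findFirstDifferent(int from, int total, IntPredicate sameAsCurrent) {
    int low = from;
    int high = total - 1;
    int firstDiff = total;
    while (low <= high) {
      int mid = low + (high - low) / 2;
      if (sameAsCurrent.test(mid)) {
        // Row still belongs to the current partition: the boundary is to the right.
        low = mid + 1;
      } else {
        // Row differs: remember it and keep looking for an earlier differing row.
        firstDiff = mid;
        high = mid - 1;
      }
    }
    return firstDiff;
  }

  public static void main(String[] args) {
    int[] keys = {7, 7, 7, 9, 9};
    System.out.println(findFirstDifferent(0, keys.length, i -> keys[i] == 7)); // prints 3
  }
}
```

If either branch left its bound unchanged (for example `low = mid` instead of `low = mid + 1`), the range could stop shrinking once `low == high`, and the loop would spin forever.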
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java:
##########
@@ -61,16 +65,19 @@ public class TableFunctionOperator implements ProcessOperator {
private final Operator inputOperator;
private final TableFunctionProcessorProvider processorProvider;
private final PartitionRecognizer partitionRecognizer;
- private final TsBlockBuilder blockBuilder;
+ private final TsBlockBuilder properBlockBuilder;
private final int properChannelCount;
private final boolean needPassThrough;
- private final SliceCache sliceCache;
+ private final PartitionCache partitionCache;
+ private final boolean requireRecordSnapshot;
private TableFunctionDataProcessor processor;
private PartitionState partitionState;
private ListenableFuture<?> isBlocked;
private boolean finished = false;
+ private Queue<TsBlock> resultTsBlocks;
Review Comment:
```suggestion
private final Queue<TsBlock> resultTsBlocks;
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/GroupNode.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+
+import com.google.common.collect.Iterables;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * GroupNode is a auxiliary node that is used to group data. Currently, it is implemented based on
+ * SortNode. It will only be generated some special node that required grouping source, such as
+ * FillNode and TableFunctionNode.
+ *
+ * <p>GroupNode's ordering schema consists of two parts: PartitionKey and OrderKey. It guarantees to
+ * return data grouped by PartitionKey and sorted by OrderKey. For example, PARTITION BY device_id
+ * ORDER BY time will return data grouped by device_id, and in each group, data will be sorted by
+ * time.
+ */
+public class GroupNode extends SortNode {
+
+ /**
+ * orderingScheme may include two parts: PartitionKey and OrderKey. It marks the number of
+ * PartitionKey.
+ */
+ private int partitionKeyCount;
+
+ public GroupNode(PlanNodeId id, PlanNode child, OrderingScheme scheme, int partitionKeyCount) {
+ super(id, child, scheme, false, false);
+ this.partitionKeyCount = partitionKeyCount;
+ }
+
+ @Override
+ public PlanNode replaceChildren(List<PlanNode> newChildren) {
+ return new GroupNode(
+ id, Iterables.getOnlyElement(newChildren), orderingScheme, partitionKeyCount);
+ }
+
+ public int getPartitionKeyCount() {
+ return partitionKeyCount;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitGroup(this, context);
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new GroupNode(id, null, orderingScheme, partitionKeyCount);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.TABLE_GROUP_NODE.serialize(byteBuffer);
+ orderingScheme.serialize(byteBuffer);
Review Comment:
It would be better to add a new protected method in SortNode that extracts this
common code. Then, if SortNode gains new attributes, we won't need to change its
subclasses.
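A rough sketch of the shape this suggestion could take. The classes below are simplified stand-ins and not the real IoTDB types: `SortNodeSketch`, `GroupNodeSketch`, and `serializeSortAttributes` are hypothetical names, an `int` stands in for `OrderingScheme`, and a `short` stands in for the `PlanNodeType` tag.

```java
import java.nio.ByteBuffer;

public class SortNodeSerializeSketch {

  /** Simplified stand-in for SortNode; the int field stands in for OrderingScheme. */
  static class SortNodeSketch {
    protected final int orderingSchemeId;

    SortNodeSketch(int orderingSchemeId) {
      this.orderingSchemeId = orderingSchemeId;
    }

    /** Hypothetical helper: writes every attribute that the parent class itself owns. */
    protected void serializeSortAttributes(ByteBuffer buffer) {
      buffer.putInt(orderingSchemeId);
    }

    protected void serializeAttributes(ByteBuffer buffer) {
      buffer.putShort((short) 1); // stand-in for the parent's type tag
      serializeSortAttributes(buffer);
    }
  }

  /** Simplified stand-in for GroupNode. */
  static class GroupNodeSketch extends SortNodeSketch {
    private final int partitionKeyCount;

    GroupNodeSketch(int orderingSchemeId, int partitionKeyCount) {
      super(orderingSchemeId);
      this.partitionKeyCount = partitionKeyCount;
    }

    @Override
    protected void serializeAttributes(ByteBuffer buffer) {
      buffer.putShort((short) 2); // stand-in for PlanNodeType.TABLE_GROUP_NODE
      serializeSortAttributes(buffer); // shared part, maintained only in the parent
      buffer.putInt(partitionKeyCount); // attribute specific to the subclass
    }
  }

  /** Serializes a node into a fresh buffer that is ready for reading. */
  static ByteBuffer serialize(SortNodeSketch node) {
    ByteBuffer buffer = ByteBuffer.allocate(16);
    node.serializeAttributes(buffer);
    buffer.flip();
    return buffer;
  }

  public static void main(String[] args) {
    ByteBuffer buffer = serialize(new GroupNodeSketch(42, 1));
    System.out.println(buffer.remaining()); // 2 + 4 + 4 bytes
  }
}
```

With this layout, a new attribute on the parent only touches `serializeSortAttributes`; the subclass keeps writing its own type tag and its own fields. The matching deserialization path would need the same split.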