JackieTien97 commented on code in PR #8613: URL: https://github.com/apache/iotdb/pull/8613#discussion_r1058027870
########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.plan.planner.plan.node.source; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; + +public abstract class VirtualSourceNode extends PlanNode implements AutoCloseable { + + private TDataNodeLocation dataNodeLocation; + + public VirtualSourceNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) { + super(id); + this.dataNodeLocation = dataNodeLocation; + } + + public abstract void open() throws Exception; Review Comment: ```suggestion ``` ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java: ########## @@ -114,21 +115,25 @@ private void produceFragmentInstance(PlanFragment fragment) { // We need to store all the replica host in case of the scenario that the instance need to be // redirected // to another host when scheduling - if ((analysis.getDataPartitionInfo() == null || 
analysis.getDataPartitionInfo().isEmpty()) - && (analysis.getStatement() instanceof QueryStatement - && ((QueryStatement) analysis.getStatement()).isAggregationQuery())) { - // AggregationQuery && no data region, we need to execute this FI on local - fragmentInstance.setExecutorAndHost( - new QueryExecutor( - new TDataNodeLocation() - .setInternalEndPoint(DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT) - .setMPPDataExchangeEndPoint(DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT))); + if (regionReplicaSet == null || regionReplicaSet.getRegionId() == null) { + TDataNodeLocation dataNodeLocation = fragment.getTargetLocation(); + if (dataNodeLocation != null) { + fragmentInstance.setExecutorAndHost(new QueryExecutor(dataNodeLocation)); Review Comment: add some comments about this if branch, tell readers that currently only show queries will enter here. ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java: ########## @@ -1111,4 +1115,61 @@ public LogicalPlanBuilder planPathsUsingTemplateSource( context.getQueryId().genPlanNodeId(), pathPatternList, templateId); return this; } + + public LogicalPlanBuilder planOneChildOrderBy( + OrderByParameter orderByParameter, List<String> outputColumns) { + if (orderByParameter.isEmpty()) { + return this; + } + this.root = + new MergeSortNode( + context.getQueryId().genPlanNodeId(), + Collections.singletonList(this.root), + orderByParameter, + outputColumns); + return this; + } + + public LogicalPlanBuilder planShowQueries(Analysis analysis, ShowQueriesStatement statement) { + List<TDataNodeLocation> dataNodeLocations = analysis.getRunningDataNodeLocations(); + if (dataNodeLocations.size() == 1) { + this.root = + planSingleShowQueries(dataNodeLocations.get(0)) + .planFilterAndTransform( + analysis.getWhereExpression(), + analysis.getSourceExpressions(), + false, + statement.getZoneId(), + Ordering.ASC) + .getRoot(); + } else { + HorizontallyConcatNode concatNode = + new 
HorizontallyConcatNode(context.getQueryId().genPlanNodeId()); + dataNodeLocations.forEach( + dataNodeLocation -> + concatNode.addChild( + new LogicalPlanBuilder(analysis, context) Review Comment: no need to new LogicalPlanBuilder here, we can just use `this` ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java: ########## @@ -1111,4 +1115,61 @@ public LogicalPlanBuilder planPathsUsingTemplateSource( context.getQueryId().genPlanNodeId(), pathPatternList, templateId); return this; } + + public LogicalPlanBuilder planOneChildOrderBy( + OrderByParameter orderByParameter, List<String> outputColumns) { + if (orderByParameter.isEmpty()) { + return this; + } + this.root = + new MergeSortNode( + context.getQueryId().genPlanNodeId(), + Collections.singletonList(this.root), + orderByParameter, + outputColumns); + return this; + } + + public LogicalPlanBuilder planShowQueries(Analysis analysis, ShowQueriesStatement statement) { + List<TDataNodeLocation> dataNodeLocations = analysis.getRunningDataNodeLocations(); + if (dataNodeLocations.size() == 1) { + this.root = + planSingleShowQueries(dataNodeLocations.get(0)) + .planFilterAndTransform( + analysis.getWhereExpression(), + analysis.getSourceExpressions(), + false, + statement.getZoneId(), + Ordering.ASC) + .getRoot(); + } else { + HorizontallyConcatNode concatNode = + new HorizontallyConcatNode(context.getQueryId().genPlanNodeId()); + dataNodeLocations.forEach( + dataNodeLocation -> + concatNode.addChild( + new LogicalPlanBuilder(analysis, context) + .planSingleShowQueries(dataNodeLocation) + .planFilterAndTransform( + analysis.getWhereExpression(), + analysis.getSourceExpressions(), + false, + statement.getZoneId(), + Ordering.ASC) + .getRoot())); Review Comment: we still need one more SortNode here. 
########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/HorizontallyConcatNode.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.plan.planner.plan.node.process; + +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** This node is responsible for contacting horizontally two or more TsBlocks have same columns. */ +public class HorizontallyConcatNode extends MultiChildProcessNode { Review Comment: We can order by time asc by default, and then we may not need this node and its corresponding operator. 
########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/VirtualSourceNode.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.mpp.plan.planner.plan.node.source; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; + +public abstract class VirtualSourceNode extends PlanNode implements AutoCloseable { Review Comment: Why must this implement `AutoCloseable`? ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java: ########## @@ -1111,4 +1115,61 @@ public LogicalPlanBuilder planPathsUsingTemplateSource( context.getQueryId().genPlanNodeId(), pathPatternList, templateId); return this; } + + public LogicalPlanBuilder planOneChildOrderBy( Review Comment: MergeSort should never have only one child; if it does, there is no need to add a MergeSortNode. And BTW, we must make sure all the children have already been sorted by the order-by items before they are sent into MergeSortOperator. 
########## server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java: ########## @@ -191,7 +191,7 @@ private ListenableFuture<?> processInternal() { } if (root.hasNext()) { TsBlock tsBlock = root.next(); - if (tsBlock != null && !tsBlock.isEmpty()) { Review Comment: We never need to send an empty tsblock. Even if the whole result set is empty, we will set no more tsblock in `closeAndDestroyOperators`. ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/ShowQueriesNode.java: ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.mpp.plan.planner.plan.node.source; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; + +import com.google.common.collect.ImmutableList; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +public class ShowQueriesNode extends VirtualSourceNode { + + public static final List<String> SHOW_QUERIES_HEADER_COLUMNS = + ImmutableList.of( + ColumnHeaderConstant.QUERY_ID, + ColumnHeaderConstant.DATA_NODE_ID, + ColumnHeaderConstant.ELAPSED_TIME, + ColumnHeaderConstant.STATEMENT); + + public ShowQueriesNode(PlanNodeId id, TDataNodeLocation dataNodeLocation) { + super(id, dataNodeLocation); + } + + @Override + public List<PlanNode> getChildren() { + return ImmutableList.of(); + } + + @Override + public void addChild(PlanNode child) { + throw new UnsupportedOperationException("no child is allowed for ShowQueriesNode"); + } + + @Override + public PlanNode clone() { + return new ShowQueriesNode(getPlanNodeId(), getDataNodeLocation()); + } + + @Override + public int allowedChildCount() { + return NO_CHILD_ALLOWED; + } + + @Override + public List<String> getOutputColumnNames() { + return SHOW_QUERIES_HEADER_COLUMNS; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitShowQueries(this, context); + } + + // We only use DataNodeLocation when do distributionPlan, so DataNodeLocation is no need to + // serialize + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.SHOW_QUERIES.serialize(byteBuffer); + } + + @Override + protected 
void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.SHOW_QUERIES.serialize(stream); + } + + public static ShowQueriesNode deserialize(ByteBuffer byteBuffer) { + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new ShowQueriesNode(planNodeId, null); + } + + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode()); + } + + @Override + public String toString() { + return "ShowQueriesNode-" + this.getPlanNodeId(); + } + + @Override + public void open() throws Exception {} + + @Override + public void close() throws Exception {} Review Comment: ```suggestion ``` ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java: ########## @@ -412,6 +414,10 @@ public boolean equals(Object o) { } private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupContext context) { + if (hasVirtualSourceNode(node)) { Review Comment: You can record whether there is a VirtualSourceNode in `NodeGroupContext` instead of checking recursively here. Actually, for some queries that have many PlanNodes, this `hasVirtualSourceNode` is not efficient because it will iterate over all the PlanNodes. So it's better to put this information in `NodeGroupContext`, since you already know it outside. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
