JackieTien97 commented on code in PR #12100:
URL: https://github.com/apache/iotdb/pull/12100#discussion_r1512296679
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java:
##########
@@ -57,6 +70,8 @@ public class FragmentInstanceExecution {
private final long timeoutInMs;
private final MPPDataExchangeManager exchangeManager;
+ private final ReadWriteLock statisticsLock = new ReentrantReadWriteLock();
Review Comment:
We don't need this lock; we only need to change `staticsRemoved` to an
`AtomicBoolean`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java:
##########
@@ -92,6 +93,18 @@ public class FragmentInstanceContext extends QueryContext {
private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap;
private DataNodeQueryContext dataNodeQueryContext;
+ // Used for EXPLAIN ANALYZE to cache statistics result when the FI is
finished,
+ // it will not be released until it's fetched.
+ private TFetchFragmentInstanceStatisticsResp fragmentInstanceStatistics =
null;
+
+ private long initQueryDataSourceCost = 0;
+ private long readyQueueTime = 0;
+ private long blockQueueTime = 0;
Review Comment:
Should these fields be thread-safe, e.g. by using `AtomicLong`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java:
##########
@@ -51,6 +53,10 @@ public class OperatorContext {
private long totalExecutionTimeInNanos = 0L;
private long nextCalledCount = 0L;
+ private long hasNextCalledCount = 0L;
+ private Map<String, String> specifiedInfo = null;
Review Comment:
Please add a comment describing what this field is used for: what the key and
value mean, and when the field is used.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java:
##########
@@ -104,8 +104,11 @@ private void initialize() throws QueryProcessException {
this.init = true;
} finally {
((DataDriverContext) driverContext).clearSourceOperators();
- QUERY_EXECUTION_METRICS.recordExecutionCost(
- QUERY_RESOURCE_INIT, System.nanoTime() - startTime);
+ long currentTime = System.nanoTime();
+ driverContext
+ .getFragmentInstanceContext()
+ .setInitQueryDataSourceCost(currentTime - startTime);
Review Comment:
Recording it here will cause this cost to be set multiple times. You should
record it only once per FI; see the detailed logic in
`getSharedQueryDataSource` of `FragmentInstanceContext`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceFetchException;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import
org.apache.iotdb.db.queryengine.statistics.FragmentInstanceStatisticsDrawer;
+import org.apache.iotdb.db.queryengine.statistics.QueryStatisticsFetcher;
+import org.apache.iotdb.db.queryengine.statistics.StatisticLine;
+import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class ExplainAnalyzeOperator implements ProcessOperator {
+ private final OperatorContext operatorContext;
+ private final Operator child;
+ private final boolean verbose;
+ private boolean outputResult = false;
+ private final List<FragmentInstance> instances;
+ private static final long LOG_INTERNAL_IN_MS = 10000;
+ private static final Logger logger =
+ LoggerFactory.getLogger(IoTDBConstant.EXPLAIN_ANALYZE_LOGGER_NAME);
+ private final FragmentInstanceStatisticsDrawer
fragmentInstanceStatisticsDrawer =
+ new FragmentInstanceStatisticsDrawer();
+ private static final String LOG_TITLE =
+ "---------------------Intermediate result of EXPLAIN
ANALYZE---------------------:";
+
+ private final ScheduledFuture<?> logRecordTask;
+
+ public ExplainAnalyzeOperator(OperatorContext operatorContext, Operator
child, boolean verbose) {
+ this.operatorContext = operatorContext;
+ this.child = child;
+ this.verbose = verbose;
+ QueryExecution queryExecution =
Review Comment:
Record the long queryId in ExplainAnalyzeNode and then pass it to
ExplainAnalyzeOperator; this way we can avoid calling this inefficient
method.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java:
##########
@@ -353,10 +355,14 @@ private void schedule() {
// Use LogicalPlanner to do the logical query plan and logical optimization
public void doLogicalPlan() {
LogicalPlanner planner = new LogicalPlanner(this.context);
+ long startTime = System.nanoTime();
this.logicalPlan = planner.plan(this.analysis);
Review Comment:
There will now be a new timer, called optimize time, in LogicalPlanner. Please
separate these two.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java:
##########
@@ -134,7 +134,10 @@ private Future<FragInstanceDispatchResult>
dispatchRead(List<FragmentInstance> i
} finally {
// friendly for gc, clear the plan node tree, for some queries select
all devices, it will
// release lots of memory
- instance.getFragment().clearUselessField();
+ if (!queryContext.isExplainAnalyze()) {
+ // EXPLAIN ANALYZE will use these instances, so we can't clear them
+ instance.getFragment().clearUselessField();
Review Comment:
Can `TypeProvider` also be cleared?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -192,6 +192,15 @@ public IQueryExecution getQueryExecution(Long queryId) {
return queryExecutionMap.get(queryId);
}
+ public IQueryExecution getQueryExecution(QueryId queryId) {
+ for (IQueryExecution queryExecution : queryExecutionMap.values()) {
+ if (queryExecution.getQueryId().equals(queryId.getId())) {
+ return queryExecution;
+ }
+ }
+ return null;
+ }
+
Review Comment:
Remove this method; it is inefficient.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java:
##########
@@ -62,6 +62,13 @@ public class MPPQueryContext {
private int acquiredLockNum;
+ private boolean isExplainAnalyze = false;
+ private long analyzeCost;
+ private long fetchPartitionCost;
+ private long fetchSchemaCost;
+ private long logicalPlanCost;
+ private long distributionPlanCost;
Review Comment:
use `QueryPlanStatistics` instead?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java:
##########
@@ -106,7 +107,7 @@ private void adjustUpStream(PlanNode root, NodeGroupContext
context) {
return;
}
- if (analysis.isVirtualSource()) {
+ if (analysis.isVirtualSource() || root instanceof ExplainAnalyzeNode) {
Review Comment:
What if the children of `ExplainAnalyzeNode` needShuffleSinkNode?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/QueryStatisticsFetcher.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.statistics;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceFetchException;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsReq;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class QueryStatisticsFetcher {
+
+ private static final String LOCAL_HOST_IP =
+ IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
+
+ private static final int LOCAL_HOST_PORT =
+ IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+
+ private static final IClientManager<TEndPoint,
SyncDataNodeInternalServiceClient>
Review Comment:
Reuse the `SYNC_INTERNAL_SERVICE_CLIENT_MANAGER` in `Coordinator`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]