JackieTien97 commented on code in PR #10263: URL: https://github.com/apache/iotdb/pull/10263#discussion_r1241046597
########## server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/DataNodeQueryContext.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.execution.fragment; + +import org.apache.iotdb.commons.path.PartialPath; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +public class DataNodeQueryContext { + private final Set<PartialPath> needQueryAllRegionsForLastSet; + private int dataNodeFINum; + + public final ReentrantLock needQueryAllRegionsForLastSetLock = new ReentrantLock(); Review Comment: ```suggestion private final ReentrantLock needQueryAllRegionsForLastSetLock = new ReentrantLock(); ``` ########## server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java: ########## @@ -189,6 +198,13 @@ public FragmentInstanceInfo execDataQueryFragmentInstance( } } + private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId queryId, int dataNodeFINum) { + synchronized (dataNodeQueryContextMap) { + return dataNodeQueryContextMap.computeIfAbsent( + queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum)); + } + } Review Comment: While using lock 
or synchronized, think more about why it is needed and what the lock actually protects. In this case, do we really need that `synchronized (dataNodeQueryContextMap)`? `dataNodeQueryContextMap` is already a lock-free data structure, and computeIfAbsent already guarantees atomicity and thread safety. ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java: ########## @@ -105,6 +111,7 @@ public LocalExecutionPlanContext(LocalExecutionPlanContext parentContext) { this.cachedDataTypes = parentContext.cachedDataTypes; this.driverContext = parentContext.getDriverContext().createSubDriverContext(getNextPipelineId()); + this.dataNodeQueryContext = null; Review Comment: ```suggestion this.dataNodeQueryContext = parentContext.dataNodeQueryContext; ``` ########## server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/PipelineBuilderTest.java: ########## @@ -811,7 +812,8 @@ private LocalExecutionPlanContext createLocalExecutionPlanContext(TypeProvider t createFragmentInstanceContext(instanceId, stateMachine); fragmentInstanceContext.setDataRegion(dataRegion); - return new LocalExecutionPlanContext(typeProvider, fragmentInstanceContext); + return new LocalExecutionPlanContext( + typeProvider, fragmentInstanceContext, new DataNodeQueryContext(0)); Review Comment: ```suggestion return new LocalExecutionPlanContext( typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1)); ``` At least have one? ########## server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java: ########## @@ -62,6 +63,7 @@ public class FragmentInstanceManager { private final Map<FragmentInstanceId, FragmentInstanceContext> instanceContext; private final Map<FragmentInstanceId, FragmentInstanceExecution> instanceExecution; + private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap; Review Comment: add a metric for this map's size. 
########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java: ########## @@ -67,6 +67,9 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap; List<FragmentInstance> fragmentInstanceList; + // Record num of FragmentInstances dispatched to same DataNode + Map<TDataNodeLocation, Integer> dataNodeFINumMap; Review Comment: ```suggestion private final Map<TDataNodeLocation, Integer> dataNodeFINumMap; ``` ########## server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java: ########## @@ -131,6 +139,16 @@ private void produceFragmentInstance(PlanFragment fragment) { fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet)); } + dataNodeFINumMap.compute( + fragmentInstance.getHostDataNode(), + (k, v) -> { + if (v == null) { + return 1; + } else { + return v + 1; + } + }); Review Comment: ```suggestion dataNodeFINumMap.merge(fragmentInstance.getHostDataNode(), 1, Integer::sum); ``` ########## server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java: ########## @@ -393,6 +412,21 @@ protected synchronized void releaseResource() { timeFilter = null; sourcePaths = null; sharedQueryDataSource = null; + releaseDataNodeQueryContext(); + } + + private void releaseDataNodeQueryContext() { + QueryId queryId = id.getQueryId(); + if (!dataNodeQueryContextMap.containsKey(queryId)) { + // enter this means this FI is used to fetch schema, no DataNodeQueryContext need to + // release + return; + } + synchronized (dataNodeQueryContextMap.get(queryId)) { + if (dataNodeQueryContextMap.get(queryId).decreaseDataNodeFINum() <= 0) { + dataNodeQueryContextMap.remove(queryId); + } + } Review Comment: use `AtomicInteger` in `DataNodeQueryContext` and there is no need to use `synchronized` here. 
########## server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java: ########## @@ -79,6 +82,16 @@ public class FragmentInstanceContext extends QueryContext { // session info private SessionInfo sessionInfo; + private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap; Review Comment: No need to use a `Map`; one FI can have only one `DataNodeQueryContext`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
