JackieTien97 commented on code in PR #15014:
URL: https://github.com/apache/iotdb/pull/15014#discussion_r2002991167


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java:
##########
@@ -204,6 +204,11 @@ private Future<FragInstanceDispatchResult> dispatchWriteAsync(List<FragmentInsta
     List<FragmentInstance> localInstances = new ArrayList<>();
     List<FragmentInstance> remoteInstances = new ArrayList<>();
     for (FragmentInstance instance : instances) {
+      if (instance.getHostDataNode() == null) {
+        dataNodeFailureList.add(
+            new TSStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()));

Review Comment:
   Add PLAN_FAILED_NETWORK_PARTITION to ErrorHandlingUtils.onQueryException; a known error doesn't need to be logged at WARN level.
   
   
![image](https://github.com/user-attachments/assets/b5fb5fed-6d57-41a0-851b-b28a81a4cc26)
   



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.Collection;
+
+/**
+ * During planning phase of Query, if there exists no datanode that can be served as the role of
+ * RootFragmentInstance, that is, no datanode can reach to all replica-sets possibly due to network
+ * partition issues, this exception will be thrown and this query will fail.
+ */
+public class RootFIPlacementException extends RuntimeException {

Review Comment:
   Extend IoTDBRuntimeException and add a dedicated TSStatusCode for it, which makes the error more readable.
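
A minimal sketch of the shape this suggestion points at. `StatusCodeException` and the numeric code below are hypothetical stand-ins, not the real `IoTDBRuntimeException` or `TSStatusCode` definitions; the sketch only illustrates an exception that carries its own status code so callers do not have to guess one.

```java
// Hypothetical stand-ins only: StatusCodeException and the numeric code are
// illustrative, not the real IoTDBRuntimeException or TSStatusCode definitions.
final class PlacementExceptionSketch {

  /** Stand-in for a runtime exception base class that carries a status code. */
  static class StatusCodeException extends RuntimeException {
    private final int errorCode;

    StatusCodeException(String message, int errorCode) {
      super(message);
      this.errorCode = errorCode;
    }

    int getErrorCode() {
      return errorCode;
    }
  }

  /** Placeholder value; the real code would be a new TSStatusCode entry. */
  static final int ROOT_FI_PLACEMENT_FAILED = 9001;

  /** The exception fixes its own status code at construction time. */
  static class RootFIPlacementException extends StatusCodeException {
    RootFIPlacementException(String detail) {
      super("Cannot place the root FragmentInstance: " + detail, ROOT_FI_PLACEMENT_FAILED);
    }
  }

  public static void main(String[] args) {
    try {
      throw new RootFIPlacementException("no DataNode reaches all replica sets");
    } catch (StatusCodeException e) {
      // The handler reads the code from the exception instead of hard-coding it.
      System.out.println(e.getErrorCode() + ": " + e.getMessage());
    }
  }
}
```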



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+/**
+ * When ALL DataNodeLocations in a QUERY-typed {@link
+ * org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance} are unreachable, possibly due
+ * to network partition issues, this exception will be thrown and this query will fail.
+ */
+public class ReplicaSetUnreachableException extends RuntimeException {

Review Comment:
   Extend IoTDBRuntimeException and add a dedicated TSStatusCode for it, which makes the error more readable.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -429,6 +431,13 @@ private TSExecuteStatementResp executeStatementInternal(
       t = e;
       return RpcUtils.getTSExecuteStatementResp(
           RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR, e.getMessage()));
+    } catch (RootFIPlacementException | ReplicaSetUnreachableException e) {
+      finished = true;
+      t = e;
+      final TSStatus status =
+          RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, e.getMessage());
+      status.setNeedRetry(true);
+      return RpcUtils.getTSExecuteStatementResp(status);

Review Comment:
   Once RootFIPlacementException and ReplicaSetUnreachableException extend IoTDBRuntimeException, you won't need this catch block; you can handle both in onQueryException of ErrorHandlingUtils.
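
To illustrate the idea, here is a rough sketch of a single mapping point that turns exceptions into statuses. The `onQueryException` signature, the `Status` type, and the numeric codes are simplified assumptions for illustration and do not mirror the actual `ErrorHandlingUtils` or `TSStatus` API.

```java
// Simplified, hypothetical shapes: Status, StatusCodeException, and the numeric
// codes are placeholders, not the real TSStatus / ErrorHandlingUtils API.
final class QueryErrorMappingSketch {

  static final int INTERNAL_ERROR = 500; // placeholder code
  static final int PLAN_FAILED_NETWORK_PARTITION = 9002; // placeholder code

  static class StatusCodeException extends RuntimeException {
    private final int errorCode;

    StatusCodeException(String message, int errorCode) {
      super(message);
      this.errorCode = errorCode;
    }

    int getErrorCode() {
      return errorCode;
    }
  }

  static final class Status {
    final int code;
    final String message;
    final boolean needRetry;

    Status(int code, String message, boolean needRetry) {
      this.code = code;
      this.message = message;
      this.needRetry = needRetry;
    }
  }

  /** One place that maps any failure to a status, so RPC methods need no extra catch blocks. */
  static Status onQueryException(Throwable t) {
    if (t instanceof StatusCodeException) {
      final StatusCodeException e = (StatusCodeException) t;
      // Known, expected failure: reuse its code and mark partition errors as retryable.
      final boolean retry = e.getErrorCode() == PLAN_FAILED_NETWORK_PARTITION;
      return new Status(e.getErrorCode(), e.getMessage(), retry);
    }
    // Unknown failure: fall back to a generic internal error.
    return new Status(INTERNAL_ERROR, String.valueOf(t.getMessage()), false);
  }
}
```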



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java:
##########
@@ -36,6 +36,9 @@ public StorageExecutor(@Nonnull TRegionReplicaSet regionReplicaSet) {
 
   @Override
   public TDataNodeLocation getDataNodeLocation() {
+    if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+      return null;
+    }

Review Comment:
   When will this return null? If it can, change the return type from TDataNodeLocation to Optional<TDataNodeLocation>, and add Javadoc to the interface describing when it returns Optional.empty().
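
A sketch of what the Optional-returning variant could look like. Strings stand in for `TDataNodeLocation`, the interface name is hypothetical, and picking the first location is an assumption, since the diff above shows only the null guard.

```java
import java.util.List;
import java.util.Optional;

// Illustrative stand-ins: String replaces TDataNodeLocation and Executor is a
// hypothetical simplified interface, not the real IoTDB one.
final class OptionalLocationSketch {

  interface Executor {
    /**
     * @return the location to dispatch to, or Optional.empty() when the replica set has no
     *     locations left (for example, every replica was filtered out as unreachable)
     */
    Optional<String> getDataNodeLocation();
  }

  static final class StorageExecutor implements Executor {
    private final List<String> locations;

    StorageExecutor(List<String> locations) {
      this.locations = locations;
    }

    @Override
    public Optional<String> getDataNodeLocation() {
      // Assumption: the first location is the one to use when any exist.
      return locations.isEmpty() ? Optional.empty() : Optional.of(locations.get(0));
    }
  }
}
```

Callers then have to handle the empty case explicitly (for instance with `orElseThrow`) rather than risk a `NullPointerException` later in dispatch.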



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java:
##########
@@ -75,6 +78,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
 
   // Record FragmentInstances dispatched to same DataNode
   private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap;
+  private final ClusterTopology topology = ClusterTopology.getInstance();

Review Comment:
   Better not to use the singleton here; passing it in through the constructor makes unit tests easier to write.
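
As a rough illustration of why constructor injection helps tests, the sketch below uses a hypothetical `Topology` interface and `Planner` class (neither is the real IoTDB type). A test can pass a lambda as the topology instead of mutating global singleton state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified types; the real planner and ClusterTopology differ.
final class TopologyInjectionSketch {

  /** Narrow view of the topology that the planner actually needs. */
  interface Topology {
    boolean isReachable(int dataNodeId);
  }

  static final class Planner {
    private final Topology topology;

    // The dependency arrives through the constructor instead of a static getInstance().
    Planner(Topology topology) {
      this.topology = topology;
    }

    List<Integer> reachableNodes(List<Integer> candidates) {
      final List<Integer> result = new ArrayList<>();
      for (final Integer id : candidates) {
        if (topology.isReachable(id)) {
          result.add(id);
        }
      }
      return result;
    }
  }

  public static void main(String[] args) {
    // In a unit test, a lambda simulates a partition that cuts off DataNode 2.
    final Planner planner = new Planner(id -> id != 2);
    System.out.println(planner.reachableNodes(java.util.Arrays.asList(1, 2, 3)));
  }
}
```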



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java:
##########
@@ -124,6 +128,14 @@ private void produceFragmentInstance(
     // Get the target region for origin PlanFragment, then its instance will be distributed one
     // of them.
     TRegionReplicaSet regionReplicaSet = fragment.getTargetRegionForTableModel(nodeDistributionMap);
+    if (regionReplicaSet != null
+        && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
+      regionReplicaSet = topology.getReachableSet(regionReplicaSet);
+      if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+        throw new ReplicaSetUnreachableException(
+            fragment.getTargetRegionForTableModel(nodeDistributionMap));
+      }
+    }

Review Comment:
   Extract the duplicated code into one common static method.
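
One possible shape for such a helper, sketched with stand-in types: `ReplicaSet` replaces `TRegionReplicaSet`, and the method name `requireReachable` is hypothetical. It mirrors the check in the hunk above: skip null or empty sets, filter, and throw when nothing reachable remains.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Stand-in types for illustration; names and fields are assumptions.
final class ReachableSetSketch {

  static final class ReplicaSet {
    final int regionId;
    final List<Integer> dataNodeIds;

    ReplicaSet(int regionId, List<Integer> dataNodeIds) {
      this.regionId = regionId;
      this.dataNodeIds = dataNodeIds;
    }
  }

  static class ReplicaSetUnreachableException extends RuntimeException {
    ReplicaSetUnreachableException(ReplicaSet origin) {
      super("All replicas of region " + origin.regionId + " are unreachable");
    }
  }

  /** Shared by every planner that needs the reachable subset of a replica set. */
  static ReplicaSet requireReachable(ReplicaSet origin, UnaryOperator<ReplicaSet> reachableFilter) {
    if (origin == null || origin.dataNodeIds.isEmpty()) {
      return origin; // nothing to filter
    }
    final ReplicaSet reachable = reachableFilter.apply(origin);
    if (reachable.dataNodeIds.isEmpty()) {
      // Report the original set so the message names every replica that was tried.
      throw new ReplicaSetUnreachableException(origin);
    }
    return reachable;
  }
}
```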



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;
+  private volatile Map<Integer, Set<Integer>> topologyMap;
+  private volatile boolean isPartitioned;
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  public TRegionReplicaSet getReachableSet(TRegionReplicaSet origin) {
+    if (!isPartitioned || origin == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = Collections.unmodifiableSet(topologyMap.get(myself));
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
+      Set<Map.Entry<TRegionReplicaSet, T>> input) {
+    final List<TRegionReplicaSet> allSets =
+        input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
+    final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);

Review Comment:
   Why do we need to convert it to a List first?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java:
##########
@@ -120,6 +122,7 @@ public class TableDistributedPlanGenerator
   private final SymbolAllocator symbolAllocator;
   private final Map<PlanNodeId, OrderingScheme> nodeOrderingMap = new HashMap<>();
   private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier;
+  private final ClusterTopology topology = ClusterTopology.getInstance();

Review Comment:
   Avoid using the singleton here; pass it in through the constructor instead.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java:
##########
@@ -74,6 +77,7 @@ public class TableModelQueryFragmentPlanner {
 
   // Record FragmentInstances dispatched to same DataNode
   private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap = new HashMap<>();
+  private final ClusterTopology topology = ClusterTopology.getInstance();

Review Comment:
   same as above



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;
+  private volatile Map<Integer, Set<Integer>> topologyMap;
+  private volatile boolean isPartitioned;
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  public TRegionReplicaSet getReachableSet(TRegionReplicaSet origin) {
+    if (!isPartitioned || origin == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = Collections.unmodifiableSet(topologyMap.get(myself));
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
+      Set<Map.Entry<TRegionReplicaSet, T>> input) {
+    final List<TRegionReplicaSet> allSets =
+        input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
+    final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);
+    final Map<TConsensusGroupId, TRegionReplicaSet> newSet = new HashMap<>();
+    candidates.forEach(set -> newSet.put(set.getRegionId(), set));
+    final Map<TRegionReplicaSet, T> candidateMap = new HashMap<>();
+    for (final Map.Entry<TRegionReplicaSet, T> entry : input) {
+      final TConsensusGroupId gid = entry.getKey().getRegionId();
+      if (newSet.containsKey(gid)) {
+        candidateMap.put(newSet.get(gid), entry.getValue());
+      }

Review Comment:
   Call newSet.get(gid) directly and check the value for null, which avoids the extra containsKey lookup.
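
A small sketch of the single-lookup version, written generically so it stays self-contained. The method name `keepCandidates` and the `regionIdOf` parameter are hypothetical; `newSet` plays the same role as in the hunk above. This is only valid when the map never stores null values, which holds here because every value put into `newSet` is a non-null replica set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Generic illustration; S stands in for TRegionReplicaSet and G for TConsensusGroupId.
final class SingleLookupSketch {

  static <G, S, T> Map<S, T> keepCandidates(
      Map<G, S> newSet, Set<Map.Entry<S, T>> input, Function<S, G> regionIdOf) {
    final Map<S, T> candidateMap = new HashMap<>();
    for (final Map.Entry<S, T> entry : input) {
      // One hash lookup instead of containsKey followed by get.
      final S replacement = newSet.get(regionIdOf.apply(entry.getKey()));
      if (replacement != null) {
        candidateMap.put(replacement, entry.getValue());
      }
    }
    return candidateMap;
  }
}
```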



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ClusterTopology {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private volatile Map<Integer, TDataNodeLocation> dataNodes;
+  private volatile Map<Integer, Set<Integer>> topologyMap;
+  private volatile boolean isPartitioned;
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  public TRegionReplicaSet getReachableSet(TRegionReplicaSet origin) {
+    if (!isPartitioned || origin == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = Collections.unmodifiableSet(topologyMap.get(myself));
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(

Review Comment:
   To avoid performance degradation, we need a fast path that directly returns the original Set when no DataNode has failed.
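
A minimal sketch of such a fast path, under simplifying assumptions: entries are keyed by DataNode id rather than by `TRegionReplicaSet`, and the `Topology` class with its `update` method is hypothetical. When no partition is detected, the input set is returned as-is with no copying or streaming.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified stand-in for ClusterTopology; keys are DataNode ids for brevity.
final class FastPathSketch {

  static final class Topology {
    private volatile boolean partitioned;
    private volatile Set<Integer> reachable = Collections.emptySet();

    void update(boolean partitioned, Set<Integer> reachable) {
      // Publish the reachable set before flipping the flag that readers check first.
      this.reachable = reachable;
      this.partitioned = partitioned;
    }

    <T> Set<Map.Entry<Integer, T>> filterReachableCandidates(Set<Map.Entry<Integer, T>> input) {
      if (!partitioned) {
        return input; // fast path: healthy cluster, return the same instance
      }
      final Set<Integer> snapshot = reachable; // read the volatile field once
      return input.stream()
          .filter(entry -> snapshot.contains(entry.getKey()))
          .collect(Collectors.toSet());
    }
  }
}
```

The existing `getReachableSet` already has this kind of guard (`!isPartitioned`), so the same check at the top of `filterReachableCandidates` would keep the two methods consistent.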



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
