LebronAl commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r645402913



##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
##########
@@ -767,57 +781,109 @@ public void removeLocalData(List<Integer> slots) {
               SlotPartitionTable.getSlotStrategy()
                   .calculateSlotByPartitionNum(
                       storageGroupName, timePartitionId, 
ClusterConstant.SLOT_NUM);
+          /**
+           * If this slot is just held by different raft groups in the same 
node, it should keep the
+           * data of slot.
+           */
+          if (((SlotPartitionTable) metaGroupMember.getPartitionTable())
+              .judgeHoldSlot(thisNode, slot)) {
+            return false;
+          }
           return slotSet.contains(slot);
         };
     for (PartialPath sg : allStorageGroupNames) {
       StorageEngine.getInstance().removePartitions(sg, filter);
     }
     for (Integer slot : slots) {
-      slotManager.setToNull(slot);
+      slotManager.setToNull(slot, false);
     }
+    slotManager.save();
 
     if (logger.isInfoEnabled()) {
       logger.info(
           "{}: data of {} and other {} slots are removed", name, slots.get(0), 
slots.size() - 1);
     }
   }
 
+  public void preRemoveNode(Node removedNode) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to pre remove node {}", name, removedNode);
+    }
+    synchronized (allNodes) {
+      if (allNodes.contains(removedNode) && allNodes.size() == 
config.getReplicationNum()) {
+        // update the group if the deleted node was in it
+        PartitionGroup newGroup = 
metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+        if (newGroup == null) {
+          return;
+        }
+        Node newNodeToGroup = newGroup.get(newGroup.size() - 1);
+        allNodes.add(newNodeToGroup);
+        peerMap.putIfAbsent(newNodeToGroup, new 
Peer(logManager.getLastLogIndex()));
+      }
+    }
+  }
+
   /**
    * When a node is removed and IT IS NOT THE HEADER of the group, the member 
should take over some
    * slots from the removed group, and add a new node to the group the removed 
node was in the
    * group.
    */
   @SuppressWarnings("java:S2445") // the reference of allNodes is unchanged
-  public void removeNode(Node removedNode, NodeRemovalResult removalResult) {
+  public void removeNode(Node removedNode) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to remove node {}", name, removedNode);
+    }
+
     synchronized (allNodes) {
+      preRemoveNode(removedNode);
       if (allNodes.contains(removedNode)) {
         // update the group if the deleted node was in it
-        allNodes = 
metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
-        initPeerMap();
+        allNodes.remove(removedNode);
+        peerMap.remove(removedNode);
         if (removedNode.equals(leader.get())) {
           // if the leader is removed, also start an election immediately
           synchronized (term) {
             setCharacter(NodeCharacter.ELECTOR);
-            setLastHeartbeatReceivedTime(Long.MIN_VALUE);
+            setLeader(null);
+          }
+          synchronized (getHeartBeatWaitObject()) {
+            getHeartBeatWaitObject().notifyAll();
           }
         }
       }
-      List<Integer> slotsToPull =
-          ((SlotNodeRemovalResult) 
removalResult).getNewSlotOwners().get(getHeader());
-      if (slotsToPull != null) {
-        // pull the slots that should be taken over
-        PullSnapshotTaskDescriptor taskDescriptor =
-            new PullSnapshotTaskDescriptor(removalResult.getRemovedGroup(), 
slotsToPull, true);
-        pullFileSnapshot(taskDescriptor, null);
+    }
+  }
+
+  public void pullSlots(NodeRemovalResult removalResult) {
+    List<Integer> slotsToPull =
+        ((SlotNodeRemovalResult) 
removalResult).getNewSlotOwners().get(getHeader());
+    if (slotsToPull != null) {
+      // pull the slots that should be taken over
+      PullSnapshotTaskDescriptor taskDescriptor =
+          new PullSnapshotTaskDescriptor(
+              removalResult.getRemovedGroup(getRaftGroupId()), new 
ArrayList<>(slotsToPull), true);
+      pullFileSnapshot(taskDescriptor, null);
+    }
+  }
+
+  /** For data group, it's necessary to apply remove/add log immediately after 
append. */
+  @Override
+  protected long appendEntry(long prevLogIndex, long prevLogTerm, long 
leaderCommit, Log log) {
+    long resp = super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, 
log);
+    if (resp == Response.RESPONSE_AGREE
+        && (log instanceof AddNodeLog || log instanceof RemoveNodeLog)) {
+      try {
+        commitLog(log);

Review comment:
       I suspect there may be something wrong with this code, because it will 
commit all the logs preceding this AddNodeLog. Is it possible to apply only this log?

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {

Review comment:
       It seems that this function is never used — could it be removed?

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
##########
@@ -442,9 +445,11 @@ private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, 
PartitionGroup> planGr
     }
     TSStatus status;
     if (errorCodePartitionGroups.isEmpty()) {
-      status = StatusUtils.OK;
       if (allRedirect) {
-        status = StatusUtils.getStatus(status, endPoint);
+        status = new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode());

Review comment:
       What about adding a new field `public static final TSStatus 
NEED_REDIRECTION = getStatus(TSStatusCode.NEED_REDIRECTION);` in `StatusUtils`, 
and then using `StatusUtils.getStatus(StatusUtils.NEED_REDIRECTION, 
endPoint)`?

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(

Review comment:
       It seems that this function is never used — could it be removed?

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan) 
{
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    TSStatus tmpStatus;
+    TSStatus[] subStatus = null;
+    boolean noFailure = true;
+    boolean isBatchFailure = false;
+    EndPoint endPoint = null;
+    int totalRowNum = 0;
+    // send sub-plans to each belonging data group and collect results
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+      tmpStatus = forwardToSingleGroup(entry);
+      logger.debug("{}: from {},{},{}", name, entry.getKey(), 
entry.getValue(), tmpStatus);
+      noFailure = (tmpStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+      isBatchFailure =
+          (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) 
|| isBatchFailure;
+      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+        if (parentPlan instanceof InsertTabletPlan) {
+          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          totalRowNum = ((CreateMultiTimeSeriesPlan) 
parentPlan).getIndexes().size();
+        }
+        if (subStatus == null) {
+          subStatus = new TSStatus[totalRowNum];
+          Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+        }
+        // set the status from one group to the proper positions of the 
overall status
+        if (parentPlan instanceof InsertTabletPlan) {
+          PartitionUtils.reordering(
+              (InsertTabletPlan) entry.getKey(),
+              subStatus,
+              tmpStatus.subStatus.toArray(new TSStatus[] {}));
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) 
entry.getKey();
+          for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+            subStatus[subPlan.getIndexes().get(i)] = 
tmpStatus.subStatus.get(i);
+          }
+        }
+      }
+      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // execution failed, record the error message
+        errorCodePartitionGroups.add(
+            String.format(
+                "[%s@%s:%s:%s]",
+                tmpStatus.getCode(),
+                entry.getValue().getHeader(),
+                tmpStatus.getMessage(),
+                tmpStatus.subStatus));
+      }
+      if (parentPlan instanceof InsertTabletPlan
+          && tmpStatus.isSetRedirectNode()
+          && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+              == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+        endPoint = tmpStatus.getRedirectNode();
+      }
+    }
+
+    if (parentPlan instanceof CreateMultiTimeSeriesPlan
+        && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+      if (subStatus == null) {
+        subStatus = new TSStatus[totalRowNum];
+        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+      }
+      noFailure = false;
+      isBatchFailure = true;
+      for (Entry<Integer, TSStatus> integerTSStatusEntry :
+          ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+        subStatus[integerTSStatusEntry.getKey()] = 
integerTSStatusEntry.getValue();
+      }
+    }
+    return concludeFinalStatus(
+        noFailure, endPoint, isBatchFailure, subStatus, 
errorCodePartitionGroups);
+  }
+
+  private TSStatus concludeFinalStatus(
+      boolean noFailure,
+      EndPoint endPoint,
+      boolean isBatchFailure,
+      TSStatus[] subStatus,
+      List<String> errorCodePartitionGroups) {
+    TSStatus status;
+    if (noFailure) {
+      status = StatusUtils.OK;
+      if (endPoint != null) {
+        status = StatusUtils.getStatus(status, endPoint);
+      }
+    } else if (isBatchFailure) {
+      status = RpcUtils.getStatus(Arrays.asList(subStatus));
+    } else {
+      status =
+          StatusUtils.getStatus(
+              StatusUtils.EXECUTE_STATEMENT_ERROR,
+              MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+    }
+    return status;
+  }
+
+  private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, 
PartitionGroup> entry) {

Review comment:
       Same as above: it seems that this function is never used — could it be removed?

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan) 
{
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    TSStatus tmpStatus;
+    TSStatus[] subStatus = null;
+    boolean noFailure = true;
+    boolean isBatchFailure = false;
+    EndPoint endPoint = null;
+    int totalRowNum = 0;
+    // send sub-plans to each belonging data group and collect results
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+      tmpStatus = forwardToSingleGroup(entry);
+      logger.debug("{}: from {},{},{}", name, entry.getKey(), 
entry.getValue(), tmpStatus);
+      noFailure = (tmpStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+      isBatchFailure =
+          (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) 
|| isBatchFailure;
+      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+        if (parentPlan instanceof InsertTabletPlan) {
+          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          totalRowNum = ((CreateMultiTimeSeriesPlan) 
parentPlan).getIndexes().size();
+        }
+        if (subStatus == null) {
+          subStatus = new TSStatus[totalRowNum];
+          Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+        }
+        // set the status from one group to the proper positions of the 
overall status
+        if (parentPlan instanceof InsertTabletPlan) {
+          PartitionUtils.reordering(
+              (InsertTabletPlan) entry.getKey(),
+              subStatus,
+              tmpStatus.subStatus.toArray(new TSStatus[] {}));
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) 
entry.getKey();
+          for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+            subStatus[subPlan.getIndexes().get(i)] = 
tmpStatus.subStatus.get(i);
+          }
+        }
+      }
+      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // execution failed, record the error message
+        errorCodePartitionGroups.add(
+            String.format(
+                "[%s@%s:%s:%s]",
+                tmpStatus.getCode(),
+                entry.getValue().getHeader(),
+                tmpStatus.getMessage(),
+                tmpStatus.subStatus));
+      }
+      if (parentPlan instanceof InsertTabletPlan
+          && tmpStatus.isSetRedirectNode()
+          && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+              == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+        endPoint = tmpStatus.getRedirectNode();
+      }
+    }
+
+    if (parentPlan instanceof CreateMultiTimeSeriesPlan
+        && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+      if (subStatus == null) {
+        subStatus = new TSStatus[totalRowNum];
+        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+      }
+      noFailure = false;
+      isBatchFailure = true;
+      for (Entry<Integer, TSStatus> integerTSStatusEntry :
+          ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+        subStatus[integerTSStatusEntry.getKey()] = 
integerTSStatusEntry.getValue();
+      }
+    }
+    return concludeFinalStatus(
+        noFailure, endPoint, isBatchFailure, subStatus, 
errorCodePartitionGroups);
+  }
+
+  private TSStatus concludeFinalStatus(
+      boolean noFailure,
+      EndPoint endPoint,
+      boolean isBatchFailure,
+      TSStatus[] subStatus,
+      List<String> errorCodePartitionGroups) {
+    TSStatus status;
+    if (noFailure) {
+      status = StatusUtils.OK;
+      if (endPoint != null) {
+        status = StatusUtils.getStatus(status, endPoint);
+      }
+    } else if (isBatchFailure) {
+      status = RpcUtils.getStatus(Arrays.asList(subStatus));
+    } else {
+      status =
+          StatusUtils.getStatus(
+              StatusUtils.EXECUTE_STATEMENT_ERROR,
+              MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+    }
+    return status;
+  }
+
+  private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, 
PartitionGroup> entry) {
+    TSStatus result;
+    if (entry.getValue().contains(thisNode)) {
+      // the query should be handled by a group the local node is in, handle 
it with in the group
+      long startTime =
+          Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+              .getOperationStartTime();
+      logger.debug(
+          "Execute {} in a local group of {}", entry.getKey(), 
entry.getValue().getHeader());
+      result =
+          getLocalDataMember(entry.getValue().getHeader(), 
entry.getValue().getId())
+              .executeNonQueryPlan(entry.getKey());
+      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+          .calOperationCostTimeFromStart(startTime);
+    } else {
+      // forward the query to the group that should handle it
+      long startTime =
+          Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+              .getOperationStartTime();
+      logger.debug(
+          "Forward {} to a remote group of {}", entry.getKey(), 
entry.getValue().getHeader());
+      result = forwardPlan(entry.getKey(), entry.getValue());
+      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+          .calOperationCostTimeFromStart(startTime);
+    }
+    return result;
+  }
+
+  /**
+   * forward each sub-plan to its corresponding data group, if some groups 
goes wrong, the error
+   * messages from each group will be compacted into one string.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> 
planGroupMap) {

Review comment:
       Same as above: it seems that this function is never used — could it be removed?

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(

Review comment:
       It seems that this function is never used — could it be removed?

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan) 
{
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    TSStatus tmpStatus;
+    TSStatus[] subStatus = null;
+    boolean noFailure = true;
+    boolean isBatchFailure = false;
+    EndPoint endPoint = null;
+    int totalRowNum = 0;
+    // send sub-plans to each belonging data group and collect results
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+      tmpStatus = forwardToSingleGroup(entry);
+      logger.debug("{}: from {},{},{}", name, entry.getKey(), 
entry.getValue(), tmpStatus);
+      noFailure = (tmpStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+      isBatchFailure =
+          (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) 
|| isBatchFailure;
+      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+        if (parentPlan instanceof InsertTabletPlan) {
+          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          totalRowNum = ((CreateMultiTimeSeriesPlan) 
parentPlan).getIndexes().size();
+        }
+        if (subStatus == null) {
+          subStatus = new TSStatus[totalRowNum];
+          Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+        }
+        // set the status from one group to the proper positions of the 
overall status
+        if (parentPlan instanceof InsertTabletPlan) {
+          PartitionUtils.reordering(
+              (InsertTabletPlan) entry.getKey(),
+              subStatus,
+              tmpStatus.subStatus.toArray(new TSStatus[] {}));
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) 
entry.getKey();
+          for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+            subStatus[subPlan.getIndexes().get(i)] = 
tmpStatus.subStatus.get(i);
+          }
+        }
+      }
+      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // execution failed, record the error message
+        errorCodePartitionGroups.add(
+            String.format(
+                "[%s@%s:%s:%s]",
+                tmpStatus.getCode(),
+                entry.getValue().getHeader(),
+                tmpStatus.getMessage(),
+                tmpStatus.subStatus));
+      }
+      if (parentPlan instanceof InsertTabletPlan
+          && tmpStatus.isSetRedirectNode()
+          && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+              == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+        endPoint = tmpStatus.getRedirectNode();
+      }
+    }
+
+    if (parentPlan instanceof CreateMultiTimeSeriesPlan
+        && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+      if (subStatus == null) {
+        subStatus = new TSStatus[totalRowNum];
+        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+      }
+      noFailure = false;
+      isBatchFailure = true;
+      for (Entry<Integer, TSStatus> integerTSStatusEntry :
+          ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+        subStatus[integerTSStatusEntry.getKey()] = 
integerTSStatusEntry.getValue();
+      }
+    }
+    return concludeFinalStatus(
+        noFailure, endPoint, isBatchFailure, subStatus, 
errorCodePartitionGroups);
+  }
+
+  private TSStatus concludeFinalStatus(

Review comment:
       same

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
##########
@@ -195,64 +194,70 @@ public String toString() {
     }
 
     @Override
-    public void install(FileSnapshot snapshot, int slot) throws 
SnapshotInstallationException {
+    public void install(FileSnapshot snapshot, int slot, boolean 
isDataMigration)

Review comment:
       Then please add some comments for these parameters

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java
##########
@@ -32,25 +32,57 @@
 /** AddNodeLog records the operation of adding a node into this cluster. */
 public class AddNodeLog extends Log {
 
+  private ByteBuffer partitionTable;
+
   private Node newNode;
 
-  public Node getNewNode() {
-    return newNode;
+  private long metaLogIndex;

Review comment:
       OK

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
##########
@@ -165,14 +165,14 @@ public static Long querySingleSeries(
   }
 
   public static List<String> getNodeList(

Review comment:
       OK

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -1411,6 +1435,291 @@ public TSStatus 
processNonPartitionedMetaPlan(PhysicalPlan plan) {
     return result;
   }
 
+  /**
+   * Forward plans to the DataGroupMember of one node in the corresponding 
group. Only when all
+   * nodes time out, will a TIME_OUT be returned.
+   *
+   * @param planGroupMap sub-plan -> belong data group pairs
+   */
+  public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, 
PhysicalPlan plan) {
+    // the error codes from the groups that cannot execute the plan
+    TSStatus status;
+    if (planGroupMap.size() == 1) {
+      status = forwardToSingleGroup(planGroupMap.entrySet().iterator().next());
+    } else {
+      if (plan instanceof InsertTabletPlan || plan instanceof 
CreateMultiTimeSeriesPlan) {
+        // InsertTabletPlan and CreateMultiTimeSeriesPlan contains many rows, 
each will correspond
+        // to a TSStatus as its
+        // execution result, as the plan is split and the sub-plans may have 
interleaving ranges,
+        // we must assure that each TSStatus is placed to the right position
+        // e.g., an InsertTabletPlan contains 3 rows, row1 and row3 belong to 
NodeA and row2
+        // belongs to NodeB, when NodeA returns a success while NodeB returns 
a failure, the
+        // failure and success should be placed into proper positions in 
TSStatus.subStatus
+        status = forwardMultiSubPlan(planGroupMap, plan);
+      } else {
+        status = forwardToMultipleGroup(planGroupMap);
+      }
+    }
+    if (plan instanceof InsertPlan
+        && status.getCode() == 
TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
+        && config.isEnableAutoCreateSchema()) {
+      TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, 
((InsertPlan) plan));
+      if (tmpStatus != null) {
+        status = tmpStatus;
+      }
+    }
+    logger.debug("{}: executed {} with answer {}", name, plan, status);
+    return status;
+  }
+
+  private TSStatus createTimeseriesForFailedInsertion(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, InsertPlan plan) {
+    // try to create timeseries
+    if (plan.getFailedMeasurements() != null) {
+      plan.getPlanFromFailed();
+    }
+    boolean hasCreate;
+    try {
+      hasCreate = ((CMManager) IoTDB.metaManager).createTimeseries(plan);
+    } catch (IllegalPathException | CheckConsistencyException e) {
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, 
e.getMessage());
+    }
+    if (hasCreate) {
+      return forwardPlan(planGroupMap, plan);
+    } else {
+      logger.error("{}, Cannot auto create timeseries.", thisNode);
+    }
+    return null;
+  }
+
+  /**
+   * Forward each sub-plan to its belonging data group, and combine responses 
from the groups.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private TSStatus forwardMultiSubPlan(
+      Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan parentPlan) 
{
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    TSStatus tmpStatus;
+    TSStatus[] subStatus = null;
+    boolean noFailure = true;
+    boolean isBatchFailure = false;
+    EndPoint endPoint = null;
+    int totalRowNum = 0;
+    // send sub-plans to each belonging data group and collect results
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+      tmpStatus = forwardToSingleGroup(entry);
+      logger.debug("{}: from {},{},{}", name, entry.getKey(), 
entry.getValue(), tmpStatus);
+      noFailure = (tmpStatus.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) && noFailure;
+      isBatchFailure =
+          (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) 
|| isBatchFailure;
+      if (tmpStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+        if (parentPlan instanceof InsertTabletPlan) {
+          totalRowNum = ((InsertTabletPlan) parentPlan).getRowCount();
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          totalRowNum = ((CreateMultiTimeSeriesPlan) 
parentPlan).getIndexes().size();
+        }
+        if (subStatus == null) {
+          subStatus = new TSStatus[totalRowNum];
+          Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+        }
+        // set the status from one group to the proper positions of the 
overall status
+        if (parentPlan instanceof InsertTabletPlan) {
+          PartitionUtils.reordering(
+              (InsertTabletPlan) entry.getKey(),
+              subStatus,
+              tmpStatus.subStatus.toArray(new TSStatus[] {}));
+        } else if (parentPlan instanceof CreateMultiTimeSeriesPlan) {
+          CreateMultiTimeSeriesPlan subPlan = (CreateMultiTimeSeriesPlan) 
entry.getKey();
+          for (int i = 0; i < subPlan.getIndexes().size(); i++) {
+            subStatus[subPlan.getIndexes().get(i)] = 
tmpStatus.subStatus.get(i);
+          }
+        }
+      }
+      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        // execution failed, record the error message
+        errorCodePartitionGroups.add(
+            String.format(
+                "[%s@%s:%s:%s]",
+                tmpStatus.getCode(),
+                entry.getValue().getHeader(),
+                tmpStatus.getMessage(),
+                tmpStatus.subStatus));
+      }
+      if (parentPlan instanceof InsertTabletPlan
+          && tmpStatus.isSetRedirectNode()
+          && ((InsertTabletPlan) entry.getKey()).getMaxTime()
+              == ((InsertTabletPlan) parentPlan).getMaxTime()) {
+        endPoint = tmpStatus.getRedirectNode();
+      }
+    }
+
+    if (parentPlan instanceof CreateMultiTimeSeriesPlan
+        && !((CreateMultiTimeSeriesPlan) parentPlan).getResults().isEmpty()) {
+      if (subStatus == null) {
+        subStatus = new TSStatus[totalRowNum];
+        Arrays.fill(subStatus, RpcUtils.SUCCESS_STATUS);
+      }
+      noFailure = false;
+      isBatchFailure = true;
+      for (Entry<Integer, TSStatus> integerTSStatusEntry :
+          ((CreateMultiTimeSeriesPlan) parentPlan).getResults().entrySet()) {
+        subStatus[integerTSStatusEntry.getKey()] = 
integerTSStatusEntry.getValue();
+      }
+    }
+    return concludeFinalStatus(
+        noFailure, endPoint, isBatchFailure, subStatus, 
errorCodePartitionGroups);
+  }
+
+  private TSStatus concludeFinalStatus(
+      boolean noFailure,
+      EndPoint endPoint,
+      boolean isBatchFailure,
+      TSStatus[] subStatus,
+      List<String> errorCodePartitionGroups) {
+    TSStatus status;
+    if (noFailure) {
+      status = StatusUtils.OK;
+      if (endPoint != null) {
+        status = StatusUtils.getStatus(status, endPoint);
+      }
+    } else if (isBatchFailure) {
+      status = RpcUtils.getStatus(Arrays.asList(subStatus));
+    } else {
+      status =
+          StatusUtils.getStatus(
+              StatusUtils.EXECUTE_STATEMENT_ERROR,
+              MSG_MULTIPLE_ERROR + errorCodePartitionGroups.toString());
+    }
+    return status;
+  }
+
+  private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, 
PartitionGroup> entry) {
+    TSStatus result;
+    if (entry.getValue().contains(thisNode)) {
+      // the query should be handled by a group the local node is in, handle 
it with in the group
+      long startTime =
+          Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+              .getOperationStartTime();
+      logger.debug(
+          "Execute {} in a local group of {}", entry.getKey(), 
entry.getValue().getHeader());
+      result =
+          getLocalDataMember(entry.getValue().getHeader(), 
entry.getValue().getId())
+              .executeNonQueryPlan(entry.getKey());
+      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP
+          .calOperationCostTimeFromStart(startTime);
+    } else {
+      // forward the query to the group that should handle it
+      long startTime =
+          Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+              .getOperationStartTime();
+      logger.debug(
+          "Forward {} to a remote group of {}", entry.getKey(), 
entry.getValue().getHeader());
+      result = forwardPlan(entry.getKey(), entry.getValue());
+      Timer.Statistic.META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP
+          .calOperationCostTimeFromStart(startTime);
+    }
+    return result;
+  }
+
+  /**
+   * forward each sub-plan to its corresponding data group, if some groups 
goes wrong, the error
+   * messages from each group will be compacted into one string.
+   *
+   * @param planGroupMap sub-plan -> data group pairs
+   */
+  private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> 
planGroupMap) {
+    List<String> errorCodePartitionGroups = new ArrayList<>();
+    TSStatus tmpStatus;
+    boolean allRedirect = true;
+    for (Map.Entry<PhysicalPlan, PartitionGroup> entry : 
planGroupMap.entrySet()) {
+      tmpStatus = forwardToSingleGroup(entry);
+      if (!tmpStatus.isSetRedirectNode()) {
+        allRedirect = false;
+      }
+      if (tmpStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        logger.error("Fail to send log {} to data group {}", entry.getKey(), 
entry.getValue());
+        // execution failed, record the error message
+        errorCodePartitionGroups.add(
+            String.format(
+                "[%s@%s:%s]",
+                tmpStatus.getCode(), entry.getValue().getHeader(), 
tmpStatus.getMessage()));
+      }
+    }
+    TSStatus status;
+    if (errorCodePartitionGroups.isEmpty()) {
+      if (allRedirect) {
+        status = new TSStatus();
+        status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
+      } else {
+        status = StatusUtils.OK;
+      }
+    } else {
+      status =
+          StatusUtils.getStatus(
+              StatusUtils.EXECUTE_STATEMENT_ERROR, MSG_MULTIPLE_ERROR + 
errorCodePartitionGroups);
+    }
+    return status;
+  }
+
+  /**
+   * Forward a plan to the DataGroupMember of one node in the group. Only when 
all nodes time out,
+   * will a TIME_OUT be returned.
+   */
+  private TSStatus forwardPlan(PhysicalPlan plan, PartitionGroup group) {

Review comment:
       same

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
##########
@@ -314,97 +321,61 @@ public synchronized boolean addNode(Node node, 
NodeAdditionResult result) {
       if (insertIndex > 0) {
         allNodes.add(insertIndex, node);
         peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex()));
-        // remove the last node because the group size is fixed to replication 
number
-        Node removedNode = allNodes.remove(allNodes.size() - 1);
-        peerMap.remove(removedNode);
         // if the local node is the last node and the insertion succeeds, this 
node should leave
         // the group
         logger.debug("{}: Node {} is inserted into the data group {}", name, 
node, allNodes);
-        return removedNode.equals(thisNode);
       }
-      return false;
+      return insertIndex > 0;
     }
   }
 
   /**
-   * Process the election request from another node in the group. To win the 
vote from the local
-   * member, a node must have both meta and data logs no older than then local 
member, or it will be
-   * turned down.
+   * Try to add a Node into the group to which the member belongs.
    *
-   * @param electionRequest
-   * @return Response.RESPONSE_META_LOG_STALE if the meta logs of the elector 
fall behind
-   *     Response.RESPONSE_LOG_MISMATCH if the data logs of the elector fall 
behind Response.SUCCESS
-   *     if the vote is given to the elector the term of local member if the 
elector's term is no
-   *     bigger than the local member
+   * @return true if this node should leave the group because of the addition 
of the node, false
+   *     otherwise
    */
-  @Override
-  long checkElectorLogProgress(ElectionRequest electionRequest) {
-    // to be a data group leader, a node should also be qualified to be the 
meta group leader
-    // which guarantees the data group leader has the newest partition table.
-    long thatTerm = electionRequest.getTerm();
-    long thatMetaLastLogIndex = electionRequest.getLastLogIndex();
-    long thatMetaLastLogTerm = electionRequest.getLastLogTerm();
-    long thatDataLastLogIndex = electionRequest.getDataLogLastIndex();
-    long thatDataLastLogTerm = electionRequest.getDataLogLastTerm();
-    logger.info(
-        "{} received an dataGroup election request, term:{}, 
metaLastLogIndex:{}, metaLastLogTerm:{}, dataLastLogIndex:{}, 
dataLastLogTerm:{}",
-        name,
-        thatTerm,
-        thatMetaLastLogIndex,
-        thatMetaLastLogTerm,
-        thatDataLastLogIndex,
-        thatDataLastLogTerm);
-
-    // check meta logs
-    // term of the electors' MetaGroupMember is not verified, so 0 and 1 are 
used to make sure
-    // the verification does not fail
-    long metaResponse = metaGroupMember.checkLogProgress(thatMetaLastLogIndex, 
thatMetaLastLogTerm);
-    if (metaResponse == Response.RESPONSE_LOG_MISMATCH) {
-      return Response.RESPONSE_META_LOG_STALE;
-    }
-
-    long resp = checkLogProgress(thatDataLastLogIndex, thatDataLastLogTerm);
-    if (resp == Response.RESPONSE_AGREE) {
-      logger.info(
-          "{} accepted an dataGroup election request, term:{}/{}, 
dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
-          name,
-          thatTerm,
-          term.get(),
-          thatDataLastLogIndex,
-          logManager.getLastLogIndex(),
-          thatDataLastLogTerm,
-          logManager.getLastLogTerm(),
-          thatMetaLastLogIndex,
-          metaGroupMember.getLogManager().getLastLogIndex(),
-          thatMetaLastLogTerm,
-          metaGroupMember.getLogManager().getLastLogTerm());
-      setCharacter(NodeCharacter.FOLLOWER);
-      lastHeartbeatReceivedTime = System.currentTimeMillis();
-      setVoteFor(electionRequest.getElector());
-      updateHardState(thatTerm, getVoteFor());
-    } else {
-      logger.info(
-          "{} rejected an dataGroup election request, term:{}/{}, 
dataLogIndex:{}/{}, dataLogTerm:{}/{}, metaLogIndex:{}/{},metaLogTerm:{}/{}",
-          name,
-          thatTerm,
-          term.get(),
-          thatDataLastLogIndex,
-          logManager.getLastLogIndex(),
-          thatDataLastLogTerm,
-          logManager.getLastLogTerm(),
-          thatMetaLastLogIndex,
-          metaGroupMember.getLogManager().getLastLogIndex(),
-          thatMetaLastLogTerm,
-          metaGroupMember.getLogManager().getLastLogTerm());
+  public boolean addNode(Node node, NodeAdditionResult result) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}: start to add node {}", name, node);
+    }
+
+    // mark slots that do not belong to this group any more
+    Set<Integer> lostSlots =
+        ((SlotNodeAdditionResult) result)
+            .getLostSlots()
+            .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), 
Collections.emptySet());
+    for (Integer lostSlot : lostSlots) {
+      slotManager.setToSending(lostSlot, false);
+    }
+    slotManager.save();
+
+    synchronized (allNodes) {
+      preAddNode(node);
+      if (allNodes.contains(node) && allNodes.size() > 
config.getReplicationNum()) {
+        // remove the last node because the group size is fixed to replication 
number
+        Node removedNode = allNodes.remove(allNodes.size() - 1);
+        peerMap.remove(removedNode);
+
+        if (removedNode.equals(leader.get()) && !removedNode.equals(thisNode)) 
{
+          // if the leader is removed, also start an election immediately
+          synchronized (term) {
+            setCharacter(NodeCharacter.ELECTOR);

Review comment:
       OK, I misunderstood




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to