fanhualta commented on a change in pull request #3191:
URL: https://github.com/apache/iotdb/pull/3191#discussion_r639867296



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
##########
@@ -869,39 +929,116 @@ private boolean processAddNodeLocally(
       return true;
     }
 
+    AddNodeLog addNodeLog = new AddNodeLog();
     // node adding is serialized to reduce potential concurrency problem
     synchronized (logManager) {
-      AddNodeLog addNodeLog = new AddNodeLog();
+      // update partition table
+      PartitionTable table = new SlotPartitionTable(thisNode);
+      table.deserialize(partitionTable.serialize());
+      table.addNode(newNode);
+      ((SlotPartitionTable) 
table).setLastMetaLogIndex(logManager.getLastLogIndex() + 1);
+
+      addNodeLog.setPartitionTable(table.serialize());
       addNodeLog.setCurrLogTerm(getTerm().get());
       addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+      addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1);
 
-      addNodeLog.setNewNode(node);
+      addNodeLog.setNewNode(newNode);
 
       logManager.append(addNodeLog);
+    }
 
-      int retryTime = 1;
-      while (true) {
-        logger.info("Send the join request of {} to other nodes, retry time: 
{}", node, retryTime);
-        AppendLogResult result = sendLogToAllGroups(addNodeLog);
-        switch (result) {
-          case OK:
-            logger.info("Join request of {} is accepted", node);
-            commitLog(addNodeLog);
-
-            synchronized (partitionTable) {
-              response.setPartitionTableBytes(partitionTable.serialize());
-            }
-            response.setRespNum((int) Response.RESPONSE_AGREE);
-            logger.info("Sending join response of {}", node);
-            return true;
-          case TIME_OUT:
-            logger.info("Join request of {} timed out", node);
-            retryTime++;
-            continue;
-          case LEADERSHIP_STALE:
-          default:
-            return false;
-        }
+    int retryTime = 0;
+    while (true) {
+      logger.info(
+          "{}: Send the join request of {} to other nodes, retry time: {}",
+          name,
+          newNode,
+          retryTime);
+      AppendLogResult result = sendLogToFollowers(addNodeLog);
+      switch (result) {
+        case OK:
+          commitLog(addNodeLog);
+          logger.info("{}: Join request of {} is accepted", name, newNode);
+
+          synchronized (partitionTable) {
+            response.setPartitionTableBytes(partitionTable.serialize());
+          }
+          response.setRespNum((int) Response.RESPONSE_AGREE);
+          logger.info("{}: Sending join response of {}", name, newNode);
+          return true;
+        case TIME_OUT:
+          logger.debug("{}: log {} timed out, retrying...", name, addNodeLog);
+          try {
+            Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+          logger.info("{}: Join request of {} timed out", name, newNode);
+          retryTime++;
+          break;
+        case LEADERSHIP_STALE:
+        default:
+          return false;
+      }
+    }
+  }
+
+  /**
+   * Check whether a data migration caused by a previous membership change operation has ended,
+   * polling the migration status a bounded number of times.
+   *
+   * <p>Calls collectAllPartitionMigrationStatus() up to 5 times and sleeps
+   * ClusterConstant.RETRY_WAIT_TIME_MS between consecutive attempts. A non-null, empty status map
+   * is treated as "nothing left to migrate".
+   *
+   * @return true as soon as the collected status map is non-null and empty; false if that has not
+   *     happened after 5 attempts
+   * @throws InterruptedException if the thread is interrupted, e.g. during the sleep between
+   *     attempts
+   * @throws CheckConsistencyException presumably propagated from
+   *     collectAllPartitionMigrationStatus() -- TODO confirm against that method
+   */
+  private boolean waitDataMigrationEnd() throws InterruptedException, 
CheckConsistencyException {
+    // try at most 5 times
+    int retryTime = 0;
+    while (true) {
+      Map<PartitionGroup, Integer> res = collectAllPartitionMigrationStatus();
+      if (res != null && res.isEmpty()) {
+        return true;
+      }
+      if (++retryTime == 5) {
+        break;
+      }
+      Thread.sleep(ClusterConstant.RETRY_WAIT_TIME_MS);
+    }
+    return false;
+  }
+
+  /** Process empty log for leader to commit all previous log. */
+  public void processEmptyContentLog() {
+    if (character != NodeCharacter.LEADER) {
+      return;
+    }
+
+    Log log = new EmptyContentLog();
+    log.setCurrLogTerm(getTerm().get());
+    log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+
+    synchronized (logManager) {
+      logManager.append(log);
+    }

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to