jt2594838 commented on a change in pull request #1112:
URL: https://github.com/apache/incubator-iotdb/pull/1112#discussion_r420503823



##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -398,7 +399,92 @@ public void appendEntry(AppendEntryRequest request, 
AsyncMethodCallback resultHa
 
   @Override
   public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback 
resultHandler) {
-    //TODO-Cluster#354: implement
+    logger.debug("{} received an AppendEntriesRequest", name);
+
+    // the term checked here is that of the leader, not that of the log
+    if (!checkRequestTerm(request, resultHandler)) {
+      return;
+    }
+
+    try {
+      long response = 0;
+      List<Log> logs = new ArrayList<>();
+      for (ByteBuffer buffer : request.getEntries()) {
+        Log log = LogParser.getINSTANCE().parse(buffer);
+        logs.add(log);
+      }
+
+      response = appendEntries(logs);
+      resultHandler.onComplete(response);
+      logger.debug("{} AppendEntriesRequest of log size {} completed", name,
+          request.getEntries().size());
+    } catch (UnknownLogTypeException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+  /**
+   * Find the local previous log of "log". If such log is found, discard all 
local logs behind it
+   * and append "log" to it. Otherwise report a log mismatch.
+   *
+   * @param logs
+   * @return Response.RESPONSE_AGREE when the log is successfully appended or 
Response
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   */
+  private long appendEntries(List<Log> logs) {
+    if (logs.isEmpty()) {
+      return Response.RESPONSE_AGREE;
+    }
+
+    long resp;
+    synchronized (logManager) {
+      if (logs.get(0).getCurrLogIndex() > logManager.getLastLogIndex() + 1) {
+        // the incoming log points to an illegal position, reject it
+        resp = Response.RESPONSE_LOG_MISMATCH;
+      } else {
+        logManager.append(logs);

Review comment:
       LebronAl has now provided a safer method called `maybeAppendBatch` in 
`RaftLogManager`; please switch to that and check the return value.

##########
File path: 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
##########
@@ -398,7 +399,92 @@ public void appendEntry(AppendEntryRequest request, 
AsyncMethodCallback resultHa
 
   @Override
   public void appendEntries(AppendEntriesRequest request, AsyncMethodCallback 
resultHandler) {
-    //TODO-Cluster#354: implement
+    logger.debug("{} received an AppendEntriesRequest", name);
+
+    // the term checked here is that of the leader, not that of the log
+    if (!checkRequestTerm(request, resultHandler)) {
+      return;
+    }
+
+    try {
+      long response = 0;
+      List<Log> logs = new ArrayList<>();
+      for (ByteBuffer buffer : request.getEntries()) {
+        Log log = LogParser.getINSTANCE().parse(buffer);
+        logs.add(log);
+      }
+
+      response = appendEntries(logs);
+      resultHandler.onComplete(response);
+      logger.debug("{} AppendEntriesRequest of log size {} completed", name,
+          request.getEntries().size());
+    } catch (UnknownLogTypeException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+  /**
+   * Find the local previous log of "log". If such log is found, discard all 
local logs behind it
+   * and append "log" to it. Otherwise report a log mismatch.
+   *
+   * @param logs
+   * @return Response.RESPONSE_AGREE when the log is successfully appended or 
Response
+   * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+   */
+  private long appendEntries(List<Log> logs) {
+    if (logs.isEmpty()) {
+      return Response.RESPONSE_AGREE;
+    }
+
+    long resp;
+    synchronized (logManager) {
+      if (logs.get(0).getCurrLogIndex() > logManager.getLastLogIndex() + 1) {
+        // the incoming log points to an illegal position, reject it
+        resp = Response.RESPONSE_LOG_MISMATCH;
+      } else {
+        logManager.append(logs);
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} append new logs list {}", name, logs);
+        }
+        resp = Response.RESPONSE_AGREE;
+      }
+    }
+    return resp;
+  }
+
+  /**
+   * Check the term of the AppendEntryRequest. The term checked is the term of 
the leader, not the
+   * term of the log. A new leader can still send logs of old leaders.
+   *
+   * @param request
+   * @param resultHandler if the term is illegal, the "resultHandler" will be 
invoked so the caller
+   *                      does not need to invoke it again
+   * @return true if the term is legal, false otherwise
+   */
+  private boolean checkRequestTerm(AppendEntriesRequest request,
+      AsyncMethodCallback resultHandler) {
+    long leaderTerm = request.getTerm();
+    long localTerm;
+
+    synchronized (term) {
+      // if the request comes before the heartbeat arrives, the local term may 
be smaller than the
+      // leader term
+      localTerm = term.get();
+      if (leaderTerm < localTerm) {
+        logger.debug("{} rejected the AppendEntryRequest for term: {}/{}", 
name, leaderTerm,

Review comment:
       The implementation of `checkRequestTerm` for `AppendEntryRequest` has 
been changed slightly, so please see to it and make necessary modifications.
   By the way, the log message should use `AppendEntriesRequest` here instead 
of `AppendEntryRequest`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to