guozhangwang commented on a change in pull request #9352:
URL: https://github.com/apache/kafka/pull/9352#discussion_r503567593



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -193,8 +193,9 @@ private void updateFollowerHighWatermark(
     ) {
         highWatermarkOpt.ifPresent(highWatermark -> {
             long newHighWatermark = Math.min(endOffset().offset, highWatermark);
-            state.updateHighWatermark(OptionalLong.of(newHighWatermark));
-            updateHighWatermark(state, currentTimeMs);
+            if (state.updateHighWatermark(OptionalLong.of(newHighWatermark))) {

Review comment:
       Just to clarify: this is not a correctness bug fix, only an optimization to avoid unnecessary purgatory access, right?
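
       For reference, a minimal sketch of the pattern in question, under stated assumptions: `FollowerStateSketch`, `HighWatermarkSketch`, and the `purgatoryChecks` counter are hypothetical stand-ins and not the real Kafka classes. The only idea taken from the diff is that `updateHighWatermark` returns a boolean, so the follow-up work runs only when the value actually changed.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the follower state; NOT the real Kafka class.
class FollowerStateSketch {
    private OptionalLong highWatermark = OptionalLong.empty();

    // Returns true only if the stored high watermark actually changed.
    boolean updateHighWatermark(OptionalLong candidate) {
        if (!candidate.isPresent() || candidate.equals(highWatermark)) {
            return false;
        }
        highWatermark = candidate;
        return true;
    }
}

// Hypothetical client; the counter stands in for the purgatory access.
class HighWatermarkSketch {
    int purgatoryChecks = 0;

    private final FollowerStateSketch state = new FollowerStateSketch();

    void onFetchResponse(long logEndOffset, OptionalLong leaderHighWatermark) {
        leaderHighWatermark.ifPresent(hw -> {
            // Same clamping as in the diff: never advance past the local log end.
            long newHighWatermark = Math.min(logEndOffset, hw);
            if (state.updateHighWatermark(OptionalLong.of(newHighWatermark))) {
                purgatoryChecks++; // only touched when the value changed
            }
        });
    }
}
```

       In this sketch, repeated fetch responses carrying the same high watermark leave `purgatoryChecks` unchanged, which is the saving being asked about.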

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -114,14 +113,6 @@ private boolean updateHighWatermark() {
         return false;
     }
 
-    private OptionalLong quorumMajorityFetchTimestamp() {

Review comment:
       Good catch.

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -103,6 +103,16 @@
      */
     void updateHighWatermark(LogOffsetMetadata offsetMetadata);
 
+    /**
+     * Flush the current log to disk.
+     */
+    void flush();
+
+    /**
+     * Get the last offset which has been flushed to disk.
+     */
+    long lastFlushedOffset();

Review comment:
       Is this function going to be used for non-testing code in the future?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -985,6 +986,8 @@ private boolean handleFetchResponse(
                 Records records = (Records) partitionResponse.recordSet();
                 if (records.sizeInBytes() > 0) {
                     LogAppendInfo info = log.appendAsFollower(records);
+                    log.flush();

Review comment:
       nit: maybe we can wrap the flush and high-watermark update logic in a `flushFollowerLog` helper as well.
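
       A rough sketch of what such a helper might look like, assuming a simplified in-memory log. Only the name `flushFollowerLog` and the `flush()` / `lastFlushedOffset()` method shapes come from this review; `InMemoryLogSketch`, `FollowerSketch`, the record-count argument, and the `-1` initial high watermark are hypothetical and do not reflect the real `KafkaRaftClient`.

```java
import java.util.OptionalLong;

// Hypothetical in-memory log; it only mirrors the flush()/lastFlushedOffset()
// shape shown in the ReplicatedLog diff.
class InMemoryLogSketch {
    private long endOffset = 0L;
    private long lastFlushedOffset = 0L;

    void appendAsFollower(int recordCount) { endOffset += recordCount; }
    void flush() { lastFlushedOffset = endOffset; }
    long lastFlushedOffset() { return lastFlushedOffset; }
    long endOffset() { return endOffset; }
}

// Hypothetical follower that groups the flush and the high-watermark update.
class FollowerSketch {
    final InMemoryLogSketch log = new InMemoryLogSketch();
    long highWatermark = -1L; // -1 means "not yet known" in this sketch

    void flushFollowerLog(OptionalLong leaderHighWatermark) {
        // Flush first, so the high watermark only ever covers durable data.
        log.flush();
        leaderHighWatermark.ifPresent(hw ->
            // Clamp to the local log end and never move backwards.
            highWatermark = Math.max(highWatermark, Math.min(log.endOffset(), hw)));
    }
}
```

       Keeping both steps in one method would leave a single call in `handleFetchResponse` after `appendAsFollower`, so the ordering of flush and update cannot drift apart.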





