junrao commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1383719872


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/SnapshotFile.java:
##########
@@ -63,7 +63,7 @@ public File file() {
     public void renameTo(String newSuffix) throws IOException {
         File renamed = new File(Utils.replaceSuffix(file.getPath(), "", 
newSuffix));
         try {
-            Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath());
+            Utils.atomicMoveWithFallback(file.toPath(), renamed.toPath(), 
false);

Review Comment:
   This works since it's ok to lose a file to be deleted. Perhaps it's better 
to rename the method to sth like `renameToDelete` so that it's clear that this 
is not a generic method for arbitrary renaming.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1617,10 +1617,20 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
     // we manually override the state offset here prior to taking the snapshot.
     producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-    producerStateManager.takeSnapshot()
+    // We avoid potentially-costly fsync call, since we acquire 
UnifiedLog#lock here
+    // which could block subsequent produces in the meantime.
+    // flush is done in the scheduler thread along with segment flushing below
+    val maybeSnapshot = producerStateManager.takeSnapshot(false)
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.scheduleOnce("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
+    scheduler.scheduleOnce("flush-log", () => {
+      maybeSnapshot.ifPresent(f => {

Review Comment:
   Could we get rid of `{`?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1617,10 +1617,20 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
     // we manually override the state offset here prior to taking the snapshot.
     producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-    producerStateManager.takeSnapshot()
+    // We avoid potentially-costly fsync call, since we acquire 
UnifiedLog#lock here
+    // which could block subsequent produces in the meantime.
+    // flush is done in the scheduler thread along with segment flushing below
+    val maybeSnapshot = producerStateManager.takeSnapshot(false)
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.scheduleOnce("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
+    scheduler.scheduleOnce("flush-log", () => {
+      maybeSnapshot.ifPresent(f => {
+        maybeHandleIOException(s"Error while deleting producer state snapshot 
$f for $topicPartition in dir ${dir.getParent}") {
+          Utils.flushFileIfExists(f.toPath)
+        }
+      })
+      flushUptoOffsetExclusive(newSegment.baseOffset)

Review Comment:
   Is it possible to add a test to verify that the recovery point is only 
advanced after the producer state has been flushed to disk?



##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -308,7 +308,14 @@ public void truncateFromEnd(long endOffset) {
             if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
                 List<EpochEntry> removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
 
-                flush();
+                // We intentionally don't force flushing change to the device 
here because:
+                // - To avoid fsync latency
+                //   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
+                //   * This method is called by ReplicaFetcher threads, which 
could block replica fetching
+                //     then causing ISR shrink or high produce response time 
degradation in remote scope on high fsync latency.
+                // - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be handled by
+                //   another truncateFromEnd call on log loading procedure so 
it won't be a problem
+                flush(false);

Review Comment:
   So, it sounds like that you agree that there is little value to call flush 
without sync. Should we remove the call then?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to