[ https://issues.apache.org/jira/browse/KAFKA-19571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18061703#comment-18061703 ]

Ilyas Toumlilt commented on KAFKA-19571:
----------------------------------------

Updated the "Proposed fix" section in description to match the actual merged 
fix.

> Race condition between log segment flush and file deletion causing log dir to 
> go offline
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19571
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19571
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, log
>    Affects Versions: 3.7.1
>            Reporter: Ilyas Toumlilt
>            Assignee: Ilyas Toumlilt
>            Priority: Major
>             Fix For: 4.2.0, 4.3.0, 4.0.2, 4.1.2
>
>
> h1. Context
> We are running Kafka v3.7.1 with ZooKeeper. Our brokers are configured with 
> multiple disks in a JBOD setup, and routine intra-broker data rebalancing is 
> performed with Cruise Control to manage disk utilization. During these 
> rebalance operations, a race condition can occur between a log segment flush 
> and the file deletion that is part of the replica's directory move. The race 
> leads to a {{NoSuchFileException}} when the flush targets a file path that 
> has just been deleted by the rebalance process, and this exception 
> incorrectly forces the broker to take the entire log directory offline.
> h1. Logs / Stack trace
> {code:java}
> 2025-07-23 19:03:30,114 WARN Failed to flush file 
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.
> common.utils.Utils)
> java.nio.file.NoSuchFileException: 
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
>         at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
>         at 
> java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
>         at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
>         at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
>         at 
> org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
>         at 
> kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
>         at 
> kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
>         at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
>         at java.base/java.util.Optional.ifPresent(Optional.java:183)
>         at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
>         at 
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>         
> 2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir 
> /var/lib/kafka-08 with offset 24420850595 (exclusi
> ve) and recovery point 24420850595 
> (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at 
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>         at 
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
>         at com.yammer.metrics.core.Timer.time(Timer.java:91)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
>         at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
>         at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>         at kafka.log.LocalLog.flush(LocalLog.scala:176)
>         at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
>         at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
>         at 
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
>         at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
>         at 
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>         
> 2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task 
> 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
> org.apache.kafka.common.errors.KafkaStorageException: Error while flushing 
> log for topic_01-12 in dir /var/lib/kafka-08 with off
> set 24420850595 (exclusive) and recovery point 24420850595
> Caused by: java.nio.channels.ClosedChannelException
>         at 
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>         at 
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
>         at com.yammer.metrics.core.Timer.time(Timer.java:91)
>         at 
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
>         at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
>         at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>         at kafka.log.LocalLog.flush(LocalLog.scala:176)
>         at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
>         at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
>         at 
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
>         at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
>         at 
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
>         at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>         
> 2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving 
> replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager) {code}
> h1. Stack trace analysis
> The failure begins with a benign WARN when a scheduled task tries to flush a 
> producer state snapshot that was moved during a disk rebalance; this 
> {{NoSuchFileException}} is anticipated and swallowed gracefully, as 
> implemented in https://issues.apache.org/jira/browse/KAFKA-13403.
> However, the same task then attempts to flush the actual log segment, which 
> fails with a critical, unhandled {{ClosedChannelException}} because the file 
> handles were invalidated by the directory move. This unhandled I/O error 
> propagates up and terminates the background task, causing the 
> {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a 
> direct consequence, the {{ReplicaManager}} detects this fatal storage error 
> and triggers its safety mechanism, taking the entire log directory offline to 
> prevent potential data corruption.
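> The second, fatal failure can be reproduced in isolation: once a file 
> channel has been closed, any subsequent {{force()}} on it throws 
> {{ClosedChannelException}}. A minimal, self-contained sketch (the temp file, 
> class, and method names are illustrative, not Kafka code):

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelDemo {
    // Returns the outcome of flushing a channel that was closed underneath us,
    // mirroring the log flusher racing with the directory swap.
    static String demo() throws IOException {
        Path segment = Files.createTempFile("segment", ".log");
        FileChannel channel = FileChannel.open(segment, StandardOpenOption.WRITE);
        channel.close(); // simulates the source log being closed during the swap
        try {
            channel.force(true); // the flush path calls force() on the stale channel
            return "flush succeeded";
        } catch (ClosedChannelException e) {
            return "ClosedChannelException"; // the fatal exception from the trace above
        } finally {
            Files.deleteIfExists(segment);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```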
> h1. Expected Behavior
> A {{NoSuchFileException}} in this context should not cause the entire log 
> directory to be marked as offline.
> h1. Workaround
> The current workaround is to manually restart the affected Kafka broker. The 
> restart clears the in-memory state, and upon re-scanning the log directories, 
> the broker marks the disk as healthy again.
> h1. Proposed fix
> The fix was implemented in `LogManager.replaceCurrentWithFutureLog()`.
> *Approach:* Do not close the source log when swapping the current log with 
> the future log during replica rebalancing. Closing it was what caused pending 
> flush/read operations to see closed channels and trigger the log directory 
> failure.
> *Changes:*
>  # Remove the explicit `close()` of the source log in 
> `replaceCurrentWithFutureLog()`. After renaming the replica directory to the 
> `.delete` suffix, the source log is no longer closed there.
>  # Leave file handles open so that any in-flight operations (e.g. log 
> flusher, fetch requests) can still use the renamed files. On Unix, renamed 
> files stay accessible until all file handles are closed.
>  #  Rely on existing async deletion: the source log is already scheduled for 
> deletion via `addLogToBeDeleted()`. The background delete-logs thread closes 
> the log and deletes the files after the configured delay 
> (`log.segment.delete.delay.ms`). By then, no other operations should still be 
> using those handles.
> This avoids the race for both flush and read paths (unlike a fix only in 
> `LogSegment.flush()`), and is consistent with `LogManager.asyncDelete()`.
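> The Unix semantics that point 2 relies on can be demonstrated at the file 
> level: a channel opened before a rename keeps working afterward, so in-flight 
> flushes succeed until the handle itself is closed. A minimal sketch (paths 
> and names are illustrative, not Kafka code; assumes a POSIX filesystem):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RenameWhileOpenDemo {
    static String demo() throws IOException {
        Path dir = Files.createTempDirectory("logdir");
        Path original = dir.resolve("00000000000000000000.log");
        Files.createFile(original);
        FileChannel channel = FileChannel.open(original, StandardOpenOption.WRITE);

        // Rename while the channel is still open, analogous to renaming the
        // source replica directory with the .delete suffix during the swap.
        Path renamed = dir.resolve("00000000000000000000.log.delete");
        Files.move(original, renamed);

        // The open handle still points at the renamed inode, so writes and
        // flushes from in-flight operations succeed.
        channel.write(ByteBuffer.wrap("record".getBytes(StandardCharsets.UTF_8)));
        channel.force(true);
        channel.close(); // in Kafka, the async delete thread closes handles later

        Files.deleteIfExists(renamed);
        Files.deleteIfExists(dir);
        return "flush after rename succeeded";
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```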
>  
> *Testing:*
> A new unit test `testReplaceCurrentWithFutureLogDoesNotCloseSourceLog` in 
> `LogManagerTest` verifies that the source log is not closed during the swap 
> and that `flush()` can be called without error afterward.
> h2. Related issues
>  * https://issues.apache.org/jira/browse/KAFKA-13403 was fixed to swallow the 
> first {{NoSuchFileException}} WARN in the above stack trace, but not the 
> underlying exception.
>  * https://issues.apache.org/jira/browse/KAFKA-15391 is similar but distinct: 
> it swallows {{NoSuchFileException}} for a race condition on log directory 
> move/delete, not at the segment file level.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
