[ 
https://issues.apache.org/jira/browse/KAFKA-17727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luciano Sabença updated KAFKA-17727:
------------------------------------
    Description: 
We are using a Kafka cluster deployed on-premise. The brokers are JBOD with 
around 5 or 6 disks per broker. When running an intra-broker rebalance (i.e. 
moving partitions between the log dirs) triggered by cruise-control, some nodes 
had a log dir marked as offline. When we looked closer, the disk was healthy, 
and after a broker restart the log dir came back online. 
The issue seems very similar to KAFKA-15391 and, especially, to KAFKA-15572. 
The main difference in the logs between the issue we encountered and the one 
described in KAFKA-15572 is that there the exception that marked the log dir as 
offline was a `java.nio.file.NoSuchFileException`. 
In our case, we had a `java.nio.channels.ClosedChannelException`:
{noformat}
[2024-10-03 09:48:04,704] ERROR Error while flushing log for mytopic-20 in dir 
/data/0/kafka with offset 844857727 (exclusive) and recovery point 844857727 
(org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:471)
at kafka.log.LogSegment.$anonfun$flush$1$adapted(LogSegment.scala:470)
at com.yammer.metrics.core.Timer.time(Timer.java:91)
at kafka.log.LogSegment.flush(LogSegment.scala:470)
at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:174)
at kafka.log.LocalLog.$anonfun$flush$1$adapted(LocalLog.scala:174)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
at kafka.log.LocalLog.flush(LocalLog.scala:174)
at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1537)
at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1724)
at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1518)
at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1499)
at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829){noformat}
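The top frames of this trace (`FileChannelImpl.force` failing in `ensureOpen`) can be reproduced outside Kafka. The following is a minimal sketch, not Kafka code: the class name, method name, and temporary file are hypothetical stand-ins for a log segment, and it only shows that calling `force` on an already-closed `FileChannel` raises `ClosedChannelException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelFlushDemo {

    /** Returns true if force() on a closed channel throws ClosedChannelException. */
    static boolean forceAfterCloseFails() {
        try {
            // Hypothetical stand-in for a segment file; not a real Kafka path.
            Path segment = Files.createTempFile("segment-demo", ".log");
            try {
                FileChannel channel = FileChannel.open(segment, StandardOpenOption.WRITE);
                channel.close();          // analogous to closing the old log
                try {
                    channel.force(true);  // analogous to a later flush of that segment
                    return false;
                } catch (ClosedChannelException e) {
                    return true;
                }
            } finally {
                Files.deleteIfExists(segment);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("force() after close() failed: " + forceAfterCloseFails());
    }
}
```

Nothing is wrong with the file or the disk in this sketch, which is consistent with the disk looking healthy when we inspected it.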
Looking at the Kafka code, I believe the root cause is very similar to that of 
KAFKA-15572 (https://issues.apache.org/jira/browse/KAFKA-15572). In our case, 
the logs show that LogManager.scala was able to replace the old log with the 
new one: 
{noformat}
[2024-10-03 09:47:06,349] INFO Attempting to replace current log 
Log(dir=/data/0/kafka/mytopic-20, topicId=jl0IzzqWSHedIunbTLziYg, 
topic=mytopic, partition=20, highWatermark=844861246, 
lastStableOffset=844861246, logStartOffset=842164116, logEndOffset=844861247) 
with Log(dir=/data/5/kafka/mytopic-20.9fa1dfec1c4a4045b8806565eced19bd-future, 
topicId=jl0IzzqWSHedIunbTLziYg, topic=mytopic, partition=20, 
highWatermark=844861246, lastStableOffset=844861246, logStartOffset=842164116, 
logEndOffset=844861247) for mytopic-20 (kafka.log.LogManager)
INFO Cleaning for partition mytopic-20 is resumed (kafka.log.LogManager)
[2024-10-03 09:47:06,364] INFO The current replica is successfully replaced 
with the future replica for mytopic-20 (kafka.log.LogManager){noformat}
During this process, it closes the old log ([LogManager.scala#L1125|#L1125]) 
and schedules it for deletion. Something then triggers [UnifiedLog.roll|#L1499], 
and anything that triggers a flush of this segment after that will eventually 
call _LocalLog.flush_, which tries to flush each segment 
([LocalLog.scala#L174|https://github.com/apache/kafka/blob/8f0b0b0d0466632b47c355489e7c9440f3e4c0f5/core/src/main/scala/kafka/log/LocalLog.scala#L174]). 
Here, however, the segment is already closed, hence the exception. It's not 
clear to me what triggers the roll. In theory, the old log had already been 
replaced and the only thing remaining for its old segments was to be deleted. As 
this is a high-volume topic, however, it's not surprising to have segments 
rolling frequently. 
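
The ordering described above can be sketched in plain Java. This is an illustration under assumptions, not the Kafka implementation: the class and method names are hypothetical, a `ScheduledExecutorService` stands in for `KafkaScheduler`, and because the real trigger of the roll is unknown, the sketch simply schedules a flush after the channel has been closed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DelayedFlushDemo {

    /** Returns the class name of the exception raised by the background flush, or "none". */
    static String flushOnSchedulerAfterClose() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Hypothetical stand-in for a segment of the old (replaced) log.
            Path segment = Files.createTempFile("old-log-demo", ".log");
            segment.toFile().deleteOnExit();
            FileChannel channel = FileChannel.open(segment, StandardOpenOption.WRITE);

            // Step 1: the old log is closed once the future replica has replaced it.
            channel.close();

            // Step 2: a flush still runs later on a background thread (assumed ordering).
            ScheduledFuture<Void> flush = scheduler.schedule(() -> {
                channel.force(true);
                return null;
            }, 50, TimeUnit.MILLISECONDS);

            try {
                flush.get();
                return "none";
            } catch (ExecutionException e) {
                // The background task failed; report what it failed with.
                return e.getCause().getClass().getName();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("background flush failed with: " + flushOnSchedulerAfterClose());
    }
}
```

In this sketch the failure comes only from the ordering of close and flush. In the broker, the same exception is reported through LogDirFailureChannel, as the error log above shows, and the log dir is marked offline.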


> Log dirs marked as offline incorrectly due to race conditions on segment 
> delete
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-17727
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17727
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.5.2
>            Reporter: Luciano Sabença
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
