[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601552#comment-16601552 ]

Stephane Maarek commented on KAFKA-7278:
----------------------------------------

[~lindong] Unfortunately, this does not fix https://issues.apache.org/jira/browse/KAFKA-1194. I'll post the details there.

> replaceSegments() should not call asyncDeleteSegment() for segments which
> have been removed from segments list
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-7278
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7278
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every
> segment listed in `oldSegments`. oldSegments should be constructed from
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is
> determined and the time Log.replaceSegments() is called. If there is a
> concurrent async deletion of the same log segment file, Log.replaceSegments()
> will call asyncDeleteSegment() for a segment that does not exist, and the Kafka
> server may shut down the log directory due to NoSuchFileException.
> This is likely the root cause of https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by
> only deleting a segment if the segment can be found in Log.segments.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
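The fix described in the issue (delete only segments still present in Log.segments) can be sketched outside of Kafka. The following is a minimal, hypothetical Java sketch, not Kafka's actual code: the ConcurrentSkipListMap stands in for Log.segments (which is keyed by segment base offset), and segmentsSafeToDelete stands in for the filtering step the patch adds to replaceSegments():

```java
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

public class ReplaceSegmentsSketch {
    // Stand-in for Kafka's Log.segments: base offset -> segment.
    static final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();

    // Only keep offsets still present in the live index; segments removed
    // concurrently (e.g. by retention) are skipped instead of being deleted twice.
    static List<Long> segmentsSafeToDelete(List<Long> oldSegmentOffsets) {
        return oldSegmentOffsets.stream()
                .filter(segments::containsKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        segments.put(0L, "segment1");
        segments.put(100L, "segment2");
        // Concurrent retention already removed the segment at offset 0.
        segments.remove(0L);
        System.out.println(segmentsSafeToDelete(List.of(0L, 100L))); // prints [100]
    }
}
```

With this filter in place, a segment that retention has already removed (and whose files have been renamed) is simply skipped rather than handed to asyncDeleteSegment() a second time.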
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598948#comment-16598948 ]

Dong Lin commented on KAFKA-7278:
---------------------------------

[~niob] The stacktrace in that Jira seems similar to the issue fixed here, so there is a good chance that we have fixed that issue as well.
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598540#comment-16598540 ]

Christoph Schmidt commented on KAFKA-7278:
------------------------------------------

This ticket has raised questions in the comments of the ancient KAFKA-1194 (which has three old PRs): does it by chance fix that issue, too? The root cause over there is that renaming a file while it is still open blows up under Windows, with the only available workaround being to completely disable the log cleaner.
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587106#comment-16587106 ]

ASF GitHub Bot commented on KAFKA-7278:
---------------------------------------

lindong28 closed pull request #5535: Cherry-pick KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
URL: https://github.com/apache/kafka/pull/5535

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8b62918bc97..9b423ba5933 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1608,7 +1608,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
    *
    * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
    * it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1655,6 +1657,8 @@ class Log(@volatile var dir: File,
    */
   private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
     lock synchronized {
+      val existingOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset))
+
       checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
@@ -1663,7 +1667,7 @@ class Log(@volatile var dir: File,
       addSegment(newSegment)
 
       // delete the old files
-      for (seg <- oldSegments) {
+      for (seg <- existingOldSegments) {
         // remove the index entry
         if (seg.baseOffset != newSegment.baseOffset)
           segments.remove(seg.baseOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ae949bf6b85..f6001e9f375 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -88,6 +89,74 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
 
+  @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+    val deleteStartLatch = new CountDownLatch(1)
+    val deleteCompleteLatch = new CountDownLatch(1)
+
+    // Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+    // it waits for another thread to execute deleteOldSegments()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir)
+    val log = new Log(dir,
+      config = LogConfig.fromProps(logConfig.originals, logProps),
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      brokerTopicStats = new BrokerTopicStats, time,
+      maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      topicPartition = topicPartition,
+      producerStateManager = producerStateManager,
+      logDirFailureChannel = new LogDirFailureChannel(10)) {
+      override def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+        deleteStartLatch.countDown()
+        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
+          throw new IllegalStateException("Log segment deletion timed out")
+        }
+        super.replaceSegments(newSegment, oldSegments, isRecoveredSwapFile)
+      }
+    }
+
+    // Start a thread which execute log.deleteOldSegments() right before replaceSegments() is executed
+    val t = new Thread() {
{code}
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586362#comment-16586362 ]

ASF GitHub Bot commented on KAFKA-7278:
---------------------------------------

lindong28 opened a new pull request #5535: Cherry-pick KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
URL: https://github.com/apache/kafka/pull/5535

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586320#comment-16586320 ]

ASF GitHub Bot commented on KAFKA-7278:
---------------------------------------

hachikuji closed pull request #5491: KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
URL: https://github.com/apache/kafka/pull/5491

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e4be8fcc43d..afe151d69b6 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1740,7 +1740,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
    *
    * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
    * it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1791,10 +1793,13 @@ class Log(@volatile var dir: File,
    * @param isRecoveredSwapFile true if the new segment was created from a swap file during recovery after a crash
    */
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
-    val sortedNewSegments = newSegments.sortBy(_.baseOffset)
-    val sortedOldSegments = oldSegments.sortBy(_.baseOffset)
-
     lock synchronized {
+      val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+      // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
+      // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
+      // multiple times for the same segment.
+      val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)
+
       checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index b351311b329..0240707ca3b 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, RandomAccessFile}
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -89,6 +90,74 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
 
+  @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+    val deleteStartLatch = new CountDownLatch(1)
+    val deleteCompleteLatch = new CountDownLatch(1)
+
+    // Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+    // it waits for another thread to execute deleteOldSegments()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir)
+    val log = new Log(dir,
+      config = LogConfig.fromProps(logConfig.originals, logProps),
+      logStartOffset = 0L,
+      recoveryPoint = 0L,
+      scheduler = time.scheduler,
+      brokerTopicStats = new BrokerTopicStats, time,
+      maxProducerIdExpirationMs = 60 * 60 * 1000,
+      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+      topicPartition = topicPartition,
+      producerStateManager = producerStateManager,
+      logDirFailureChannel = new LogDirFailureChannel(10)) {
+      override def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+        deleteStartLatch.countDown()
+        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
{code}
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577735#comment-16577735 ]

ASF GitHub Bot commented on KAFKA-7278:
---------------------------------------

lindong28 opened a new pull request #5491: KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
URL: https://github.com/apache/kafka/pull/5491

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577707#comment-16577707 ]

Dong Lin commented on KAFKA-7278:
---------------------------------

Yes, `segment.changeFileSuffixes("", Log.DeletedFileSuffix)` is executed while the lock is held. But the lock is released between steps 2), 3) and 4) in the example sequence provided above.
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577706#comment-16577706 ]

Dong Lin commented on KAFKA-7278:
---------------------------------

[~ijuma] The exception is probably thrown from `segment.changeFileSuffixes("", Log.DeletedFileSuffix)`. Below is the stacktrace from the discussion of https://issues.apache.org/jira/browse/KAFKA-6188.

{code}
[2018-05-07 16:53:06,721] ERROR Failed to clean up log for __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException (kafka.server.LogDirFailureChannel)
java.nio.file.NoSuchFileException: /tmp/kafka-logs/__consumer_offsets-24/.log
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
	at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
	at java.nio.file.Files.move(Files.java:1395)
	at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
	at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
	at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
	at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
	at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
	at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
	at scala.collection.immutable.List.foreach(List.scala:389)
	at kafka.log.Log.replaceSegments(Log.scala:1648)
	at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
	at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
	at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
	at scala.collection.immutable.List.foreach(List.scala:389)
	at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
	at kafka.log.Cleaner.clean(LogCleaner.scala:438)
	at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
	at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
	Suppressed: java.nio.file.NoSuchFileException: /tmp/kafka-logs/__consumer_offsets-24/.log -> /tmp/kafka-logs/__consumer_offsets-24/.log.deleted
		at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
		at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
		at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
		at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
		at java.nio.file.Files.move(Files.java:1395)
		at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
		... 16 more
[2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
[2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs (kafka.log.LogManager)
[2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in /tmp/kafka-logs have failed (kafka.log.LogManager)
{code}
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577484#comment-16577484 ]

Ismael Juma commented on KAFKA-7278:
------------------------------------

Can you please clarify which part of the code is the problem?

{code}
  private def asyncDeleteSegment(segment: LogSegment) {
    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
    def deleteSeg() {
      info(s"Deleting segment ${segment.baseOffset}")
      maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
        segment.deleteIfExists()
      }
    }
    scheduler.schedule("delete-file", deleteSeg _, delay = config.fileDeleteDelayMs)
  }
{code}

`segment.deleteIfExists()` should not throw an exception if the file doesn't exist (this is the code that I changed some time ago). The rest executes with the lock held. That's why I suspected you were seeing an issue that has since been fixed. But I might be missing something.
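The distinction behind this exchange can be reproduced with plain NIO calls: Files.deleteIfExists is a no-op on a missing file, while the rename performed by changeFileSuffixes (ultimately Files.move via atomicMoveWithFallback) throws NoSuchFileException when the source file is already gone. A small Java illustration (file names here are arbitrary, not Kafka's segment naming):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class DeleteVsRename {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("kafka-7278");
        Path missing = dir.resolve("00000000000000000000.log");

        // deleteIfExists on a non-existent file simply returns false.
        boolean deleted = Files.deleteIfExists(missing);
        System.out.println("deleted=" + deleted); // prints deleted=false

        // A rename of a missing source (what changeFileSuffixes does) throws,
        // which is what surfaces as NoSuchFileException in the stacktrace above.
        try {
            Files.move(missing, dir.resolve("00000000000000000000.log.deleted"));
        } catch (NoSuchFileException e) {
            System.out.println("rename failed: " + e.getClass().getSimpleName());
        }
    }
}
```

So the asynchronous deleteSeg() task is safe on its own; it is the synchronous rename at the top of asyncDeleteSegment() that fails when another code path has already renamed the segment's files.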
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577335#comment-16577335 ]

Dong Lin commented on KAFKA-7278:
---------------------------------

[~ijuma] Yeah, the latest code in trunk seems to have this issue. The following sequence of events may happen:

1) There are segment1, segment2 and segment3 for a given partition.
2) LogCleaner determines to merge segment1 and segment2 into segment3 and will call Log.replaceSegments(..., oldSegments=[segment1, segment2]).
3) Log retention is triggered and Log.deleteSegment(segment=segment1) is called and executed. This renames the files for segment1 in the log directory.
4) Log.replaceSegments(oldSegments=[segment1, segment2]) is executed and Log.asyncDeleteSegment(segment1) is executed, which fails to find the files for segment1 and throws IOException.
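The interleaving in that four-step sequence can be forced deterministically with latches, in the same spirit as the unit test added by the PRs. This is a hypothetical Java sketch (offsets, names and the map standing in for Log.segments are invented): retention removes a segment after the cleaner has snapshotted its old-segment list, and a containsKey check at delete time skips the stale entry:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

public class ReplaceRaceSketch {
    // Stand-in for Log.segments: base offset -> segment name.
    static final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();

    public static void main(String[] args) throws InterruptedException {
        segments.put(0L, "segment1");
        segments.put(100L, "segment2");

        // Step 2: the cleaner snapshots the segments it intends to replace.
        List<Long> oldSegments = new ArrayList<>(segments.keySet());

        // Step 3: retention runs concurrently and removes segment1.
        CountDownLatch retentionDone = new CountDownLatch(1);
        Thread retention = new Thread(() -> {
            segments.remove(0L);
            retentionDone.countDown();
        });
        retention.start();
        retentionDone.await();

        // Step 4: replaceSegments now holds a stale snapshot; deleting only
        // offsets still in the index avoids touching segment1's renamed files.
        for (Long offset : oldSegments) {
            if (segments.containsKey(offset)) {
                System.out.println("async-deleting segment at offset " + offset);
            } else {
                System.out.println("skipping already-deleted offset " + offset);
            }
        }
        retention.join();
    }
}
```

Without the containsKey guard, step 4 would attempt the rename for offset 0 and hit the NoSuchFileException path, which is exactly the filter the fix adds inside the lock.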
[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577280#comment-16577280 ]

Ismael Juma commented on KAFKA-7278:
------------------------------------

Is this the case in trunk? I remember we fixed one issue like what you describe, but there could be more.