[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587106#comment-16587106 ]

ASF GitHub Bot commented on KAFKA-7278:
---------------------------------------

lindong28 closed pull request #5535: Cherry-pick KAFKA-7278; replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
URL: https://github.com/apache/kafka/pull/5535

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8b62918bc97..9b423ba5933 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1608,7 +1608,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
    *
    * This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
    * it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1655,6 +1657,8 @@ class Log(@volatile var dir: File,
    */
   private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
     lock synchronized {
+      val existingOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset))
+
       checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
@@ -1663,7 +1667,7 @@ class Log(@volatile var dir: File,
       addSegment(newSegment)
 
       // delete the old files
-      for (seg <- oldSegments) {
+      for (seg <- existingOldSegments) {
         // remove the index entry
         if (seg.baseOffset != newSegment.baseOffset)
           segments.remove(seg.baseOffset)
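
To make the change above concrete: the fix re-checks membership in the live
segment map, under the log lock, before removing or asynchronously deleting
anything. The sketch below is a minimal standalone illustration of that guard;
SegmentLike, MiniLog, and the println stand-in for the async delete are
simplified assumptions, not the real kafka.log classes:

import java.util.concurrent.ConcurrentSkipListMap

// Simplified stand-ins for kafka.log.LogSegment and Log (illustrative only).
case class SegmentLike(baseOffset: Long, path: String)

class MiniLog {
  private val lock = new Object
  // Keyed by base offset, mirroring Log.segments.
  private val segments = new ConcurrentSkipListMap[java.lang.Long, SegmentLike]()

  def addSegment(seg: SegmentLike): Unit = segments.put(seg.baseOffset, seg)

  private def asyncDeleteSegment(seg: SegmentLike): Unit =
    // The real Log renames the segment file with a .deleted suffix here and
    // schedules the physical delete; renaming an already-renamed file throws.
    println(s"scheduling delete of ${seg.path}")

  def replaceSegments(newSegment: SegmentLike, oldSegments: Seq[SegmentLike]): Unit =
    lock.synchronized {
      // The guard from the patch: drop candidates that a concurrent
      // deleteOldSegments() has already removed from the live map.
      val existingOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset))
      addSegment(newSegment)
      for (seg <- existingOldSegments) {
        if (seg.baseOffset != newSegment.baseOffset)
          segments.remove(seg.baseOffset)
        asyncDeleteSegment(seg)
      }
    }
}
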
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ae949bf6b85..f6001e9f375 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -88,6 +89,74 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(expectedBytesRead, stats.bytesRead)
   }
 
+  @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+    val deleteStartLatch = new CountDownLatch(1)
+    val deleteCompleteLatch = new CountDownLatch(1)
+
+    // Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+    // it waits for another thread to execute deleteOldSegments()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+    val topicPartition = Log.parseTopicPartitionName(dir)
+    val producerStateManager = new ProducerStateManager(topicPartition, dir)
+    val log = new Log(dir,
+                      config = LogConfig.fromProps(logConfig.originals, logProps),
+                      logStartOffset = 0L,
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
+                      brokerTopicStats = new BrokerTopicStats, time,
+                      maxProducerIdExpirationMs = 60 * 60 * 1000,
+                      producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+                      topicPartition = topicPartition,
+                      producerStateManager = producerStateManager,
+                      logDirFailureChannel = new LogDirFailureChannel(10)) {
+      override def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+        deleteStartLatch.countDown()
+        if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
+          throw new IllegalStateException("Log segment deletion timed out")
+        }
+        super.replaceSegments(newSegment, oldSegments, isRecoveredSwapFile)
+      }
+    }
+
+    // Start a thread which executes log.deleteOldSegments() right before replaceSegments() is executed
+    val t = new Thread() {
+      override def run(): Unit = {
+        deleteStartLatch.await(5000, TimeUnit.MILLISECONDS)
+        log.maybeIncrementLogStartOffset(log.activeSegment.baseOffset)
+        log.onHighWatermarkIncremented(log.activeSegment.baseOffset)
+        log.deleteOldSegments()
+        deleteCompleteLatch.countDown()
+      }
+    }
+    t.start()
+
+    // Append records so that the number of segments increases to 3
+    while (log.numberOfSegments < 3) {
+      log.appendAsLeader(record(key = 0, log.logEndOffset.toInt), leaderEpoch = 0)
+      log.roll()
+    }
+    assertEquals(3, log.numberOfSegments)
+
+    // Remember a reference to the first log file and determine the file name expected for async deletion
+    val firstLogFile = log.logSegments.head.log
+    val expectedFileName = CoreUtils.replaceSuffix(firstLogFile.file.getPath, "", Log.DeletedFileSuffix)
+
+    // Clean the log. This should trigger replaceSegments() and deleteOldSegments()
+    val offsetMap = new FakeOffsetMap(Int.MaxValue)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val segments = log.logSegments(0, log.activeSegment.baseOffset).toSeq
+    val stats = new CleanerStats()
+    cleaner.buildOffsetMap(log, 0, log.activeSegment.baseOffset, offsetMap, stats)
+    cleaner.cleanSegments(log, segments, offsetMap, 0L, stats)
+
+    // Validate based on the file name that the log segment file is renamed exactly once for async deletion
+    assertEquals(expectedFileName, firstLogFile.file().getPath)
+    assertEquals(2, log.numberOfSegments)
+  }
+
   @Test
   def testSizeTrimmedForPreallocatedAndCompactedTopic(): Unit = {
     val originalMaxFileSize = 1024;
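
A note on the technique in the test above: the overridden replaceSegments()
counts down one latch (releasing a second thread to run deleteOldSegments())
and then blocks on another latch until that deletion has finished, which
deterministically forces the interleaving the patch guards against. Below is a
stripped-down sketch of the same two-latch handoff; the object and thread
names are illustrative, not Kafka APIs:

import java.util.concurrent.{CountDownLatch, TimeUnit}

object LatchHandoffDemo extends App {
  val startLatch = new CountDownLatch(1)    // "main reached the critical point"
  val completeLatch = new CountDownLatch(1) // "interfering step has finished"

  val interferer = new Thread() {
    override def run(): Unit = {
      // Wait until the main thread is about to perform the racy step...
      startLatch.await(5, TimeUnit.SECONDS)
      println("interferer: running the conflicting operation first")
      completeLatch.countDown()
    }
  }
  interferer.start()

  // Announce the critical point, then block until the interfering operation
  // has completed, exactly as the test's overridden replaceSegments() does.
  startLatch.countDown()
  if (!completeLatch.await(5, TimeUnit.SECONDS))
    throw new IllegalStateException("interfering operation timed out")
  println("main: now running the originally scheduled operation")
  interferer.join()
}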

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7278
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7278
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every
> segment listed in `oldSegments`. oldSegments should be constructed from
> Log.segments and should therefore only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is
> determined and the time Log.replaceSegments() is called. If there is a
> concurrent async deletion of the same log segment file, Log.replaceSegments()
> will call asyncDeleteSegment() for a segment that no longer exists, and the
> Kafka server may shut down the log directory due to a NoSuchFileException.
> This is likely the root cause of
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by
> only deleting a segment if it can still be found in Log.segments.
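
For context on why the missed guard is fatal rather than benign:
asyncDeleteSegment() renames the segment file before scheduling the physical
delete, and renaming a file that a concurrent deletion has already renamed
raises NoSuchFileException, which the broker treats as a log-directory
failure. A minimal illustration of that failure mode (the paths are
hypothetical):

import java.nio.file.{Files, NoSuchFileException, Paths}

object MissingFileRenameDemo extends App {
  // Hypothetical segment path that a concurrent cleaner has already renamed,
  // so it no longer exists under its original name.
  val original = Paths.get("/tmp/demo-segment/00000000000000000000.log")
  val renamed  = Paths.get("/tmp/demo-segment/00000000000000000000.log.deleted")

  try {
    // The second rename of the same segment fails because the source is gone.
    Files.move(original, renamed)
  } catch {
    case e: NoSuchFileException =>
      println(s"rename failed as expected: $e")
  }
}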



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
