junrao commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501325705



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {

Review comment:
       ProducerStateManager.listSnapshotFiles() could be simplified to just 
listSnapshotFiles()?

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+      tm.put(f.offset, f)
+    }
+    tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. 
Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will 
be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in 
segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an 
associated segment file, but not to remove

Review comment:
       Incomplete sentence after "but not to remove".

##########
File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
##########
@@ -834,6 +834,40 @@ class ProducerStateManagerTest {
     assertEquals(None, 
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
+  @Test
+  def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = {
+    // Test that when stray snapshots are removed, the largest stray snapshot 
is kept around. This covers the case where
+    // the broker shutdown cleanly and emitted a snapshot file larger than the 
base offset of the active segment.
+
+    // Create 3 snapshot files at different offsets.
+    Log.producerSnapshotFile(logDir, 42).createNewFile()
+    Log.producerSnapshotFile(logDir, 5).createNewFile()
+    Log.producerSnapshotFile(logDir, 2).createNewFile()
+
+    // claim that we only have one segment with a base offset of 5
+    stateManager.removeStraySnapshots(Set(5))
+
+    // The snapshot file at offset 2 should be considered a stray, but the 
snapshot at 42 should be kept
+    // around because it is the largest snapshot.
+    assertEquals(Some(42), stateManager.latestSnapshotOffset)
+    assertEquals(Some(5), stateManager.oldestSnapshotOffset)
+    assertEquals(Seq(5, 42), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testRemoveAllStraySnapshots(): Unit = {
+    // Test that when stray snapshots are removed, all stray snapshots are 
removed when the base offset of the largest
+    // segment exceeds the offset of the largest stray snapshot.

Review comment:
       In the test below, the base offset of the largest segment equals, rather 
than exceeds, the offset of the largest stray snapshot.

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+      tm.put(f.offset, f)
+    }
+    tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. 
Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will 
be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in 
segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an 
associated segment file, but not to remove
+   */
+  private[log] def removeStraySnapshots(segmentBaseOffsets: Set[Long]): Unit = 
{
+    var latestStraySnapshot: Option[SnapshotFile] = None
+    val ss = loadSnapshots()
+    for (snapshot <- ss.values().asScala) {
+      val key = snapshot.offset
+      latestStraySnapshot match {
+        case Some(prev) =>
+          if (!segmentBaseOffsets.contains(key)) {
+            // this snapshot is now the largest stray snapshot.
+            prev.deleteIfExists()
+            ss.remove(prev.offset)
+            latestStraySnapshot = Some(snapshot)
+          }
+        case None =>
+          if (!segmentBaseOffsets.contains(key)) {
+            latestStraySnapshot = Some(snapshot)

Review comment:
       Could we also make sure that the offset for latestStraySnapshot is > the 
largest offset in segmentBaseOffsets?
   

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1226,6 +1225,104 @@ class LogTest {
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
+  @Test
+  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+    // Sleep to breach the retention period
+    mockTime.sleep(1000 * 60 + 1)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Increment the log start offset to exclude the first two segments.
+    log.maybeIncrementLogStartOffset(log.logEndOffset - 1, 
ClientRecordDeletion)
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+    mockTime.sleep(1)
+    assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testCompactionDeletesProducerStateSnapshots(): Unit = {
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+    val log = createLog(logDir, logConfig)
+    val pid1 = 1L
+    val epoch = 0.toShort
+    val cleaner = new Cleaner(id = 0,
+      offsetMap = new FakeOffsetMap(Int.MaxValue),
+      ioBufferSize = 64 * 1024,
+      maxIoBufferSize = 64 * 1024,
+      dupBufferLoadFactor = 0.75,
+      throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = 
mockTime),
+      time = mockTime,
+      checkDone = _ => {})
+
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"a".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"b".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+    log.roll()
+    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"c".getBytes())), producerId = pid1,
+      producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+    log.updateHighWatermark(log.logEndOffset)
+    assertEquals("expected a snapshot file per segment base offset, except the 
first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+    // Clean segments, this should delete everything except the active segment 
since there only
+    // exists the key "a".
+    cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
+    log.deleteOldSegments()
+    // Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+    mockTime.sleep(1)
+    assertEquals("expected a snapshot file per segment base offset, excluding 
the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testLoadingLogCleansOrphanedProducerStateSnapshots(): Unit = {
+    val orphanedSnapshotFile = Log.producerSnapshotFile(logDir, 42).toPath
+    Files.createFile(orphanedSnapshotFile)
+    val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
+    createLog(logDir, logConfig)
+    assertEquals("expected orphaned producer state snapshot file to be cleaned 
up", 0, ProducerStateManager.listSnapshotFiles(logDir).size)

Review comment:
       Hmm, why is orphanedSnapshotFile deleted since we keep the last stray 
snapshot?

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -1661,14 +1753,17 @@ class LogTest {
     log.roll()
 
     assertEquals(2, log.activeProducersWithLastSequence.size)
-    assertEquals(2, 
ProducerStateManager.listSnapshotFiles(log.producerStateManager.logDir).size)
+    assertEquals(2, ProducerStateManager.listSnapshotFiles(log.dir).size)
 
     log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(2L, ClientRecordDeletion)
+    log.deleteOldSegments() // force retention to kick in so that the snapshot 
files are cleaned up.
+    mockTime.sleep(logConfig.fileDeleteDelayMs + 1000) // advance the clock so 
file deletion takes place
+    mockTime.scheduler.tick()

Review comment:
       mockTime.sleep() calls scheduler.tick() already.

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -653,36 +697,44 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   def takeSnapshot(): Unit = {
     // If not a new offset, then it is not worth taking another snapshot
     if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+      val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, 
lastMapOffset))
       info(s"Writing producer snapshot at offset $lastMapOffset")
-      writeSnapshot(snapshotFile, producers)
+      writeSnapshot(snapshotFile.file, producers)
+      snapshots.put(snapshotFile.offset, snapshotFile)
 
       // Update the last snap offset according to the serialized map
       lastSnapOffset = lastMapOffset
     }
   }
 
+  /**
+   * Update the parentDir for this ProducerStateManager and all of the 
snapshot files which it manages.
+   */
+  def updateParentDir(parentDir: File): Unit ={
+    _logDir = parentDir
+    snapshots.forEach((_, s) => s.updateParentDir(parentDir))
+  }
+
   /**
    * Get the last offset (exclusive) of the latest snapshot file.
    */
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => 
offsetFromFile(file))
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(_.offset)
 
   /**
    * Get the last offset (exclusive) of the oldest snapshot file.
    */
-  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => 
offsetFromFile(file))
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(_.offset)
 
   /**
-   * When we remove the head of the log due to retention, we need to remove 
snapshots older than
-   * the new log start offset.
+   * Remove any unreplicated transactions lower than the provided 
logStartOffset and bring the lastMapOffset forward
+   * if necessary.
    */
-  def truncateHead(logStartOffset: Long): Unit = {
+  def onLogStartOffsetIncremented(logStartOffset: Long): Unit = {
     removeUnreplicatedTransactions(logStartOffset)
 
     if (lastMapOffset < logStartOffset)
       lastMapOffset = logStartOffset
 
-    deleteSnapshotsBefore(logStartOffset)

Review comment:
       Hmm, why don't we need to delete snapshots before logStartOffset?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to