gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501942512



##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -653,36 +697,44 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   def takeSnapshot(): Unit = {
     // If not a new offset, then it is not worth taking another snapshot
     if (lastMapOffset > lastSnapOffset) {
-      val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+      val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, 
lastMapOffset))
       info(s"Writing producer snapshot at offset $lastMapOffset")
-      writeSnapshot(snapshotFile, producers)
+      writeSnapshot(snapshotFile.file, producers)
+      snapshots.put(snapshotFile.offset, snapshotFile)
 
       // Update the last snap offset according to the serialized map
       lastSnapOffset = lastMapOffset
     }
   }
 
+  /**
+   * Update the parentDir for this ProducerStateManager and all of the 
snapshot files which it manages.
+   */
+  def updateParentDir(parentDir: File): Unit ={
+    _logDir = parentDir
+    snapshots.forEach((_, s) => s.updateParentDir(parentDir))
+  }
+
   /**
    * Get the last offset (exclusive) of the latest snapshot file.
    */
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => 
offsetFromFile(file))
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(_.offset)
 
   /**
    * Get the last offset (exclusive) of the oldest snapshot file.
    */
-  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => 
offsetFromFile(file))
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(_.offset)
 
   /**
-   * When we remove the head of the log due to retention, we need to remove 
snapshots older than
-   * the new log start offset.
+   * Remove any unreplicated transactions lower than the provided 
logStartOffset and bring the lastMapOffset forward
+   * if necessary.
    */
-  def truncateHead(logStartOffset: Long): Unit = {
+  def onLogStartOffsetIncremented(logStartOffset: Long): Unit = {
     removeUnreplicatedTransactions(logStartOffset)
 
     if (lastMapOffset < logStartOffset)
       lastMapOffset = logStartOffset
 
-    deleteSnapshotsBefore(logStartOffset)

Review comment:
       The idea here is to clear unreplicated transactions and optionally 
advance the `lastMapOffset` and `lastSnapOffset` when the logStartOffset is 
advanced, but to leave the snapshot files around. The corresponding snapshot 
files should be removed during the retention pass as we clean up the associated 
segment files. 
   
   I was attempting to optimize incrementing the logStartOffset a bit so that 
we don't need to delete the snapshot files from the request handler thread when 
handling `DELETE_RECORDS`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to