gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501942512
##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -653,36 +697,44 @@ class ProducerStateManager(val topicPartition:
TopicPartition,
def takeSnapshot(): Unit = {
// If not a new offset, then it is not worth taking another snapshot
if (lastMapOffset > lastSnapOffset) {
- val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+ val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir,
lastMapOffset))
info(s"Writing producer snapshot at offset $lastMapOffset")
- writeSnapshot(snapshotFile, producers)
+ writeSnapshot(snapshotFile.file, producers)
+ snapshots.put(snapshotFile.offset, snapshotFile)
// Update the last snap offset according to the serialized map
lastSnapOffset = lastMapOffset
}
}
+ /**
+ * Update the parentDir for this ProducerStateManager and all of the
snapshot files which it manages.
+ */
+ def updateParentDir(parentDir: File): Unit ={
+ _logDir = parentDir
+ snapshots.forEach((_, s) => s.updateParentDir(parentDir))
+ }
+
/**
* Get the last offset (exclusive) of the latest snapshot file.
*/
- def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file =>
offsetFromFile(file))
+ def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(_.offset)
/**
* Get the last offset (exclusive) of the oldest snapshot file.
*/
- def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file =>
offsetFromFile(file))
+ def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(_.offset)
/**
- * When we remove the head of the log due to retention, we need to remove
snapshots older than
- * the new log start offset.
+ * Remove any unreplicated transactions lower than the provided
logStartOffset and bring the lastMapOffset forward
+ * if necessary.
*/
- def truncateHead(logStartOffset: Long): Unit = {
+ def onLogStartOffsetIncremented(logStartOffset: Long): Unit = {
removeUnreplicatedTransactions(logStartOffset)
if (lastMapOffset < logStartOffset)
lastMapOffset = logStartOffset
- deleteSnapshotsBefore(logStartOffset)
Review comment:
The idea here is to clear unreplicated transactions and optionally
advance the `lastMapOffset` and `lastSnapOffset` when the `logStartOffset` is
advanced, but to leave the snapshot files around. The corresponding snapshot
files should be removed during the retention pass as we clean up the associated
segment files.
I was attempting to make incrementing the `logStartOffset` a bit cheaper, so
that we don't need to delete the snapshot files from the request handler thread
when handling `DELETE_RECORDS`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]