gardnervickers commented on a change in pull request #10896:
URL: https://github.com/apache/kafka/pull/10896#discussion_r655545444
##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -828,12 +834,41 @@ class ProducerStateManager(val topicPartition: TopicPartition,
* Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
* ProducerStateManager, and deletes the backing snapshot file.
*/
- private[log] def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
+ private def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = {
Option(snapshots.remove(snapshotOffset)).foreach(_.deleteIfExists())
}
+
+ /**
+ * Removes the producer state snapshot file metadata corresponding to the provided offset if it exists from this
+ * ProducerStateManager, and renames the backing snapshot file to have the Log.DeletionSuffix.
+ *
+ * Note: This method is safe to use with async deletes. If a race occurs and the snapshot file
+ * is deleted without this ProducerStateManager instance knowing, the resulting exception on
+ * SnapshotFile rename will be ignored and None will be returned.
+ */
+ private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = {
+ Option(snapshots.remove(snapshotOffset)).flatMap { snapshot => {
+ // If the file cannot be renamed, it likely means that the file was deleted already.
+ // This can happen due to the way we construct an intermediate producer state manager
Review comment:
@junrao @kowshik I accidentally included the word `async` here; the deletion performed by the intermediate ProducerStateManager is done synchronously.
I'm referring to the case where we go through `LogLoader.recoverSegment`. We construct a new "intermediate" `ProducerStateManager` for segment recovery, which is separate from the "real" `ProducerStateManager` captured in `LoadLogParams`.
Segment recovery can use the intermediate `ProducerStateManager` to truncate snapshot files via `ProducerStateManager.truncateAndReload` in `Log.rebuildProducerState`. In this case, the `SnapshotFile` instances are removed from the in-memory map of the "intermediate" `ProducerStateManager`, but remain in the map of the "real" `ProducerStateManager` captured in `LoadLogParams`. When the "real" instance later tries to rename one of those files, the file no longer exists, so the rename fails and the method returns None instead of propagating the exception.
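To make the scenario concrete, here is a minimal sketch under heavily simplified assumptions: `SketchSnapshotFile`, `SketchStateManager` and `SnapshotRaceSketch` are hypothetical stand-ins, not Kafka's `SnapshotFile` and `ProducerStateManager`, and the `.deleted` suffix stands in for `Log.DeletionSuffix`. Two managers track the same snapshot files in separate in-memory maps; the intermediate one deletes a file synchronously, after which the real one still has that offset in its map, its rename fails with `NoSuchFileException`, and the sketch maps that failure to `None`.

```scala
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical stand-in for Kafka's SnapshotFile: an offset plus the file on disk.
final case class SketchSnapshotFile(offset: Long, file: Path) {
  // Rename the file with an assumed ".deleted" suffix (standing in for Log.DeletionSuffix).
  // Returns None if the file is already gone, e.g. because another manager deleted it.
  def renameForDeletion(): Option[SketchSnapshotFile] = {
    val target = file.resolveSibling(file.getFileName.toString + ".deleted")
    try {
      Files.move(file, target)
      Some(SketchSnapshotFile(offset, target))
    } catch {
      case _: NoSuchFileException => None
    }
  }
}

// Hypothetical stand-in for ProducerStateManager: each instance has its own in-memory map.
final class SketchStateManager {
  private val snapshots = new ConcurrentSkipListMap[java.lang.Long, SketchSnapshotFile]()

  def track(snapshot: SketchSnapshotFile): Unit = snapshots.put(snapshot.offset, snapshot)

  def contains(offset: Long): Boolean = snapshots.containsKey(offset)

  // Mirrors removeAndDeleteSnapshot: drop the entry and delete the file synchronously.
  def removeAndDeleteSnapshot(offset: Long): Unit =
    Option(snapshots.remove(offset)).foreach(s => Files.deleteIfExists(s.file))

  // Mirrors removeAndMarkSnapshotForDeletion: drop the entry, then try the rename.
  def removeAndMarkSnapshotForDeletion(offset: Long): Option[SketchSnapshotFile] =
    Option(snapshots.remove(offset)).flatMap(_.renameForDeletion())
}

object SnapshotRaceSketch {
  // Returns (real manager still tracked offset 100, result for 100, result for 200).
  def run(): (Boolean, Option[SketchSnapshotFile], Option[SketchSnapshotFile]) = {
    val dir = Files.createTempDirectory("producer-snapshots")
    val snapshots = Seq(100L, 200L).map { offset =>
      SketchSnapshotFile(offset, Files.createFile(dir.resolve(s"$offset.snapshot")))
    }
    val real = new SketchStateManager
    val intermediate = new SketchStateManager
    snapshots.foreach { s => real.track(s); intermediate.track(s) }

    // Segment recovery: the intermediate manager deletes snapshot 100 synchronously.
    intermediate.removeAndDeleteSnapshot(100L)

    // The real manager still has offset 100 in its map, but the file is gone.
    val stillTracked = real.contains(100L)
    val missing = real.removeAndMarkSnapshotForDeletion(100L) // rename fails -> None
    val renamed = real.removeAndMarkSnapshotForDeletion(200L) // file exists -> Some(...)

    // Clean up the temporary directory.
    renamed.foreach(s => Files.deleteIfExists(s.file))
    Files.deleteIfExists(dir)
    (stillTracked, missing, renamed)
  }

  def main(args: Array[String]): Unit = {
    val (stillTracked, missing, renamed) = run()
    println(s"real manager still tracked offset 100: $stillTracked")
    println(s"mark offset 100 for deletion: $missing")
    println(s"mark offset 200 for deletion: ${renamed.map(_.file.getFileName)}")
  }
}
```

Catching only `NoSuchFileException`, rather than every `IOException`, is a choice made in this sketch so that unrelated I/O failures during the rename still surface to the caller.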
--
This is an automated message from the Apache Git Service.