jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630370013



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
-    try {
-      if (snapshotIds.contains(snapshotId)) {
-        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-      } else {
-        Optional.empty()
+    snapshots synchronized {
+      val reader = snapshots.get(snapshotId) match {
+        case None =>
+          // Snapshot doesn't exist
+          None
+        case Some(None) =>
+          // Snapshot exists but has never been read before
+          try {
+            val snapshotReader = 
Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+            snapshots.put(snapshotId, snapshotReader)
+            snapshotReader
+          } catch {
+            case _: NoSuchFileException =>
+              // Snapshot doesn't exist in the data dir; remove
+              val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+              warn(s"Couldn't read $snapshotId; expected to find snapshot file 
$path")
+              snapshots.remove(snapshotId)
+              None
+          }
+        case Some(value) =>
+          // Snapshot exists and it is already open; do nothing
+          value
       }
-    } catch {
-      case _: NoSuchFileException =>
-        Optional.empty()
+
+      reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
     }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val descending = snapshotIds.descendingIterator
-    if (descending.hasNext) {
-      Optional.of(descending.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-    val ascendingIterator = snapshotIds.iterator
-    if (ascendingIterator.hasNext) {
-      Optional.of(ascendingIterator.next)
-    } else {
-      Optional.empty()
+    snapshots synchronized {
+      snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
     }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-    snapshotIds.add(snapshotId)
+    snapshots synchronized {
+      snapshots.put(snapshotId, None)
+    }
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-        startOffset < logStartSnapshotId.offset &&
-        logStartSnapshotId.offset <= snapshotId.offset &&
-        log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-        log.deleteOldSegments()
+    val (deleted, forgottenSnapshots) = snapshots synchronized {
+      latestSnapshotId().asScala match {
+        case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+          startOffset < logStartSnapshotId.offset &&
+          logStartSnapshotId.offset <= snapshotId.offset &&
+          log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+
+          // Delete all segments that have a "last offset" less than the log 
start offset
+          log.deleteOldSegments()
 
-        // Delete snapshot after increasing LogStartOffset
-        removeSnapshotFilesBefore(logStartSnapshotId)
+          // Forget snapshots less than the log start offset
+          (true, forgetSnapshotsBefore(logStartSnapshotId))
+        case _ =>
+          (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+      }
+    }
 
-        true
+    removeSnapshots(forgottenSnapshots)
+    deleted
+  }
 
-      case _ => false
-    }
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the 
associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is already held.

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to