jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r557563029



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -147,18 +199,106 @@ class KafkaMetadataLog(
   }
 
   override def createSnapshot(snapshotId: raft.OffsetAndEpoch): 
RawSnapshotWriter = {
-    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId)
+    // TODO: Talk to Jason about truncation past the high-watermark since it 
can lead to truncation past snapshots.
+    // This can result in the leader having a snapshot that is less that the 
follower's snapshot. I think that the Raft
+    // Client checks against this and aborts. If so, then this check and 
exception is okay.
+
+    // Do let the state machine create snapshots older than the latest snapshot
+    latestSnapshotId().ifPresent { latest =>
+      if (latest.epoch > snapshotId.epoch || latest.offset > 
snapshotId.offset) {
+        // Since snapshots are less than the high-watermark absolute offset 
comparison is okay.
+        throw new IllegalArgumentException(
+          s"Attemting to create a snapshot ($snapshotId) that is not greater 
than the latest snapshot ($latest)"
+        )
+      }
+    }
+
+    FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))
   }
 
   override def readSnapshot(snapshotId: raft.OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
     try {
-      Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      if (snapshotIds.contains(snapshotId)) {
+        Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+      } else {
+        Optional.empty()
+      }
     } catch {
-      case e: NoSuchFileException => Optional.empty()
+      case e: NoSuchFileException =>
+        Optional.empty()
     }
   }
 
+  override def latestSnapshotId(): Optional[raft.OffsetAndEpoch] = {
+    try {
+      Optional.of(snapshotIds.last)
+    } catch {
+      case _: NoSuchElementException =>
+        Optional.empty()
+    }
+  }
+
+  override def startSnapshotId(): Optional[raft.OffsetAndEpoch] = {

Review comment:
       Sounds good. In this PR it is not technically the oldest snapshot but it 
will be when we implement deleting snapshot in 
https://issues.apache.org/jira/browse/KAFKA-12205




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to