hachikuji commented on a change in pull request #10786:
URL: https://github.com/apache/kafka/pull/10786#discussion_r651330764



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
##########
@@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() {
         return snapshot.snapshotId();
     }
 
+    /**
+     * Returns the last log offset which is represented in the snapshot.
+     */
+    public long lastOffsetFromLog() {

Review comment:
       Maybe something like `lastIncludedOffset` or `lastContainedOffset`?
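To make the naming question concrete, here is a minimal sketch of the accessor under the suggested name `lastContainedOffset`. It assumes that the snapshot id offset is an exclusive end offset, as the "end offset" wording in the `ReplicatedLog` javadoc suggests, so the last contained log offset is one less than it. `OffsetAndEpoch` and `SnapshotWriterSketch` here are simplified hypothetical stand-ins, not the real Kafka classes.

```java
// Hypothetical sketch: assumes the snapshot id's offset is the exclusive end offset,
// so the last log offset contained in the snapshot is one less than it.
public class LastContainedOffsetSketch {

    // Minimal stand-in for org.apache.kafka.raft.OffsetAndEpoch (not the real class).
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    // Hypothetical stand-in for SnapshotWriter, using the suggested method name.
    static final class SnapshotWriterSketch {
        private final OffsetAndEpoch snapshotId;

        SnapshotWriterSketch(OffsetAndEpoch snapshotId) {
            this.snapshotId = snapshotId;
        }

        OffsetAndEpoch snapshotId() {
            return snapshotId;
        }

        /**
         * Returns the last log offset contained in the snapshot.
         * The snapshot id offset is exclusive, hence the "- 1".
         */
        long lastContainedOffset() {
            return snapshotId.offset - 1;
        }
    }

    public static void main(String[] args) {
        // A snapshot whose end offset is 10 covers log offsets 0 through 9.
        SnapshotWriterSketch writer = new SnapshotWriterSketch(new OffsetAndEpoch(10, 1));
        System.out.println(writer.lastContainedOffset()); // prints 9
    }
}
```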

##########
File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
##########
@@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException {
         appendBatch(numberOfRecords, epoch);
         log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
 
-        try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) {
+        try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, true).get()) {
             snapshot.freeze();
         }
 
         RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get();
         assertEquals(0, snapshot.sizeInBytes());
     }
 
+    @Test
+    public void testCreateSnapshotValidation() {
+        int numberOfRecords = 10;
+        int firstEpoch = 1;
+        int secondEpoch = 3;
+
+        appendBatch(numberOfRecords, firstEpoch);
+        appendBatch(numberOfRecords, secondEpoch);
+        log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords));
+
+        // Test snapshot id for the first epoch
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { }
+
+        // Test snapshot id for the second epoch
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { }
+        try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { }
+    }
+
+    @Test
+    public void testCreateSnapshotLaterThanHighWatermark() {
+        int numberOfRecords = 10;
+        int epoch = 1;
+
+        appendBatch(numberOfRecords, epoch);
+        log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
+
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true)
+        );
+    }
+
+    @Test
+    public void testCreateSnapshotBeforeLogStartOffset() {

Review comment:
       Worth adding any test cases for an invalid epoch?
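One way to think about invalid-epoch test cases: a snapshot id (end offset, epoch) only makes sense if the last record it contains was written in that epoch. The sketch below models that rule with a `TreeMap` of epoch start offsets, using the same log layout as `testCreateSnapshotValidation` (offsets 0-9 in epoch 1, offsets 10-19 in epoch 3). The class and the exact rule are hypothetical; whether the validation in this PR rejects each of these ids would need to be checked against the actual implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of snapshot-id validation; not Kafka's MockLog or KafkaMetadataLog.
public class SnapshotEpochValidationSketch {

    // Maps the first offset of each epoch to that epoch, similar to a leader-epoch cache.
    private final TreeMap<Long, Integer> epochStartOffsets = new TreeMap<>();
    private final long logStartOffset;
    private final long highWatermark;

    SnapshotEpochValidationSketch(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    void startEpoch(int epoch, long startOffset) {
        epochStartOffsets.put(startOffset, epoch);
    }

    /**
     * Returns true if (endOffset, epoch) could identify a snapshot: the end offset lies in
     * [logStartOffset, highWatermark] and the last contained record (endOffset - 1) was
     * written in the given epoch.
     */
    boolean isValidSnapshotId(long endOffset, int epoch) {
        if (endOffset > highWatermark || endOffset < logStartOffset) {
            return false;
        }
        Map.Entry<Long, Integer> entry = epochStartOffsets.floorEntry(endOffset - 1);
        return entry != null && entry.getValue() == epoch;
    }

    public static void main(String[] args) {
        // Mirrors testCreateSnapshotValidation: offsets 0-9 in epoch 1, 10-19 in epoch 3.
        SnapshotEpochValidationSketch log = new SnapshotEpochValidationSketch(0, 20);
        log.startEpoch(1, 0);
        log.startEpoch(3, 10);

        System.out.println(log.isValidSnapshotId(10, 1)); // true
        System.out.println(log.isValidSnapshotId(11, 3)); // true
        // Candidate invalid-epoch cases:
        System.out.println(log.isValidSnapshotId(11, 1)); // false: offset 10 is in epoch 3
        System.out.println(log.isValidSnapshotId(10, 3)); // false: offset 9 is in epoch 1
        System.out.println(log.isValidSnapshotId(15, 2)); // false: epoch 2 never existed
    }
}
```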

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -180,14 +181,17 @@ default void beginShutdown() {}
     void resign(int epoch);
 
     /**
-     * Create a writable snapshot file for a given offset and epoch.
+     * Create a writable snapshot file for a commmitted offset.

Review comment:
       nit: one extra 'm'

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1101,7 +1101,7 @@ private boolean handleFetchResponse(
                         partitionResponse.snapshotId().epoch()
                     );
 
-                    state.setFetchingSnapshot(Optional.of(log.createSnapshot(snapshotId)));
+                    state.setFetchingSnapshot(log.createSnapshot(snapshotId, false));

Review comment:
       Might be worth a brief comment that the snapshot is expected to be well ahead of the current log, so we have to skip validation.
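A minimal model of the situation such a comment would document: a follower with an empty log learns the leader's snapshot id from a fetch response, and that id is far beyond the follower's high-watermark, so a validating call would throw. `FetchPathSketch` is hypothetical, uses a `String` in place of `RawSnapshotWriter`, and mimics the PR's `createSnapshot(snapshotId, validate)` shape only loosely; the comment inside `main` is one possible wording.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the boolean-flag API from this PR; not the real ReplicatedLog.
public class FetchPathSketch {

    private final long logStartOffset;
    private final long highWatermark;
    private final Set<Long> snapshotEndOffsets = new HashSet<>();

    FetchPathSketch(long logStartOffset, long highWatermark) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
    }

    // String stands in for RawSnapshotWriter.
    Optional<String> createSnapshot(long endOffset, boolean validate) {
        if (validate && endOffset > highWatermark) {
            throw new IllegalArgumentException(
                "End offset " + endOffset + " is greater than the high-watermark " + highWatermark);
        }
        if (validate && endOffset < logStartOffset) {
            throw new IllegalArgumentException(
                "End offset " + endOffset + " is less than the log start offset " + logStartOffset);
        }
        // Empty if a snapshot with this end offset already exists.
        return snapshotEndOffsets.add(endOffset)
            ? Optional.of("snapshot-" + endOffset)
            : Optional.empty();
    }

    public static void main(String[] args) {
        // A follower with an empty log receives a snapshot id from the leader.
        FetchPathSketch follower = new FetchPathSketch(0, 0);
        long leaderSnapshotEndOffset = 100;

        // The fetched snapshot is expected to be well ahead of the current log,
        // so validation must be skipped; validate = true would throw here.
        Optional<String> writer = follower.createSnapshot(leaderSnapshotEndOffset, false);
        System.out.println(writer.isPresent()); // true
    }
}
```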

##########
File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
##########
@@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) {
      * Create a writable snapshot for the given snapshot id.
      *
      * See {@link RawSnapshotWriter} for details on how to use this object. The caller of
-     * this method is responsible for invoking {@link RawSnapshotWriter#close()}.
+     * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a
+     * snapshot already exists then return an {@link Optional#empty()}.
      *
      * @param snapshotId the end offset and epoch that identifies the snapshot
-     * @return a writable snapshot
+     * @param validate validate the snapshot id against the log
+     * @return a writable snapshot if it doesn't already exists
+     * @throws IllegalArgumentException if validate is true and end offset is greater than the
+     *         high-watermark
+     * @throws IllegalArgumentException if validate is true and end offset is less than the log
+     *         start offset
      */
-    RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId);
+    Optional<RawSnapshotWriter> createSnapshot(OffsetAndEpoch snapshotId, boolean validate);

Review comment:
       I don't feel too strongly about it, but I wonder if it is worth having a separate API instead of the boolean parameter here. For example, maybe `createNewSnapshot` vs `storeSnapshot` or something like that.
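A rough sketch of what the split API could look like, assuming the names `createNewSnapshot` and `storeSnapshot` from the comment above and keeping the PR's `Optional.empty()` result for a snapshot that already exists. The interface and in-memory implementation are hypothetical, and `String` stands in for `RawSnapshotWriter`.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class SplitSnapshotApiSketch {

    // Hypothetical interface; the method names come from the review comment.
    interface SnapshotLog {
        // Creates a snapshot of the local log; the end offset must lie in
        // [log start offset, high-watermark]. Empty if the snapshot already exists.
        Optional<String> createNewSnapshot(long endOffset);

        // Stores a snapshot obtained elsewhere (e.g. fetched from the leader) without
        // validating it against the local log. Empty if the snapshot already exists.
        Optional<String> storeSnapshot(long endOffset);
    }

    static final class InMemorySnapshotLog implements SnapshotLog {
        private final long logStartOffset;
        private final long highWatermark;
        private final Set<Long> snapshotEndOffsets = new HashSet<>();

        InMemorySnapshotLog(long logStartOffset, long highWatermark) {
            this.logStartOffset = logStartOffset;
            this.highWatermark = highWatermark;
        }

        @Override
        public Optional<String> createNewSnapshot(long endOffset) {
            if (endOffset > highWatermark) {
                throw new IllegalArgumentException(
                    "End offset " + endOffset + " is greater than the high-watermark " + highWatermark);
            }
            if (endOffset < logStartOffset) {
                throw new IllegalArgumentException(
                    "End offset " + endOffset + " is less than the log start offset " + logStartOffset);
            }
            return createIfAbsent(endOffset);
        }

        @Override
        public Optional<String> storeSnapshot(long endOffset) {
            return createIfAbsent(endOffset);
        }

        // Shared path: returns a placeholder writer, or empty if the snapshot exists.
        private Optional<String> createIfAbsent(long endOffset) {
            return snapshotEndOffsets.add(endOffset)
                ? Optional.of("snapshot-" + endOffset)
                : Optional.empty();
        }
    }

    public static void main(String[] args) {
        SnapshotLog log = new InMemorySnapshotLog(0, 10);
        System.out.println(log.createNewSnapshot(10).isPresent()); // true
        System.out.println(log.createNewSnapshot(10).isPresent()); // false: already exists
        System.out.println(log.storeSnapshot(100).isPresent());    // true: not validated
    }
}
```

With two methods, each call site names its intent (`createNewSnapshot(id)` or `storeSnapshot(id)`) instead of passing a bare `true` or `false`, at the cost of one more method on the interface.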




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

