hachikuji commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r651330764
########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -85,6 +85,20 @@ public OffsetAndEpoch snapshotId() { return snapshot.snapshotId(); } + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastOffsetFromLog() { Review comment: Maybe something like `lastIncludedOffset` or `lastContainedOffset`? ########## File path: raft/src/test/java/org/apache/kafka/raft/MockLogTest.java ########## @@ -437,14 +437,89 @@ public void testCreateSnapshot() throws IOException { appendBatch(numberOfRecords, epoch); log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); - try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId)) { + try (RawSnapshotWriter snapshot = log.createSnapshot(snapshotId, true).get()) { snapshot.freeze(); } RawSnapshotReader snapshot = log.readSnapshot(snapshotId).get(); assertEquals(0, snapshot.sizeInBytes()); } + @Test + public void testCreateSnapshotValidation() { + int numberOfRecords = 10; + int firstEpoch = 1; + int secondEpoch = 3; + + appendBatch(numberOfRecords, firstEpoch); + appendBatch(numberOfRecords, secondEpoch); + log.updateHighWatermark(new LogOffsetMetadata(2 * numberOfRecords)); + + // Test snapshot id for the first epoch + try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords, firstEpoch), true).get()) { } + try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords - 1, firstEpoch), true).get()) { } + try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(1, firstEpoch), true).get()) { } + + // Test snapshot id for the second epoch + try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords, secondEpoch), true).get()) { } + try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(2 * numberOfRecords - 1, secondEpoch), true).get()) { } + try (RawSnapshotWriter snapshot = log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, secondEpoch), true).get()) { } + } + + @Test + public void testCreateSnapshotLaterThanHighWatermark() { + int numberOfRecords = 10; + int epoch = 1; + + appendBatch(numberOfRecords, epoch); + log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)); + + assertThrows( + IllegalArgumentException.class, + () -> log.createSnapshot(new OffsetAndEpoch(numberOfRecords + 1, epoch), true) + ); + } + + @Test + public void testCreateSnapshotBeforeLogStartOffset() { Review comment: Worth adding any test cases for an invalid epoch? ########## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ########## @@ -180,14 +181,17 @@ default void beginShutdown() {} void resign(int epoch); /** - * Create a writable snapshot file for a given offset and epoch. + * Create a writable snapshot file for a commmitted offset. Review comment: nit: one extra 'm' ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -1101,7 +1101,7 @@ private boolean handleFetchResponse( partitionResponse.snapshotId().epoch() ); - state.setFetchingSnapshot(Optional.of(log.createSnapshot(snapshotId))); + state.setFetchingSnapshot(log.createSnapshot(snapshotId, false)); Review comment: Might be worth a brief comment that the snapshot is expected to be well ahead of the current log, so we have to skip validation. ########## File path: raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java ########## @@ -230,12 +230,18 @@ default long truncateToEndOffset(OffsetAndEpoch endOffset) { * Create a writable snapshot for the given snapshot id. * * See {@link RawSnapshotWriter} for details on how to use this object. The caller of - * this method is responsible for invoking {@link RawSnapshotWriter#close()}. + * this method is responsible for invoking {@link RawSnapshotWriter#close()}. If a + * snapshot already exists then return an {@link Optional#empty()}. * * @param snapshotId the end offset and epoch that identifies the snapshot - * @return a writable snapshot + * @param validate validate the snapshot id against the log + * @return a writable snapshot if it doesn't already exists + * @throws IllegalArgumentException if validate is true and end offset is greater than the + * high-watermark + * @throws IllegalArgumentException if validate is true and end offset is less than the log + * start offset */ - RawSnapshotWriter createSnapshot(OffsetAndEpoch snapshotId); + Optional<RawSnapshotWriter> createSnapshot(OffsetAndEpoch snapshotId, boolean validate); Review comment: I don't feel too strongly about it, but I wonder if it is worth having a separate API instead of the boolean parameter here. For example, maybe `createNewSnapshot` vs `storeSnapshot` or something like that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org