jsancio commented on code in PR #15671: URL: https://github.com/apache/kafka/pull/15671#discussion_r1586871366
########## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ########## @@ -2415,17 +2476,30 @@ public void resign(int epoch) { @Override public Optional<SnapshotWriter<T>> createSnapshot( OffsetAndEpoch snapshotId, - long lastContainedLogTime + long lastContainedLogTimestamp ) { - return RecordsSnapshotWriter.createWithHeader( - () -> log.createNewSnapshot(snapshotId), - MAX_BATCH_SIZE_BYTES, - memoryPool, - time, - lastContainedLogTime, - CompressionType.NONE, - serde - ); + if (!isInitialized()) { + throw new IllegalStateException("Cannot create snapshot before the replica has been initialized"); + } + + return log.createNewSnapshot(snapshotId).map(writer -> { + long lastContainedLogOffset = snapshotId.offset() - 1; Review Comment: Yes. I have this issue [KAFKA-14620](https://issues.apache.org/jira/browse/KAFKA-14620) to introduce the `SnapshotId` type. I can fix this on that PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org