jsancio commented on a change in pull request #10899: URL: https://github.com/apache/kafka/pull/10899#discussion_r654704639
########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -664,4 +666,55 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer, builder.close(); } + public static MemoryRecords withSnapshotHeaderRecord( + long initialOffset, + long timestamp, + int leaderEpoch, + ByteBuffer buffer, + MetadataSnapshotHeaderRecord snapshotHeaderRecord) { Review comment: Apache Kafka doesn't have a style guide, but newer code tends to use 4 spaces for argument indentation and to put the sequence `) {` on its own line. E.g. ``` void functionName( Type argument, AnotherType anotherArgument ) { // Code here } ``` This comment applies to a few places in this PR. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -135,9 +210,12 @@ public void append(List<T> records) { /** * Freezes the snapshot by flushing all pending writes and marking it as immutable. + * + * Also adds a {@link MetadataSnapshotFooterRecord} to the end of the snapshot */ public void freeze() { appendBatches(accumulator.drain()); + finalizeSnapshotWithFooter(); Review comment: I think we need to do this before appending the batches above. 
########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -78,6 +83,76 @@ public SnapshotWriter( ); } + /** + * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + private void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + throw new IllegalStateException("Initializing new snapshot (ID: " + + snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset + + ") with a non-empty file"); + } + MetadataSnapshotHeaderRecord headerRecord = new MetadataSnapshotHeaderRecord() + .setVersion(ControlRecordUtils.METADATA_SNAPSHOT_HEADER_VERSION); + accumulator.appendSnapshotHeaderMessage(headerRecord); + accumulator.forceDrain(); + } + + /** + * Add a {@link MetadataSnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + * + * @throws IllegalStateException if the snapshot is empty (no header) + */ + private void finalizeSnapshotWithFooter() { + if (snapshot.sizeInBytes() == 0) { Review comment: Hmm. Not sure, but I would argue that this check is always false, since there is only one "constructor" and it always appends at least one record batch. We can confirm in the tests that this is always the case. ########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -664,4 +666,55 @@ private static void writeLeaderChangeMessage(ByteBuffer buffer, builder.close(); } + public static MemoryRecords withSnapshotHeaderRecord( + long initialOffset, + long timestamp, + int leaderEpoch, + ByteBuffer buffer, + MetadataSnapshotHeaderRecord snapshotHeaderRecord) { + writeSnapshotHeaderRecord(buffer, initialOffset, timestamp, leaderEpoch, snapshotHeaderRecord); + buffer.flip(); + return MemoryRecords.readableRecords(buffer); + } Review comment: Extra spaces. 
########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -55,7 +60,7 @@ * @param compressionType the compression algorithm to use * @param serde the record serialization and deserialization implementation */ Review comment: Let's remove this Javadoc, since you moved it to the static method and the constructor is private. ########## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ########## @@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l } } + + public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) { + appendLock.lock(); + try { + // Ideally there should be nothing in the batch here. + // TODO verify this Review comment: At this layer I think the best we can do is `forceDrain()`. It should be a no-op if there are no records in the current batch. ########## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ########## @@ -233,6 +235,68 @@ public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, l } } + + public void appendSnapshotHeaderMessage(MetadataSnapshotHeaderRecord snapshotHeaderRecord) { + appendLock.lock(); + try { + // Ideally there should be nothing in the batch here. 
+ // TODO verify this + long currentTimeMs = time.milliseconds(); + ByteBuffer buffer = memoryPool.tryAllocate(256); + if (buffer != null) { + MemoryRecords data = MemoryRecords.withSnapshotHeaderRecord( + this.nextOffset, + currentTimeMs, + this.epoch, + buffer, + snapshotHeaderRecord + ); + completed.add(new CompletedBatch<>( + nextOffset, + 1, + data, + memoryPool, + buffer + )); + nextOffset += 1; + } else { + throw new IllegalStateException("Could not allocate buffer for the metadata snapshot header record."); + } + } finally { + appendLock.unlock(); + } + } + + public void appendSnapshotFooterMessage(MetadataSnapshotFooterRecord snapshotFooterRecord) { + appendLock.lock(); + try { + forceDrain(); + long currentTimeMs = time.milliseconds(); + ByteBuffer buffer = memoryPool.tryAllocate(256); + if (buffer != null) { + MemoryRecords data = MemoryRecords.withSnapshotFooterRecord( + this.nextOffset, + currentTimeMs, + this.epoch, + buffer, + snapshotFooterRecord + ); + completed.add(new CompletedBatch<>( + nextOffset, + 1, + data, + memoryPool, + buffer + )); + nextOffset += 1; + } else { + throw new IllegalStateException("Could not allocate buffer for the metadata snapshot footer record."); + } + } finally { + appendLock.unlock(); + } + } Review comment: There is code duplication between these 3 methods. Let's figure out a way to remove this duplicate code. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -78,6 +83,76 @@ public SnapshotWriter( ); } + /** + * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + private void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + throw new IllegalStateException("Initializing new snapshot (ID: " + + snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset + + ") with a non-empty file"); Review comment: You can rely on `OffsetAndEpoch::toString`. 
For example, `String.format("Initializing with non-empty snapshot (%s)", snapshot.snapshotId())`. I changed the message because it is possible that the `RawSnapshotWriter` is not a file but an in-memory snapshot. This comment also applies to the footer message. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -78,6 +83,76 @@ public SnapshotWriter( ); } + /** + * Adds a {@link MetadataSnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + private void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + throw new IllegalStateException("Initializing new snapshot (ID: " + + snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset + + ") with a non-empty file"); + } + MetadataSnapshotHeaderRecord headerRecord = new MetadataSnapshotHeaderRecord() + .setVersion(ControlRecordUtils.METADATA_SNAPSHOT_HEADER_VERSION); + accumulator.appendSnapshotHeaderMessage(headerRecord); + accumulator.forceDrain(); + } + + /** + * Add a {@link MetadataSnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + * + * @throws IllegalStateException if the snapshot is empty (no header) + */ + private void finalizeSnapshotWithFooter() { + if (snapshot.sizeInBytes() == 0) { + throw new IllegalStateException("Finalizing snapshot (ID: " + + snapshot.snapshotId().epoch + ", " + snapshot.snapshotId().offset + + ") without a header"); + } + MetadataSnapshotFooterRecord footerRecord = new MetadataSnapshotFooterRecord() + .setVersion(ControlRecordUtils.METADATA_SNAPSHOT_FOOTER_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link MetadataSnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the 
maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link SnapshotWriter} + */ + public static <T> Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + CompressionType compressionType, + RecordSerde<T> serde) { + Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> { + return new SnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + CompressionType.NONE, + serde); + }); + if (writer.isPresent()) { + writer.get().initializeSnapshotWithHeader(); + } Review comment: I think you can use `ifPresent`. E.g. ``` writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org