hachikuji commented on a change in pull request #9819: URL: https://github.com/apache/kafka/pull/9819#discussion_r563916499
########## File path: clients/src/test/java/org/apache/kafka/common/record/UnalignedFileRecordsTest.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import static org.apache.kafka.test.TestUtils.tempFile; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class UnalignedFileRecordsTest { + + private byte[][] values = new byte[][] { + "foo".getBytes(), + "bar".getBytes() + }; + private FileRecords fileRecords; + + @BeforeEach + public void setup() throws IOException { + this.fileRecords = createFileRecords(values); + } + + @Test + public void testWriteTo() throws IOException { + + org.apache.kafka.common.requests.ByteBufferChannel channel = new org.apache.kafka.common.requests.ByteBufferChannel(fileRecords.sizeInBytes()); Review comment: nit: why not import `ByteBufferChannel`? ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RawSnapshotWriter.java ########## @@ -29,39 +31,52 @@ /** * Returns the end offset and epoch for the snapshot. */ - public OffsetAndEpoch snapshotId(); + OffsetAndEpoch snapshotId(); /** * Returns the number of bytes for the snapshot. * * @throws IOException for any IO error while reading the size */ - public long sizeInBytes() throws IOException; + long sizeInBytes() throws IOException; /** - * Fully appends the buffer to the snapshot. + * Fully appends the memory record set to the snapshot. * - * If the method returns without an exception the given buffer was fully writing the + * If the method returns without an exception the given record set was fully writing the * snapshot. * - * @param buffer the buffer to append + * @param records the region to append * @throws IOException for any IO error during append */ - public void append(ByteBuffer buffer) throws IOException; + void append(MemoryRecords records) throws IOException; + + /** + * Fully appends the memory record set to the snapshot, the difference with {@link RawSnapshotWriter#append(MemoryRecords)} + * is that the record set are fetched from leader by FetchSnapshotRequest, so the records are unaligned. + * + * If the method returns without an exception the given records was fully writing the + * snapshot. 
+ * + * @param records the region to append + * @throws IOException for any IO error during append + */ + Review comment: nit: remove newline ########## File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ########## @@ -134,6 +134,27 @@ public void readInto(ByteBuffer buffer, int position) throws IOException { * @return A sliced wrapper on this message set limited based on the given position and size */ public FileRecords slice(int position, int size) throws IOException { + int availableBytes = availableBytes(position, size); + return new FileRecords(file, channel, this.start + position, this.start + position + availableBytes, true); + } + + /** + * Return a slice of records from this instance, the difference with {@link FileRecords#slice(int, int)} is + * that the position is not necessarily on an offset boundary. + * + * This method is reserved for cases where we do not necessarily need to read an entire record batch such as + * fetching snapshot in for raft. Review comment: nit: > This method is reserved for cases where offset alignment is not necessary, such as in the replication of raft snapshots. ########## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ########## @@ -1098,6 +1099,29 @@ public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) thro channel.write(sourceBuffer); } + /** + * Trying to write data in source buffer to a {@link TransferableChannel}, we may need to call this method multiple + * times since this method doesn't ensure data in source buffer can be fully written to dest channel. Review comment: nit: > ... since this method doesn't ensure the data in the source buffer can be fully written to the destination channel. ########## File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ########## @@ -134,6 +134,27 @@ public void readInto(ByteBuffer buffer, int position) throws IOException { * @return A sliced wrapper on this message set limited based on the given position and size */ public FileRecords slice(int position, int size) throws IOException { + int availableBytes = availableBytes(position, size); + return new FileRecords(file, channel, this.start + position, this.start + position + availableBytes, true); Review comment: nit: since we have `start + position` here a couple times, perhaps we may as well use a field: ``` int startPosition = this.start + position; ``` ########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -146,8 +149,9 @@ public void close() throws IOException { private void appendBatches(List<CompletedBatch<T>> batches) throws IOException { try { - for (CompletedBatch batch : batches) { - snapshot.append(batch.data.buffer()); + for (CompletedBatch<T> batch : batches) { + ByteBuffer buffer = batch.data.buffer(); + snapshot.append(new UnalignedMemoryRecords(buffer)); Review comment: Could we use the overload `snapshot.append(batch.data)`? 
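A minimal sketch of what that suggested simplification in `SnapshotWriter#appendBatches` might look like, assuming `CompletedBatch#data` exposes a `MemoryRecords` value so the `append(MemoryRecords)` overload applies directly; the try/finally cleanup from the original method is elided here:

```java
// Hypothetical sketch only, not the author's exact change: appends each completed
// batch through the RawSnapshotWriter#append(MemoryRecords) overload added in this
// PR instead of wrapping the buffer in UnalignedMemoryRecords by hand.
private void appendBatches(List<CompletedBatch<T>> batches) throws IOException {
    for (CompletedBatch<T> batch : batches) {
        snapshot.append(batch.data);
    }
}
```

That would keep the `append(UnalignedMemoryRecords)` path reserved for bytes fetched from the leader via FetchSnapshotRequest, as described in the javadoc above.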
########## File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ########## @@ -144,11 +165,11 @@ public FileRecords slice(int position, int size) throws IOException { if (size < 0) throw new IllegalArgumentException("Invalid size: " + size + " in read from " + this); - int end = this.start + position + size; - // handle integer overflow or if end is beyond the end of the file - if (end < 0 || end > start + currentSizeInBytes) - end = start + currentSizeInBytes; - return new FileRecords(file, channel, this.start + position, end, true); + int limit = this.start + position + size; Review comment: nit: my bad for suggesting `limit`. Can we use `end` instead as in the original code? ########## File path: raft/src/test/java/org/apache/kafka/raft/MockLog.java ########## @@ -522,12 +524,21 @@ public long sizeInBytes() { } @Override - public void append(ByteBuffer buffer) { + public void append(UnalignedMemoryRecords records) { if (frozen) { throw new RuntimeException("Snapshot is already frozen " + snapshotId); } + data.write(records.buffer()); + Review comment: nit: remove newline ########## File path: clients/src/test/java/org/apache/kafka/common/record/UnalignedFileRecordsTest.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import static org.apache.kafka.test.TestUtils.tempFile; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class UnalignedFileRecordsTest { + + private byte[][] values = new byte[][] { + "foo".getBytes(), + "bar".getBytes() + }; + private FileRecords fileRecords; + + @BeforeEach + public void setup() throws IOException { + this.fileRecords = createFileRecords(values); Review comment: nit: should we add an `@After` so that we can close the file? ########## File path: clients/src/test/java/org/apache/kafka/common/record/UnalignedFileRecordsTest.java ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; + +import static org.apache.kafka.test.TestUtils.tempFile; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class UnalignedFileRecordsTest { + + private byte[][] values = new byte[][] { + "foo".getBytes(), + "bar".getBytes() + }; + private FileRecords fileRecords; + + @BeforeEach + public void setup() throws IOException { + this.fileRecords = createFileRecords(values); + } + + @Test + public void testWriteTo() throws IOException { + + org.apache.kafka.common.requests.ByteBufferChannel channel = new org.apache.kafka.common.requests.ByteBufferChannel(fileRecords.sizeInBytes()); + int size = fileRecords.sizeInBytes(); + + UnalignedFileRecords records1 = fileRecords.sliceUnaligned(0, size / 2); + UnalignedFileRecords records2 = fileRecords.sliceUnaligned(size / 2, size - size / 2); Review comment: nit: `size - size / 2` looks a little funny. Is that the same as `size / 2 + 1`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
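As a side note on the integer arithmetic in that last comment: under Java integer division `size - size / 2` is the rounded-up half, while `size / 2 + 1` overshoots it for even sizes, so the two expressions agree only when `size` is odd. A quick illustrative check (the class name `HalfSplitCheck` is made up for this example):

```java
// Illustrative only: compares the two expressions from the review comment
// under Java integer division.
public class HalfSplitCheck {
    public static void main(String[] args) {
        for (int size : new int[] {5, 6, 7, 8}) {
            System.out.printf("size=%d  size - size/2=%d  size/2 + 1=%d%n",
                    size, size - size / 2, size / 2 + 1);
        }
    }
}
```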