jsancio commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r564124971



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1098,6 +1099,29 @@ public static void writeFully(FileChannel channel, 
ByteBuffer sourceBuffer) thro
             channel.write(sourceBuffer);
     }
 
+    /**
+     * Trying to write data in source buffer to a {@link TransferableChannel}, 
we may need to call this method multiple
+     * times since this method doesn't ensure data in source buffer can be 
fully written to dest channel.
+     *
+     * @param destChannel The dest channel
+     * @param position From which the source buffer will be written
+     * @param length The max size of bytes can be written
+     * @param sourceBuffer The source buffer
+     *
+     * @return The length of the actual written data
+     * @throws IOException If an I/O error occurs
+     */
+    public static long tryWriteTo(TransferableChannel destChannel,
+                                  int position,
+                                  int length,
+                                  ByteBuffer sourceBuffer) throws IOException {
+
+        ByteBuffer dup = sourceBuffer.duplicate();
+        dup.position(position);
+        dup.limit(position + length);

Review comment:
       Isn't this `position()` and `limit()` modification the same as 
`ByteBuffer.slice`?
   
   Having said that I find it strange that we want to set the absolute position 
and limit. Shouldn't this be set relative to the current position? For example 
think about `sourceBuffer.position() > position`.

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1098,6 +1099,29 @@ public static void writeFully(FileChannel channel, 
ByteBuffer sourceBuffer) thro
             channel.write(sourceBuffer);
     }
 
+    /**
+     * Trying to write data in source buffer to a {@link TransferableChannel}, 
we may need to call this method multiple
+     * times since this method doesn't ensure data in source buffer can be 
fully written to dest channel.
+     *
+     * @param destChannel The dest channel
+     * @param position From which the source buffer will be written
+     * @param length The max size of bytes can be written
+     * @param sourceBuffer The source buffer
+     *
+     * @return The length of the actual written data
+     * @throws IOException If an I/O error occurs
+     */
+    public static long tryWriteTo(TransferableChannel destChannel,
+                                  int position,
+                                  int length,
+                                  ByteBuffer sourceBuffer) throws IOException {

Review comment:
       How about removing the IO from this method with a signature like:
   ```java
   public static ByteBuffer relativeSlice(ByteBuffer buffer, int position, int 
length);
   ```
   The caller of this method can then `long written = 
destChannel.write(Utils.relativeSlice(buffer, position, length))`.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/record/UnalignedFileRecordsTest.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class UnalignedFileRecordsTest {
+
+    private byte[][] values = new byte[][] {
+            "foo".getBytes(),
+            "bar".getBytes()
+    };
+    private FileRecords fileRecords;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        this.fileRecords = createFileRecords(values);
+    }
+
+    @Test
+    public void testWriteTo() throws IOException {
+
+        org.apache.kafka.common.requests.ByteBufferChannel channel = new 
org.apache.kafka.common.requests.ByteBufferChannel(fileRecords.sizeInBytes());
+        int size = fileRecords.sizeInBytes();
+
+        UnalignedFileRecords records1 = fileRecords.sliceUnaligned(0, size / 
2);
+        UnalignedFileRecords records2 = fileRecords.sliceUnaligned(size / 2, 
size - size / 2);

Review comment:
       `4 - 4/2 = 2` vs `4/2 + 1 = 3`.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.network.TransferableChannel;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * Represents a file record set which is not necessarily offset-aligned
+ */
+public class UnalignedFileRecords implements UnalignedRecords {
+
+    private final FileChannel channel;
+    private final long position;
+    private final int size;
+
+    public UnalignedFileRecords(FileChannel channel, long position, int size) {
+        this.channel = channel;
+        this.position = position;
+        this.size = size;
+    }
+
+    @Override
+    public int sizeInBytes() {
+        return size;
+    }
+
+    @Override
+    public long writeTo(TransferableChannel destChannel, long 
previouslyWritten, int remaining) throws IOException {

Review comment:
       Hmm. The semantic between the in-memory implementation vs the file 
channel implementation is slightly different. The in-memory version throws an 
exception if there are not enough bytes to send `length`. While the file 
channel version just sends less bytes if there is not enough to send 
`remaining`.
   
   I suspect that we want to throw in both cases else the calling code won't 
know why `writeTo` sent less bytes. It could loop forever if it is attempting 
to send up to `remaining` bytes.

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
##########
@@ -107,6 +112,48 @@ public void testWriteReadSnapshot() throws IOException {
         }
     }
 
+    @Test
+    public void testPartialWriteReadSnapshot() throws IOException {
+        Path tempDir = TestUtils.tempDirectory().toPath();
+        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
+
+        ByteBuffer records = 
buildRecords(ByteBuffer.wrap(Utils.utf8("foo"))).buffer();
+
+        ByteBuffer expectedBuffer = ByteBuffer.wrap(records.array());
+
+        ByteBuffer buffer1 = expectedBuffer.duplicate();
+        buffer1.position(0);
+        buffer1.limit(expectedBuffer.limit() / 2);
+        ByteBuffer buffer2 = expectedBuffer.duplicate();
+        buffer2.position(expectedBuffer.limit() / 2);
+        buffer2.limit(expectedBuffer.limit());
+
+        try (FileRawSnapshotWriter snapshot = 
FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
+
+            snapshot.append(new UnalignedMemoryRecords(buffer1));
+            snapshot.append(new UnalignedMemoryRecords(buffer2));
+            snapshot.freeze();
+        }
+
+        try (FileRawSnapshotReader snapshot = 
FileRawSnapshotReader.open(tempDir, offsetAndEpoch)) {
+

Review comment:
       Extra newline.

##########
File path: raft/src/test/java/org/apache/kafka/snapshot/FileRawSnapshotTest.java
##########
@@ -107,6 +112,48 @@ public void testWriteReadSnapshot() throws IOException {
         }
     }
 
+    @Test
+    public void testPartialWriteReadSnapshot() throws IOException {
+        Path tempDir = TestUtils.tempDirectory().toPath();
+        OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(10L, 3);
+
+        ByteBuffer records = 
buildRecords(ByteBuffer.wrap(Utils.utf8("foo"))).buffer();
+
+        ByteBuffer expectedBuffer = ByteBuffer.wrap(records.array());
+
+        ByteBuffer buffer1 = expectedBuffer.duplicate();
+        buffer1.position(0);
+        buffer1.limit(expectedBuffer.limit() / 2);
+        ByteBuffer buffer2 = expectedBuffer.duplicate();
+        buffer2.position(expectedBuffer.limit() / 2);
+        buffer2.limit(expectedBuffer.limit());
+
+        try (FileRawSnapshotWriter snapshot = 
FileRawSnapshotWriter.create(tempDir, offsetAndEpoch)) {
+

Review comment:
       Extra newline.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to