hachikuji commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r561625246



##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException {
     }
 
     @Override
-    public void append(ByteBuffer buffer) throws IOException {
+    public void append(BaseRecords records) throws IOException {
         if (frozen) {
             throw new IllegalStateException(
-                String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+                    String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
             );
         }
-
+        ByteBuffer buffer;
+        if (records instanceof MemoryRecords) {
+            buffer = ((MemoryRecords) records).buffer();
+        } else {

Review comment:
       I'm more inclined to raise an exception if we get a `BaseRecords` 
type that is not `MemoryRecords`. If we really do get an unexpected file in 
here, then we need to reconsider the IO model rather than hide a big copy. We 
could even make the expectation explicit in the parameter type, even if that 
is not 100% symmetric with `RawSnapshotReader`.
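
A minimal sketch of the fail-fast alternative described above. The types `Records`, `InMemoryRecords`, and `FileBackedRecords` are hypothetical stand-ins so the example compiles on its own; they are not the real `BaseRecords` / `MemoryRecords` classes, and the choice of `IllegalArgumentException` is an assumption.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-ins for BaseRecords / MemoryRecords; not the real Kafka types.
interface Records {}

final class InMemoryRecords implements Records {
    private final ByteBuffer buffer;

    InMemoryRecords(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    // Hand out a duplicate so callers cannot move the original position.
    ByteBuffer buffer() {
        return buffer.duplicate();
    }
}

// Represents any records implementation that is not backed by memory.
final class FileBackedRecords implements Records {}

final class StrictAppendSketch {
    // Returns the buffer to write, or fails fast instead of silently copying.
    static ByteBuffer bufferToAppend(Records records) {
        if (!(records instanceof InMemoryRecords)) {
            String type = records == null ? "null" : records.getClass().getSimpleName();
            throw new IllegalArgumentException(
                "Expected in-memory records but got " + type);
        }
        return ((InMemoryRecords) records).buffer();
    }
}
```

Narrowing the parameter type itself would move this check to compile time, at the cost of the asymmetry with the reader mentioned above.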

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1093,9 +1093,11 @@ public static final void readFully(InputStream inputStream, ByteBuffer destinati
         destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
     }
 
-    public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+    public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+        int size = 0;

Review comment:
       Hmm, not sure we need to compute this. Wouldn't it be the same as 
`sourceBuffer.remaining()` read before the write?
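
A minimal sketch of that point, assuming `writeFully` keeps its `void` signature and loops until the buffer is drained. The class name, `writeAndCount`, and `demo` are hypothetical helpers for illustration only. The caller has to read `remaining()` before the write, because it is 0 afterwards.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class WriteFullySketch {
    // Keeps writing until the buffer is drained; returns nothing.
    static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
        while (sourceBuffer.hasRemaining()) {
            channel.write(sourceBuffer);
        }
    }

    // Hypothetical caller: the byte count is known up front from remaining().
    static int writeAndCount(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
        int size = sourceBuffer.remaining();
        writeFully(channel, sourceBuffer);
        return size;
    }

    // Writes five bytes to a temp file and returns
    // {bytes counted, file size, bytes remaining in the buffer afterwards}.
    static long[] demo() {
        try {
            Path tmp = Files.createTempFile("write-fully", ".bin");
            try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4, 5});
                int written = writeAndCount(channel, buffer);
                return new long[] {written, channel.size(), buffer.remaining()};
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```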

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
         if (!snapshotOpt.isPresent()) {
             return FetchSnapshotResponse.singleton(
                 log.topicPartition(),
-                responsePartitionSnapshot -> {
-                    return addQuorumLeader(responsePartitionSnapshot)
-                        .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
-                }
+                responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
+                    .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code())
             );
         }
 
         try (RawSnapshotReader snapshot = snapshotOpt.get()) {
             if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshot.sizeInBytes()) {
                 return FetchSnapshotResponse.singleton(
                     log.topicPartition(),
-                    responsePartitionSnapshot -> {
-                        return addQuorumLeader(responsePartitionSnapshot)
-                            .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
-                    }
+                    responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
+                        .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
                 );
             }
 
             int maxSnapshotSize;
+            int maxSnapshotPosition;
             try {
                 maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
             } catch (ArithmeticException e) {
                 maxSnapshotSize = Integer.MAX_VALUE;
             }
 
-            ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), maxSnapshotSize));
-            snapshot.read(buffer, partitionSnapshot.position());
-            buffer.flip();
+            try {
+                maxSnapshotPosition = Math.toIntExact(partitionSnapshot.position());
+            } catch (ArithmeticException e) {
+                maxSnapshotPosition = Integer.MAX_VALUE;

Review comment:
       I agree we should probably throw here. Snapshot size limits are an 
interesting point which I hadn't thought about. Currently `FileRecords` does 
not support files larger than Int.MaxValue bytes, which gives us a 2GB 
limit. My feeling is that this is probably good enough initially, but perhaps 
it adds some fuel to the effort to generalize the zero-copy support.
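
A minimal sketch of throwing instead of clamping to `Integer.MAX_VALUE`. The class and method names are hypothetical, and `IllegalStateException` is an assumption; the thread does not say which exception type should be raised.

```java
final class SnapshotPositionSketch {
    // Converts a long position to int, surfacing overflow to the caller
    // instead of quietly substituting Integer.MAX_VALUE.
    static int toIntPosition(long position) {
        try {
            return Math.toIntExact(position);
        } catch (ArithmeticException e) {
            throw new IllegalStateException(
                "Snapshot position " + position
                    + " exceeds the supported maximum of " + Integer.MAX_VALUE, e);
        }
    }
}
```

With clamping, a position past the 2GB limit would be treated as a valid offset; with this shape the request fails visibly.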





