hachikuji commented on a change in pull request #9819:
URL: https://github.com/apache/kafka/pull/9819#discussion_r561625246
##########
File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##########
@@ -53,13 +56,20 @@ public long sizeInBytes() throws IOException {
     }
 
     @Override
-    public void append(ByteBuffer buffer) throws IOException {
+    public void append(BaseRecords records) throws IOException {
         if (frozen) {
             throw new IllegalStateException(
-                String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+                String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
             );
         }
-
+        ByteBuffer buffer;
+        if (records instanceof MemoryRecords) {
+            buffer = ((MemoryRecords) records).buffer();
+        } else {

Review comment:
I feel more inclined to raise an exception if we get a `BaseRecords` type that is not `MemoryRecords`. If we really get an unexpected file in here, then we need to reconsider the IO model instead of hiding a big copy. We could even make the expectation explicit in the parameter type, even if it is not 100% symmetric with `RawSnapshotReader`. (A sketch of this follows after the next comment.)

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1093,9 +1093,11 @@ public static final void readFully(InputStream inputStream, ByteBuffer destinati
         destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
     }
 
-    public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+    public static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+        int size = 0;

Review comment:
Hmm... Not sure we need to compute this. Wouldn't it be the same as `sourceBuffer.remaining()`?
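On the `FileRawSnapshotWriter.append` comment above, a minimal sketch of the fail-fast approach. The method shape and field names (`frozen`, `channel`, `snapshotId`, `tempSnapshotPath`) follow the diff; the choice of `IllegalArgumentException` and its message are my own assumption, not the PR's actual code:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;

import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Utils;

// Fragment of FileRawSnapshotWriter; `channel` is assumed to be the
// FileChannel backing the temporary snapshot file.
@Override
public void append(BaseRecords records) throws IOException {
    if (frozen) {
        throw new IllegalStateException(
            String.format("Append is not supported. Snapshot is already frozen: id = %s; temp path = %s",
                snapshotId, tempSnapshotPath)
        );
    }
    if (!(records instanceof MemoryRecords)) {
        // Fail fast: a file-backed BaseRecords implementation would force a
        // large copy into the heap here, which should prompt a rethink of the
        // IO model rather than be hidden inside this method.
        throw new IllegalArgumentException(
            "Expected MemoryRecords but got " + records.getClass()
        );
    }
    Utils.writeFully(channel, ((MemoryRecords) records).buffer());
}
```

The stricter alternative mentioned above would simply declare `public void append(MemoryRecords records)` and let the compiler enforce the expectation, at the cost of the asymmetry with `RawSnapshotReader`.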
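And for the `Utils.writeFully` comment, a sketch of the `remaining()`-based version. The loop body is my reconstruction of the existing utility (the hunk cuts off before it), under the standard assumption that `FileChannel.write` may do short writes:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class WriteFullySketch {
    // Everything between the buffer's position and limit gets written, so the
    // total written is exactly what remained before the loop started; no
    // running counter is needed.
    static int writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
        int size = sourceBuffer.remaining();
        while (sourceBuffer.hasRemaining()) {
            channel.write(sourceBuffer); // may write fewer bytes than requested
        }
        return size;
    }
}
```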
##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1220,34 +1216,35 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
         if (!snapshotOpt.isPresent()) {
             return FetchSnapshotResponse.singleton(
                 log.topicPartition(),
-                responsePartitionSnapshot -> {
-                    return addQuorumLeader(responsePartitionSnapshot)
-                        .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
-                }
+                responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
+                    .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code())
             );
         }
 
         try (RawSnapshotReader snapshot = snapshotOpt.get()) {
             if (partitionSnapshot.position() < 0 || partitionSnapshot.position() >= snapshot.sizeInBytes()) {
                 return FetchSnapshotResponse.singleton(
                     log.topicPartition(),
-                    responsePartitionSnapshot -> {
-                        return addQuorumLeader(responsePartitionSnapshot)
-                            .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
-                    }
+                    responsePartitionSnapshot -> addQuorumLeader(responsePartitionSnapshot)
+                        .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code())
                 );
             }
 
             int maxSnapshotSize;
+            int maxSnapshotPosition;
             try {
                 maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
             } catch (ArithmeticException e) {
                 maxSnapshotSize = Integer.MAX_VALUE;
             }
 
-            ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), maxSnapshotSize));
-            snapshot.read(buffer, partitionSnapshot.position());
-            buffer.flip();
+            try {
+                maxSnapshotPosition = Math.toIntExact(partitionSnapshot.position());
+            } catch (ArithmeticException e) {
+                maxSnapshotPosition = Integer.MAX_VALUE;

Review comment:
I agree we should probably throw here instead of clamping. Snapshot size limits are an interesting point which I hadn't thought about. Currently `FileRecords` does not support files larger than `Int.MaxValue`, which gives us a 2GB limit. My feeling is that this is probably good enough initially, but perhaps it adds some fuel for the effort to generalize the zero-copy support. (A sketch of the throwing variant follows below.)
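To make the throwing variant concrete, here is a sketch of how the position conversion could reject out-of-range values instead of clamping to `Integer.MAX_VALUE`. The helper name and the choice of exception are hypothetical, not from the PR:

```java
// Hypothetical fragment for KafkaRaftClient: convert a snapshot position to
// int, rejecting values beyond what FileRecords can address (Int.MaxValue,
// i.e. the 2GB limit) instead of clamping, since a clamped position would
// silently read from the wrong offset.
static int checkedSnapshotPosition(long position) {
    try {
        return Math.toIntExact(position);
    } catch (ArithmeticException e) {
        throw new IllegalStateException(
            "Snapshot position " + position + " exceeds the 2GB FileRecords limit", e);
    }
}
```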