jsancio commented on a change in pull request #9553:
URL: https://github.com/apache/kafka/pull/9553#discussion_r547554416



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1101,6 +1140,174 @@ private DescribeQuorumResponseData 
handleDescribeQuorumRequest(
         );
     }
 
+    private FetchSnapshotResponseData handleFetchSnapshotRequest(
+        RaftRequest.Inbound requestMetadata
+    ) throws IOException {
+        FetchSnapshotRequestData data = (FetchSnapshotRequestData) 
requestMetadata.data;
+
+        if (data.topics().size() != 1 && 
data.topics().get(0).partitions().size() != 1) {
+            return FetchSnapshotResponse.withTopError(Errors.INVALID_REQUEST);
+        }
+
+        Optional<FetchSnapshotRequestData.PartitionSnapshot> 
partitionSnapshotOpt = FetchSnapshotRequest
+            .forTopicPartition(data, log.topicPartition());
+        if (!partitionSnapshotOpt.isPresent()) {
+            // The Raft client assumes that there is only one topic partition.
+            TopicPartition unknownTopicPartition = new TopicPartition(
+                data.topics().get(0).name(),
+                data.topics().get(0).partitions().get(0).partition()
+            );
+
+            return FetchSnapshotResponse.singleton(
+                unknownTopicPartition,
+                responsePartitionSnapshot -> {
+                    return responsePartitionSnapshot
+                        
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
+                }
+            );
+        }
+
+        FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = 
partitionSnapshotOpt.get();
+        Optional<Errors> leaderValidation = validateLeaderOnlyRequest(
+                partitionSnapshot.currentLeaderEpoch()
+        );
+        if (leaderValidation.isPresent()) {
+            return FetchSnapshotResponse.singleton(
+                log.topicPartition(),
+                responsePartitionSnapshot -> {
+                    return addQuorumLeader(responsePartitionSnapshot)
+                        .setErrorCode(leaderValidation.get().code());
+                }
+            );
+        }
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            partitionSnapshot.snapshotId().endOffset(),
+            partitionSnapshot.snapshotId().epoch()
+        );
+        Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
+        if (!snapshotOpt.isPresent()) {
+            return FetchSnapshotResponse.singleton(
+                log.topicPartition(),
+                responsePartitionSnapshot -> {
+                    return addQuorumLeader(responsePartitionSnapshot)
+                        .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
+                }
+            );
+        }
+
+        try (RawSnapshotReader snapshot = snapshotOpt.get()) {
+            int maxSnapshotSize;
+            try {
+                maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
+            } catch (ArithmeticException e) {
+                maxSnapshotSize = Integer.MAX_VALUE;
+            }
+
+            // TODO: Make sure that we also limit based on the fetch max bytes 
configuration
+            ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), 
maxSnapshotSize));
+            snapshot.read(buffer, partitionSnapshot.position());

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to