jsancio commented on a change in pull request #9553:
URL: https://github.com/apache/kafka/pull/9553#discussion_r538060397



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1101,6 +1140,174 @@ private DescribeQuorumResponseData 
handleDescribeQuorumRequest(
         );
     }
 
+    private FetchSnapshotResponseData handleFetchSnapshotRequest(
+        RaftRequest.Inbound requestMetadata
+    ) throws IOException {
+        FetchSnapshotRequestData data = (FetchSnapshotRequestData) 
requestMetadata.data;
+
+        if (data.topics().size() != 1 && 
data.topics().get(0).partitions().size() != 1) {
+            return FetchSnapshotResponse.withTopError(Errors.INVALID_REQUEST);
+        }
+
+        Optional<FetchSnapshotRequestData.PartitionSnapshot> 
partitionSnapshotOpt = FetchSnapshotRequest
+            .forTopicPartition(data, log.topicPartition());
+        if (!partitionSnapshotOpt.isPresent()) {
+            // The Raft client assumes that there is only one topic partition.
+            TopicPartition unknownTopicPartition = new TopicPartition(
+                data.topics().get(0).name(),
+                data.topics().get(0).partitions().get(0).partition()
+            );
+
+            return FetchSnapshotResponse.singleton(
+                unknownTopicPartition,
+                responsePartitionSnapshot -> {
+                    return responsePartitionSnapshot
+                        
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
+                }
+            );
+        }
+
+        FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = 
partitionSnapshotOpt.get();
+        Optional<Errors> leaderValidation = validateLeaderOnlyRequest(
+                partitionSnapshot.currentLeaderEpoch()
+        );
+        if (leaderValidation.isPresent()) {
+            return FetchSnapshotResponse.singleton(
+                log.topicPartition(),
+                responsePartitionSnapshot -> {
+                    return addQuorumLeader(responsePartitionSnapshot)
+                        .setErrorCode(leaderValidation.get().code());
+                }
+            );
+        }
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            partitionSnapshot.snapshotId().endOffset(),
+            partitionSnapshot.snapshotId().epoch()
+        );
+        Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
+        if (!snapshotOpt.isPresent()) {
+            return FetchSnapshotResponse.singleton(
+                log.topicPartition(),
+                responsePartitionSnapshot -> {
+                    return addQuorumLeader(responsePartitionSnapshot)
+                        .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
+                }
+            );
+        }
+
+        try (RawSnapshotReader snapshot = snapshotOpt.get()) {
+            int maxSnapshotSize;
+            try {
+                maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
+            } catch (ArithmeticException e) {
+                maxSnapshotSize = Integer.MAX_VALUE;
+            }
+
+            // TODO: Make sure that we also limit based on the fetch max bytes 
configuration
+            ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(), 
maxSnapshotSize));
+            snapshot.read(buffer, partitionSnapshot.position());
+            buffer.flip();
+
+            long snapshotSize = snapshot.sizeInBytes();
+
+            return FetchSnapshotResponse.singleton(
+                log.topicPartition(),
+                responsePartitionSnapshot -> {
+                    addQuorumLeader(responsePartitionSnapshot)
+                        .snapshotId()
+                        .setEndOffset(snapshotId.offset)
+                        .setEpoch(snapshotId.epoch);
+
+                    return responsePartitionSnapshot
+                        .setSize(snapshotSize)
+                        .setPosition(partitionSnapshot.position())
+                        .setBytes(buffer);
+                }
+            );
+        }
+    }
+
+    private boolean handleFetchSnapshotResponse(
+        RaftResponse.Inbound responseMetadata,
+        long currentTimeMs
+    ) throws IOException {
+        FetchSnapshotResponseData data = (FetchSnapshotResponseData) 
responseMetadata.data;
+        Errors topLevelError = Errors.forCode(data.errorCode());
+        if (topLevelError != Errors.NONE) {
+            // TODO: check what values this expression returns
+            return handleTopLevelError(topLevelError, responseMetadata);
+        }
+
+        if (data.topics().size() != 1 && 
data.topics().get(0).partitions().size() != 1) {
+            return false;
+        }
+
+        Optional<FetchSnapshotResponseData.PartitionSnapshot> 
partitionSnapshotOpt = FetchSnapshotResponse
+            .forTopicPartition(data, log.topicPartition());
+        if (!partitionSnapshotOpt.isPresent()) {
+            return false;
+        }
+
+        FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot = 
partitionSnapshotOpt.get();
+
+        FetchSnapshotResponseData.LeaderIdAndEpoch currentLeaderIdAndEpoch = 
partitionSnapshot.currentLeader();
+        OptionalInt responseLeaderId = 
optionalLeaderId(currentLeaderIdAndEpoch.leaderId());
+        int responseEpoch = currentLeaderIdAndEpoch.leaderEpoch();
+        Errors error = Errors.forCode(partitionSnapshot.errorCode());
+
+        Optional<Boolean> handled = maybeHandleCommonResponse(
+            error, responseLeaderId, responseEpoch, currentTimeMs);
+        if (handled.isPresent()) {
+            // TODO: check what values this expression returns
+            return handled.get();
+        }
+
+        FollowerState state = quorum.followerStateOrThrow();
+
+        if (Errors.forCode(partitionSnapshot.errorCode()) == 
Errors.SNAPSHOT_NOT_FOUND ||
+            partitionSnapshot.snapshotId().endOffset() < 0 ||
+            partitionSnapshot.snapshotId().epoch() < 0) {
+
+            /* The leader deleted the snapshot before the follower could 
download it. Start over by
+             * reseting the fetching snapshot state and sending another fetch 
request.
+             */
+            state.setFetchingSnapshot(Optional.empty());
+            state.resetFetchTimeout(currentTimeMs);
+            return true;
+        }
+
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+            partitionSnapshot.snapshotId().endOffset(),
+            partitionSnapshot.snapshotId().epoch()
+        );
+
+        RawSnapshotWriter snapshot;
+        if (state.fetchingSnapshot().isPresent()) {
+            snapshot = state.fetchingSnapshot().get();
+        } else {
+            throw new IllegalStateException("Received unexpected fetch 
snapshot response: " + partitionSnapshot);
+        }
+
+        if (!snapshot.snapshotId().equals(snapshotId)) {
+            throw new IllegalStateException(String.format("Received fetch 
snapshot response with an invalid id. Expected %s; Received %s", 
snapshot.snapshotId(), snapshotId));
+        }
+        if (snapshot.sizeInBytes() != partitionSnapshot.position()) {
+            throw new IllegalStateException(String.format("Received fetch 
snapshot response with an invalid position. Expected %s; Received %s", 
snapshot.sizeInBytes(), partitionSnapshot.position()));
+        }
+
+        snapshot.append(partitionSnapshot.bytes());
+
+        if (snapshot.sizeInBytes() == partitionSnapshot.size()) {
+            // Finished fetching the snapshot.
+            snapshot.freeze();
+            state.setFetchingSnapshot(Optional.empty());

Review comment:
       The follower needs to update the log start offset and log end offset 
after it has successfully fetched a snapshot. I want to implement this part in 
this JIRA: https://issues.apache.org/jira/browse/KAFKA-10820




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to