jsancio commented on a change in pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#discussion_r800268813



##########
File path: 
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1535,6 +1535,46 @@ public void 
testFetchSnapshotRequestClusterIdValidation() throws Exception {
         
context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testInconsistentClusterIdInFetchSnapshotResponse() throws 
Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        // Send a request
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+
+        // Firstly receive a response with a valid cluster id
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, 
context.metadataTopicId, epoch, leaderId, snapshotId, 200L)
+        );
+
+        // Send fetch snapshot request
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = 
context.assertSentFetchSnapshotRequest();
+
+        // Secondly receive a response with an inconsistent cluster id
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            new 
FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+        );
+
+        // Inconsistent cluster id are not fatal if a previous response 
contained a valid cluster id
+        assertDoesNotThrow(context.client::poll);
+
+        // It's impossible to receive a be begin quorum response before any 
other request so we don't test

Review comment:
       I am trying to understand this comment. Can you please explain why this 
is true? And why do you think that this comment is important in this test?
   
   This comment applies to a few places.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
##########
@@ -159,4 +161,18 @@ static boolean 
hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti
                    data.topics().get(0).partitions().size() == 1 &&
                    data.topics().get(0).partitions().get(0).partitionIndex() 
== topicPartition.partition();
     }
+
+    static boolean hasValidTopicPartition(FetchSnapshotRequestData data, 
TopicPartition topicPartition) {

Review comment:
       How about changing this to return an `Errors`?
   1. `INVALID_REQUEST` if there is more than one topic partition
   2. `UNKNOWN_TOPIC_OR_PARTITION` if the topic partition doesn't match the 
log's name and partition
   3. `NONE` otherwise




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to