jsancio commented on a change in pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#discussion_r800268813
##########
File path:
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##########
@@ -1535,6 +1535,46 @@ public void
testFetchSnapshotRequestClusterIdValidation() throws Exception {
context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
}
+ @Test
+ public void testInconsistentClusterIdInFetchSnapshotResponse() throws
Exception {
+ int localId = 0;
+ int leaderId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, leaderId);
+ int epoch = 2;
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withElectedLeader(epoch, leaderId)
+ .build();
+
+ // Send a request
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+
+ // Firstly receive a response with a valid cluster id
+ context.deliverResponse(
+ fetchRequest.correlationId,
+ fetchRequest.destinationId(),
+ snapshotFetchResponse(context.metadataPartition,
context.metadataTopicId, epoch, leaderId, snapshotId, 200L)
+ );
+
+ // Send fetch snapshot request
+ context.pollUntilRequest();
+ RaftRequest.Outbound snapshotRequest =
context.assertSentFetchSnapshotRequest();
+
+ // Secondly receive a response with an inconsistent cluster id
+ context.deliverResponse(
+ snapshotRequest.correlationId,
+ snapshotRequest.destinationId(),
+ new
FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+ );
+
+ // Inconsistent cluster id are not fatal if a previous response
contained a valid cluster id
+ assertDoesNotThrow(context.client::poll);
+
+ // It's impossible to receive a be begin quorum response before any
other request so we don't test
Review comment:
I am trying to understand this comment. Can you please explain why this
is true? And why do you think that this comment is important in this test?
This comment applies to a few places.
##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
##########
@@ -159,4 +161,18 @@ static boolean
hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex()
== topicPartition.partition();
}
+
+ static boolean hasValidTopicPartition(FetchSnapshotRequestData data,
TopicPartition topicPartition) {
Review comment:
How about changing this to return an `Errors`?
1. `INVALID_REQUEST` if there is more than one topic partition
2. `UNKNOWN_TOPIC_OR_PARTITION` if the topic partition doesn't match the
log's name and partition
3. `NONE` otherwise
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]