jsancio commented on a change in pull request #11116:
URL: https://github.com/apache/kafka/pull/11116#discussion_r680254022
##########
File path:
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##########
@@ -619,108 +620,114 @@ public void testMissingInMemorySnapshot() throws
Exception {
int numPartitions = 3;
String topicName = "topic-name";
- try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1,
Optional.empty())) {
- try (QuorumControllerTestEnv controlEnv =
- new QuorumControllerTestEnv(logEnv, b ->
b.setConfigDefs(CONFIGS))) {
- QuorumController controller = controlEnv.activeController();
-
- Map<Integer, Long> brokerEpochs = registerBrokers(controller,
numBrokers);
-
- // Create a lot of partitions
- List<CreatableReplicaAssignment> partitions = IntStream
- .range(0, numPartitions)
- .mapToObj(partitionIndex -> new
CreatableReplicaAssignment()
- .setPartitionIndex(partitionIndex)
- .setBrokerIds(Arrays.asList(0, 1, 2))
- )
- .collect(Collectors.toList());
-
- Uuid topicId = controller.createTopics(
- new CreateTopicsRequestData()
- .setTopics(
- new CreatableTopicCollection(
- Collections.singleton(
- new CreatableTopic()
- .setName(topicName)
- .setNumPartitions(-1)
- .setReplicationFactor((short) -1)
- .setAssignments(new
CreatableReplicaAssignmentCollection(partitions.iterator()))
- ).iterator()
- )
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1,
Optional.empty());
+ QuorumControllerTestEnv controlEnv =
+ new QuorumControllerTestEnv(logEnv, b ->
b.setConfigDefs(CONFIGS))
+ ) {
+ QuorumController controller = controlEnv.activeController();
+
+ Map<Integer, Long> brokerEpochs = registerBrokers(controller,
numBrokers);
+
+ // Create a lot of partitions
+ List<CreatableReplicaAssignment> partitions = IntStream
+ .range(0, numPartitions)
+ .mapToObj(partitionIndex -> new CreatableReplicaAssignment()
+ .setPartitionIndex(partitionIndex)
+ .setBrokerIds(Arrays.asList(0, 1, 2))
+ )
+ .collect(Collectors.toList());
+
+ Uuid topicId = controller.createTopics(
+ new CreateTopicsRequestData()
+ .setTopics(
+ new CreatableTopicCollection(
+ Collections.singleton(
+ new CreatableTopic()
+ .setName(topicName)
+ .setNumPartitions(-1)
+ .setReplicationFactor((short) -1)
+ .setAssignments(new
CreatableReplicaAssignmentCollection(partitions.iterator()))
+ ).iterator()
)
- ).get().topics().find(topicName).topicId();
-
- // Create a lot of alter isr
- List<AlterIsrRequestData.PartitionData> alterIsrs = IntStream
- .range(0, numPartitions)
- .mapToObj(partitionIndex -> {
- PartitionRegistration partitionRegistration =
controller.replicationControl().getPartition(
- topicId,
- partitionIndex
- );
-
- return new AlterIsrRequestData.PartitionData()
- .setPartitionIndex(partitionIndex)
- .setLeaderEpoch(partitionRegistration.leaderEpoch)
-
.setCurrentIsrVersion(partitionRegistration.partitionEpoch)
- .setNewIsr(Arrays.asList(0, 1));
- })
- .collect(Collectors.toList());
-
- AlterIsrRequestData.TopicData topicData = new
AlterIsrRequestData.TopicData()
- .setName(topicName);
- topicData.partitions().addAll(alterIsrs);
-
- int leaderId = 0;
- AlterIsrRequestData alterIsrRequest = new AlterIsrRequestData()
- .setBrokerId(leaderId)
- .setBrokerEpoch(brokerEpochs.get(leaderId));
- alterIsrRequest.topics().add(topicData);
-
- logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
-
- assertThrows(
- ExecutionException.class,
- () -> controller.alterIsr(alterIsrRequest).get()
- );
-
- // Wait for the new active controller
- final QuorumController newController =
controlEnv.activeController();
-
- // Since the alterIsr partially failed we expect to see
- // some partitions to still have 2 in the ISR.
- int partitionsWithReplica2 = Utils.toList(
- newController
- .replicationControl()
- .brokersToIsrs()
- .partitionsWithBrokerInIsr(2)
- ).size();
- int partitionsWithReplica0 = Utils.toList(
- newController
- .replicationControl()
- .brokersToIsrs()
- .partitionsWithBrokerInIsr(0)
- ).size();
-
- assertEquals(numPartitions, partitionsWithReplica0);
- assertNotEquals(0, partitionsWithReplica2);
- assertTrue(
- partitionsWithReplica0 > partitionsWithReplica2,
- String.format(
- "partitionsWithReplica0 = %s, partitionsWithReplica2 =
%s",
- partitionsWithReplica0,
- partitionsWithReplica2
)
- );
- }
+ ).get().topics().find(topicName).topicId();
+
+ // Create a lot of alter isr
+ List<AlterIsrRequestData.PartitionData> alterIsrs = IntStream
+ .range(0, numPartitions)
+ .mapToObj(partitionIndex -> {
+ PartitionRegistration partitionRegistration =
controller.replicationControl().getPartition(
+ topicId,
+ partitionIndex
+ );
+
+ return new AlterIsrRequestData.PartitionData()
+ .setPartitionIndex(partitionIndex)
+ .setLeaderEpoch(partitionRegistration.leaderEpoch)
+
.setCurrentIsrVersion(partitionRegistration.partitionEpoch)
+ .setNewIsr(Arrays.asList(0, 1));
+ })
+ .collect(Collectors.toList());
+
+ AlterIsrRequestData.TopicData topicData = new
AlterIsrRequestData.TopicData()
+ .setName(topicName);
+ topicData.partitions().addAll(alterIsrs);
+
+ int leaderId = 0;
+ AlterIsrRequestData alterIsrRequest = new AlterIsrRequestData()
+ .setBrokerId(leaderId)
+ .setBrokerEpoch(brokerEpochs.get(leaderId));
+ alterIsrRequest.topics().add(topicData);
+
+ logEnv.logManagers().get(0).resignAfterNonAtomicCommit();
+
+ int oldClaimEpoch = controller.curClaimEpoch();
+ assertThrows(
+ ExecutionException.class,
+ () -> controller.alterIsr(alterIsrRequest).get()
+ );
+
+ // Wait for the controller to become active again
+ assertSame(controller, controlEnv.activeController());
+ assertTrue(
+ oldClaimEpoch < controller.curClaimEpoch(),
+ String.format("oldClaimEpoch = %s, newClaimEpoch = %s",
oldClaimEpoch, controller.curClaimEpoch())
+ );
Review comment:
Only this should have changed. The rest are indentation changes from the
previous commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]