ahuang98 commented on code in PR #20645:
URL: https://github.com/apache/kafka/pull/20645#discussion_r2604843710


##########
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##########
@@ -649,88 +649,115 @@ public void testMinIsrUpdateWithElr() throws Throwable {
 
             // Unfence all brokers and create a topic foo (min ISR 2)
             sendBrokerHeartbeatToUnfenceBrokers(active, allBrokers, 
brokerEpochs);
-            CreateTopicsRequestData createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
-                new CreatableTopicCollection(List.of(
-                    new CreatableTopic().setName("foo").setNumPartitions(1).
-                        setReplicationFactor(replicationFactor),
-                    new CreatableTopic().setName("bar").setNumPartitions(1).
-                        setReplicationFactor(replicationFactor)
-                ).iterator()));
-            CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
-                ANONYMOUS_CONTEXT, createTopicsRequestData,
-                Set.of("foo", "bar")).get();
-            assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
-            assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
-            Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
-            Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
-            ConfigRecord configRecord = new ConfigRecord()
-                .setResourceType(BROKER.id())
-                .setResourceName("")
-                .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
-                .setValue("2");
-            RecordTestUtils.replayAll(active.configurationControl(), 
List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
 
-            // Fence brokers
-            TestUtils.waitForCondition(() -> {
-                    sendBrokerHeartbeatToUnfenceBrokers(active, 
brokersToKeepUnfenced, brokerEpochs);
-                    for (Integer brokerId : brokersToFence) {
-                        if (active.clusterControl().isUnfenced(brokerId)) {
-                            return false;
-                        }
+            // Heartbeat pumper 
+            final java.util.concurrent.ScheduledExecutorService hbExec =
+                
java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
+            final java.util.concurrent.atomic.AtomicBoolean keepOnly =
+                new java.util.concurrent.atomic.AtomicBoolean(false);
+            final long periodMs = Math.max(50L, sessionTimeoutMillis / 3); 
+
+            hbExec.scheduleAtFixedRate(() -> {
+                try {
+                    if (keepOnly.get()) {
+                        sendBrokerHeartbeatToUnfenceBrokers(active, 
brokersToKeepUnfenced, brokerEpochs);
+                    } else {
+                        sendBrokerHeartbeatToUnfenceBrokers(active, 
allBrokers, brokerEpochs);
                     }
-                    return true;
-                }, sessionTimeoutMillis * 30,
-                "Fencing of brokers did not process within expected time"
-            );
+                } catch (Throwable t) {
+                    throw new RuntimeException(t);
+                }
+            }, 0L, periodMs, java.util.concurrent.TimeUnit.MILLISECONDS);
 
-            // Send another heartbeat to the brokers we want to keep alive
-            sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, 
brokerEpochs);
+            try {
+                CreateTopicsRequestData createTopicsRequestData = new 
CreateTopicsRequestData().setTopics(
+                    new CreatableTopicCollection(List.of(
+                        new 
CreatableTopic().setName("foo").setNumPartitions(1).
+                            setReplicationFactor(replicationFactor),
+                        new 
CreatableTopic().setName("bar").setNumPartitions(1).
+                            setReplicationFactor(replicationFactor)
+                    ).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
+                    ANONYMOUS_CONTEXT, createTopicsRequestData,
+                    Set.of("foo", "bar")).get();
+                assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
+                assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
+                Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
+                Uuid topicIdBar = 
createTopicsResponseData.topics().find("bar").topicId();
+                ConfigRecord configRecord = new ConfigRecord()
+                    .setResourceType(BROKER.id())
+                    .setResourceName("")
+                    .setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)
+                    .setValue("2");
+                RecordTestUtils.replayAll(active.configurationControl(), 
List.of(new ApiMessageAndVersion(configRecord, (short) 0)));
+
+                // Before fencing wait, switch pumper to only keep 
brokersToKeepUnfenced alive
+                keepOnly.set(true);
+
+                // Fence brokers
+                TestUtils.waitForCondition(() -> {
+                        sendBrokerHeartbeatToUnfenceBrokers(active, 
brokersToKeepUnfenced, brokerEpochs);
+                        for (Integer brokerId : brokersToFence) {
+                            if (active.clusterControl().isUnfenced(brokerId)) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    }, sessionTimeoutMillis * 30,
+                    "Fencing of brokers did not process within expected time"
+                );
 
-            // At this point only the brokers we want to fence (broker 2, 3) 
should be fenced.
-            brokersToKeepUnfenced.forEach(brokerId -> {
-                assertTrue(active.clusterControl().isUnfenced(brokerId),
-                    "Broker " + brokerId + " should have been unfenced");
-            });
-            brokersToFence.forEach(brokerId -> {
-                assertFalse(active.clusterControl().isUnfenced(brokerId),
-                    "Broker " + brokerId + " should have been fenced");
-            });
-            sendBrokerHeartbeatToUnfenceBrokers(active, brokersToKeepUnfenced, 
brokerEpochs);
+                // Send another heartbeat to the brokers we want to keep alive
+                sendBrokerHeartbeatToUnfenceBrokers(active, 
brokersToKeepUnfenced, brokerEpochs);
 
-            // Verify the isr and elr for the topic partition
-            PartitionRegistration partition = 
active.replicationControl().getPartition(topicIdFoo, 0);
-            assertArrayEquals(new int[]{1}, partition.isr, 
partition.toString());
+                // At this point only the brokers we want to fence (broker 2, 
3) should be fenced.
+                brokersToKeepUnfenced.forEach(brokerId -> {
+                    assertTrue(active.clusterControl().isUnfenced(brokerId),
+                        "Broker " + brokerId + " should have been unfenced");
+                });
+                brokersToFence.forEach(brokerId -> {
+                    assertFalse(active.clusterControl().isUnfenced(brokerId),
+                        "Broker " + brokerId + " should have been fenced");
+                });
+                sendBrokerHeartbeatToUnfenceBrokers(active, 
brokersToKeepUnfenced, brokerEpochs);
 
-            // The ELR set is not determined but the size is 1.
-            assertEquals(1, partition.elr.length, partition.toString());
+                // Verify the isr and elr for the topic partition

Review Comment:
   In addition to calling unfence more liberally, it seems quite reasonable to 
me that we could also adjust the test's `sessionTimeoutMillis` to a slightly 
higher value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to