This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3202459394 KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468)
3202459394 is described below

commit 32024593947f8bb497f2f5d392e0d1d892a16ff3
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Aug 3 09:17:38 2022 -0700

    KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468)
    
    In the current test, we check the tag distribution immediately after
every client has reached the running state. However, because follow-up
rebalances can still be triggered, "everyone is now in running state" does not
mean that the cluster is stable. If a follow-up rebalance occurs, the local
thread metadata is returned empty, which causes the distribution verifier
to fail. This change therefore retries the distribution check with
waitForCondition instead of asserting it only once.
    
    Reviewers: Divij Vaidya <di...@amazon.com>, Luke Chen <show...@gmail.com>
---
 .../kafka/streams/processor/internals/StreamTask.java      |  2 +-
 .../streams/integration/RackAwarenessIntegrationTest.java  | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 07c4494225..f7bf8a5e74 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1267,7 +1267,7 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
             final SourceNode<?, ?> source = topology.source(partition.topic());
             if (source == null) {
                 throw new TopologyException(
-                        "Topic is unknown to the topology. " +
+                        "Topic " + partition.topic() + " is unknown to the 
topology. " +
                                 "This may happen if different KafkaStreams 
instances of the same application execute different Topologies. " +
                                 "Note that Topologies are only identical if 
all operators are added in the same order."
                 );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
index 677633e9b0..7c93b769f5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
@@ -55,8 +55,8 @@ import java.util.stream.Stream;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(600)
 @Tag("integration")
@@ -143,13 +143,13 @@ public class RackAwarenessIntegrationTest {
         createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas);
 
         waitUntilAllKafkaStreamsClientsAreRunning();
-        assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
+        waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly 
distributed");
 
         stopKafkaStreamsInstanceWithIndex(0);
 
         waitUntilAllKafkaStreamsClientsAreRunning();
 
-        assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
+        waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly 
distributed");
     }
 
     @Test
@@ -165,7 +165,7 @@ public class RackAwarenessIntegrationTest {
         createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, 
TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), 
numberOfStandbyReplicas);
 
         waitUntilAllKafkaStreamsClientsAreRunning();
-        
assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)));
+        waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags 
are evenly distributed");
     }
 
     @Test
@@ -186,7 +186,7 @@ public class RackAwarenessIntegrationTest {
         createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, 
TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), 
numberOfStandbyReplicas);
 
         waitUntilAllKafkaStreamsClientsAreRunning();
-        assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, 
TAG_CLUSTER)));
+        waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER)), "not all 
tags are evenly distributed");
     }
 
     @Test
@@ -204,8 +204,8 @@ public class RackAwarenessIntegrationTest {
 
         waitUntilAllKafkaStreamsClientsAreRunning();
 
-        
assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)));
-        
assertTrue(isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)));
+        waitForCondition(() -> 
isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags 
are evenly distributed");
+        waitForCondition(() -> 
isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)), "not all 
tags are evenly distributed");
     }
 
     private void stopKafkaStreamsInstanceWithIndex(final int index) {

Reply via email to