vvcephei commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r514323639



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -117,17 +126,34 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         // Assert that all messages in the first batch were processed in a 
timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), 
is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), 
is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : 
store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
       I wanted to keep the concerns separate, so that unexpected exceptions 
would cause the test to fail fast. The idea is that `until` is the inverse of 
`while`, namely, it just loops as long as the condition evaluates to `false`. 
If the condition throws an exception, then the loop also throws, just like the 
real `while` loop.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to