vvcephei commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r513628177



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java
##########
@@ -44,7 +44,7 @@
      * @param storeProvider     provides access to all the underlying 
StateStore instances
      * @param storeName         The name of the Store
      * @return a read-only interface over a {@code StateStore}
-     *        (cf. {@link 
org.apache.kafka.streams.state.QueryableStoreTypes.KeyValueStoreType})
+     *        (cf. {@link QueryableStoreTypes.KeyValueStoreType})

Review comment:
       This class is in the same package, so the fully-qualified name is not 
necessary.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -153,51 +179,75 @@ public void shouldQuerySpecificActivePartitionStores() 
throws Exception {
 
         // Assert that all messages in the first batch were processed in a 
timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, 
storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, 
storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition 
serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : 
store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, 
Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves 
key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), 
is(notNullValue()));
 
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, 
Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, 
storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, 
storeQueryParam);
-        }
 
-        // Assert that key is not served when wrong specific partition is 
requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be 
active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not 
return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), 
is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition 
is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 
would be active for keyDontBelongPartition
+                // So, in that case, store3 would be null and the store4 would 
not return the value for key as wrong partition was requested
+                if (kafkaStreams1IsActive) {
+                    assertThat(getStore(kafkaStreams2, 
storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams1, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store 
source-table does not exist.")
+                    );
+                } else {
+                    assertThat(getStore(kafkaStreams1, 
storeQueryParam2).get(key), is(nullValue()));
+                    final InvalidStateStoreException exception =
+                        assertThrows(InvalidStateStoreException.class, () -> 
getStore(kafkaStreams2, storeQueryParam2).get(key));
+                    assertThat(
+                        exception.getMessage(),
+                        containsString("The specified partition 1 for store 
source-table does not exist.")
+                    );
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;

Review comment:
       Also, here, if we find that Streams is rebalancing, we'll retry the whole 
verification, including re-discovering the stores in case they have swapped 
ownership.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -337,34 +385,49 @@ public void 
shouldQuerySpecificStalePartitionStoresMultiStreamThreads() throws E
 
         //key doesn't belongs to this partition
         final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = QueryableStoreTypes.keyValueStore();
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();
 
         // Assert that both active and standby are able to query for a key
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
param = StoreQueryParameters
-                .fromNameAndType(TABLE_NAME, queryableStoreType)
-                .enableStaleStores()
-                .withPartition(keyPartition);
+            .fromNameAndType(TABLE_NAME, queryableStoreType)
+            .enableStaleStores()
+            .withPartition(keyPartition);
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
IntegrationTestUtils.getStore(kafkaStreams1, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
getStore(kafkaStreams1, param);
             return store1.get(key) != null;
         }, "store1 cannot find results for key");
         TestUtils.waitForCondition(() -> {
-            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
IntegrationTestUtils.getStore(kafkaStreams2, param);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
getStore(kafkaStreams2, param);
             return store2.get(key) != null;
         }, "store2 cannot find results for key");
 
         final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> 
otherParam = StoreQueryParameters
-                .fromNameAndType(TABLE_NAME, queryableStoreType)
-                .enableStaleStores()
-                .withPartition(keyDontBelongPartition);
-        final ReadOnlyKeyValueStore<Integer, Integer> store3 = 
IntegrationTestUtils.getStore(kafkaStreams1, otherParam);
-        final ReadOnlyKeyValueStore<Integer, Integer> store4 = 
IntegrationTestUtils.getStore(kafkaStreams2, otherParam);
+            .fromNameAndType(TABLE_NAME, queryableStoreType)
+            .enableStaleStores()
+            .withPartition(keyDontBelongPartition);
+        final ReadOnlyKeyValueStore<Integer, Integer> store3 = 
getStore(kafkaStreams1, otherParam);
+        final ReadOnlyKeyValueStore<Integer, Integer> store4 = 
getStore(kafkaStreams2, otherParam);
 
         // Assert that
         assertThat(store3.get(key), is(nullValue()));
         assertThat(store4.get(key), is(nullValue()));
     }
 
+    private static void until(final TestCondition condition) {

Review comment:
       Note, this is different from `TestUtils.waitForCondition`, which does 
the inverse. That one retries on exceptions and otherwise verifies that 
the return is `true`. We need to fail on exceptions and retry as long as the 
return is `false`.
   
   I opted to keep this method in this test class, since it might be confusing 
if placed next to the other util method (`TestUtils.waitForCondition`).

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -117,17 +126,34 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         // Assert that all messages in the first batch were processed in a 
timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, 
TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), 
is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), 
is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = 
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, 
numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> 
queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = 
getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = 
getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : 
store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {
+                assertThat(
+                    exception.getMessage(),
+                    containsString("Cannot get state store source-table 
because the stream thread is PARTITIONS_ASSIGNED, not RUNNING")
+                );
+                LOG.info("Streams wasn't running. Will try again.");
+                return false;

Review comment:
       This is the meat of this change. If we do get an exception, we can still 
verify the exception is the one we expected to get, and then we return `false` 
to indicate we should try again later to get a successful verification.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##########
@@ -102,10 +111,10 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()),
-                        Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
-                                .withCachingDisabled())
-                .toStream()
-                .peek((k, v) -> semaphore.release());
+                      Materialized.<Integer, Integer, KeyValueStore<Bytes, 
byte[]>>as(TABLE_NAME)
+                          .withCachingDisabled())
+               .toStream()
+               .peek((k, v) -> semaphore.release());

Review comment:
       I went ahead and fixed the whitespace also, since this PR is relatively 
small.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to