vvcephei commented on code in PR #12186:
URL: https://github.com/apache/kafka/pull/12186#discussion_r877582344
##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws
Exception {
// Assert that all messages in the first batch were processed in a
timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60,
TimeUnit.SECONDS), is(equalTo(true)));
- final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
- final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
-
- final boolean kafkaStreams1WasFirstActive;
- final KeyQueryMetadata keyQueryMetadata =
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value,
numPartitions) -> 0);
-
- // Assert that the current value in store reflects all messages being
processed
- if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
- assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = true;
- } else {
- assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = false;
- }
-
- if (kafkaStreams1WasFirstActive) {
- kafkaStreams1.close();
- } else {
- kafkaStreams2.close();
- }
+ final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>>
newActiveStore = new AtomicReference<>(null);
+ TestUtils.retryOnExceptionWithTimeout(() -> {
+ final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
+ final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
+
+ final KeyQueryMetadata keyQueryMetadata =
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value,
numPartitions) -> 0);
+
+ try {
+ // Assert that the current value in store reflects all
messages being processed
+ if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+ assertThat(store1.get(key), is(equalTo(batch1NumMessages -
1)));
+ kafkaStreams1.close();
+ newActiveStore.set(store2);
+ } else {
+ assertThat(store2.get(key), is(equalTo(batch1NumMessages -
1)));
+ kafkaStreams2.close();
+ newActiveStore.set(store1);
+ }
+ } catch (final InvalidStateStoreException e) {
+ LOG.warn("Detected an unexpected rebalance during test.
Retrying if possible.", e);
+ throw e;
Review Comment:
Rethrowing the InvalidStateStoreException here triggers the retryOnExceptionWithTimeout retry logic.
##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws
Exception {
// Assert that all messages in the first batch were processed in a
timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60,
TimeUnit.SECONDS), is(equalTo(true)));
- final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
- final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
-
- final boolean kafkaStreams1WasFirstActive;
- final KeyQueryMetadata keyQueryMetadata =
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value,
numPartitions) -> 0);
-
- // Assert that the current value in store reflects all messages being
processed
- if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
- assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = true;
- } else {
- assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = false;
- }
-
- if (kafkaStreams1WasFirstActive) {
- kafkaStreams1.close();
- } else {
- kafkaStreams2.close();
- }
+ final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>>
newActiveStore = new AtomicReference<>(null);
+ TestUtils.retryOnExceptionWithTimeout(() -> {
+ final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
+ final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
Review Comment:
Depending on where and when the rebalance happens, we might need to
re-resolve the stores, so I included the store lookups in the retry block for resilience.
##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws
Exception {
// Assert that all messages in the first batch were processed in a
timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60,
TimeUnit.SECONDS), is(equalTo(true)));
- final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
- final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
-
- final boolean kafkaStreams1WasFirstActive;
- final KeyQueryMetadata keyQueryMetadata =
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value,
numPartitions) -> 0);
-
- // Assert that the current value in store reflects all messages being
processed
- if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
- assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = true;
- } else {
- assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = false;
- }
-
- if (kafkaStreams1WasFirstActive) {
- kafkaStreams1.close();
- } else {
- kafkaStreams2.close();
- }
+ final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>>
newActiveStore = new AtomicReference<>(null);
+ TestUtils.retryOnExceptionWithTimeout(() -> {
+ final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
+ final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
+
+ final KeyQueryMetadata keyQueryMetadata =
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value,
numPartitions) -> 0);
Review Comment:
Re-querying the key metadata inside the retry block is essential: after a rebalance, we must re-resolve
which host is now active.
##########
streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java:
##########
@@ -125,31 +131,37 @@ public void shouldApplyUpdatesToStandbyStore() throws
Exception {
// Assert that all messages in the first batch were processed in a
timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60,
TimeUnit.SECONDS), is(equalTo(true)));
- final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
- final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
-
- final boolean kafkaStreams1WasFirstActive;
- final KeyQueryMetadata keyQueryMetadata =
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value,
numPartitions) -> 0);
-
- // Assert that the current value in store reflects all messages being
processed
- if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
- assertThat(store1.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = true;
- } else {
- assertThat(store2.get(key), is(equalTo(batch1NumMessages - 1)));
- kafkaStreams1WasFirstActive = false;
- }
-
- if (kafkaStreams1WasFirstActive) {
- kafkaStreams1.close();
- } else {
- kafkaStreams2.close();
- }
+ final AtomicReference<ReadOnlyKeyValueStore<Integer, Integer>>
newActiveStore = new AtomicReference<>(null);
+ TestUtils.retryOnExceptionWithTimeout(() -> {
+ final ReadOnlyKeyValueStore<Integer, Integer> store1 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1,
QueryableStoreTypes.keyValueStore());
+ final ReadOnlyKeyValueStore<Integer, Integer> store2 =
IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2,
QueryableStoreTypes.keyValueStore());
+
+ final KeyQueryMetadata keyQueryMetadata =
kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value,
numPartitions) -> 0);
+
+ try {
+ // Assert that the current value in store reflects all
messages being processed
+ if ((keyQueryMetadata.activeHost().port() % 2) == 1) {
+ assertThat(store1.get(key), is(equalTo(batch1NumMessages -
1)));
+ kafkaStreams1.close();
+ newActiveStore.set(store2);
+ } else {
+ assertThat(store2.get(key), is(equalTo(batch1NumMessages -
1)));
+ kafkaStreams2.close();
+ newActiveStore.set(store1);
+ }
+ } catch (final InvalidStateStoreException e) {
+ LOG.warn("Detected an unexpected rebalance during test.
Retrying if possible.", e);
+ throw e;
+ } catch (final Throwable t) {
+ LOG.error("Caught non-retriable exception in test. Exiting.",
t);
+ throw new NoRetryException(t);
+ }
Review Comment:
I didn't want the test to retry on unexpected exceptions, since that might
mask a bug, so any other exception is wrapped in a NoRetryException and causes the test to fail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]