vvcephei commented on a change in pull request #8890:
URL: https://github.com/apache/kafka/pull/8890#discussion_r442998201



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -151,6 +173,198 @@ private KafkaStreams buildStreamWithDirtyStateDir(final 
String stateDirPath,
         return new KafkaStreams(builder.build(), props);
     }
 
+    @Test
+    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() 
throws Exception {
+        final String base = TestUtils.tempDirectory(appId).getPath();
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputTopic,
+            Collections.singletonList(
+                new KeyValue<>(KEY_0, 0)
+            ),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()
+            ),
+            10L
+        );
+
+        try (
+            final KafkaStreams streamInstanceOne = 
buildWithDeduplicationTopology(base + "-1");
+            final KafkaStreams streamInstanceTwo = 
buildWithDeduplicationTopology(base + "-2");
+            final KafkaStreams streamInstanceOneRecovery = 
buildWithDeduplicationTopology(base + "-1")
+        ) {
+            // start first instance and wait for processing
+            
startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne),
 Duration.ofSeconds(30));
+            IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    IntegerDeserializer.class
+                ),
+                outputTopic,
+                1
+            );
+
+            // start second instance and wait for standby replication
+            
startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo),
 Duration.ofSeconds(30));
+            waitForCondition(
+                () -> streamInstanceTwo.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    ).enableStaleStores()
+                ).get(KEY_0) != null,
+                REBALANCE_TIMEOUT,
+                "Could not get key from standby store"
+            );
+            // sanity check that first instance is still active
+            waitForCondition(
+                () -> streamInstanceOne.store(
+                    StoreQueryParameters.fromNameAndType(
+                        storeName,
+                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
+                    )
+                ).get(KEY_0) != null,
+                "Could not get key from main store"
+            );
+
+            // inject poison pill and wait for crash of first instance and 
recovery on second instance
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                inputTopic,
+                Collections.singletonList(
+                    new KeyValue<>(KEY_1, 0)
+                ),
+                TestUtils.producerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerSerializer.class,
+                    IntegerSerializer.class,
+                    new Properties()
+                ),
+                10L
+            );
+            waitForCondition(
+                () -> streamInstanceOne.state() == KafkaStreams.State.ERROR,
+                "Stream instance 1 did not go into error state"
+            );
+            streamInstanceOne.close();
+
+            IntegrationTestUtils.waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    IntegerDeserializer.class,
+                    IntegerDeserializer.class
+                ),
+                outputTopic,
+                2
+            );
+
+            // "restart" first client and wait for standby recovery
+            // (could actually also be active, but it does not matter as long 
as we enable "state stores"

Review comment:
       Did you mean "stale"?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##########
@@ -162,6 +376,8 @@ private Properties props(final String stateDirPath) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);
+        // need to set to zero to get predictable active/standby task 
assignments
+        streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 
0);

Review comment:
       Do we really need this? It seems like the only thing that depends on 
knowing which instance gets the active task is waiting for the crash after 
the poison pill. Could we instead just wait for one of the instances to crash, 
without worrying about which one?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to