vvcephei commented on a change in pull request #8890: URL: https://github.com/apache/kafka/pull/8890#discussion_r442998201
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ########## @@ -151,6 +173,198 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String stateDirPath, return new KafkaStreams(builder.build(), props); } + @Test + public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception { + final String base = TestUtils.tempDirectory(appId).getPath(); + + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputTopic, + Collections.singletonList( + new KeyValue<>(KEY_0, 0) + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class, + new Properties() + ), + 10L + ); + + try ( + final KafkaStreams streamInstanceOne = buildWithDeduplicationTopology(base + "-1"); + final KafkaStreams streamInstanceTwo = buildWithDeduplicationTopology(base + "-2"); + final KafkaStreams streamInstanceOneRecovery = buildWithDeduplicationTopology(base + "-1") + ) { + // start first instance and wait for processing + startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30)); + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class + ), + outputTopic, + 1 + ); + + // start second instance and wait for standby replication + startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30)); + waitForCondition( + () -> streamInstanceTwo.store( + StoreQueryParameters.fromNameAndType( + storeName, + QueryableStoreTypes.<Integer, Integer>keyValueStore() + ).enableStaleStores() + ).get(KEY_0) != null, + REBALANCE_TIMEOUT, + "Could not get key from standby store" + ); + // sanity check that first instance is still active + waitForCondition( + () -> streamInstanceOne.store( + StoreQueryParameters.fromNameAndType( + storeName, + 
QueryableStoreTypes.<Integer, Integer>keyValueStore() + ) + ).get(KEY_0) != null, + "Could not get key from main store" + ); + + // inject poison pill and wait for crash of first instance and recovery on second instance + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + inputTopic, + Collections.singletonList( + new KeyValue<>(KEY_1, 0) + ), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class, + new Properties() + ), + 10L + ); + waitForCondition( + () -> streamInstanceOne.state() == KafkaStreams.State.ERROR, + "Stream instance 1 did not go into error state" + ); + streamInstanceOne.close(); + + IntegrationTestUtils.waitUntilMinRecordsReceived( + TestUtils.consumerConfig( + CLUSTER.bootstrapServers(), + IntegerDeserializer.class, + IntegerDeserializer.class + ), + outputTopic, + 2 + ); + + // "restart" first client and wait for standby recovery + // (could actually also be active, but it does not matter as long as we enable "state stores" Review comment: Did you mean "stale"? ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ########## @@ -162,6 +376,8 @@ private Properties props(final String stateDirPath) { streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + // need to set to zero to get predictable active/standby task assignments + streamsConfiguration.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0); Review comment: Do we really need this? It seems like the only thing that depends on knowing which instance would get the active is just waiting for the crash after the poison pill. Could we instead just wait for one of the instances to crash, but not worry about which? 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org