showuon commented on a change in pull request #10301: URL: https://github.com/apache/kafka/pull/10301#discussion_r592065468
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ########## @@ -426,40 +448,51 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception { uncommittedDataBeforeFailure.size() + dataAfterFailure.size(), CONSUMER_GROUP_ID); - final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(); + final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() + + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(); + final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize); allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure); allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure); allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure); - final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>(); + final int committedRecordsAfterRecoverySize = uncommittedDataBeforeFailure.size() + dataAfterFailure.size(); + final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>(committedRecordsAfterRecoverySize); expectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure); expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure); - checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery); - checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery); + checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery, + "The all committed records after recovery do not match what expected"); + checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery, + "The committed records after recovery do not match what expected"); + + assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false)); } } @Test public void 
shouldNotViolateEosIfOneTaskFailsWithState() throws Exception { // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions) // the app is supposed to emit all 40 update records into the output topic - // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes + // + // the app first commits after each 10 records per partition (total 20 records), and thus will have 2 * 5 uncommitted writes // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition) - // in the uncommitted batch sending some data for the new key to validate that upon resuming they will not be shown up in the store + // in the uncommitted batch, sending some data for the new key to validate that upon resuming they will not be shown up in the store // - // the failure gets inject after 20 committed and 10 uncommitted records got received + // the failure gets inject after 20 committed and 30 uncommitted records got received // -> the failure only kills one thread // after fail over, we should read 40 committed records and the state stores should contain the correct sums // per key (even if some records got processed twice) - try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig)) { + // We need more processing time under "with state" situation, so increasing the max.poll.interval.ms + // to avoid unexpected rebalance during test, which will cause unexpected fail over triggered + try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig, 3 * MAX_POLL_INTERVAL_MS)) { Review comment: This is the key fix for the flaky test: increasing `max.poll.interval.ms` for the `withState` test gives the stateful run enough processing time to avoid an unexpected rebalance (and the unintended fail-over it triggers). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org