mjsax commented on a change in pull request #10301: URL: https://github.com/apache/kafka/pull/10301#discussion_r602910027
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ########## @@ -379,19 +382,21 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception { // this test writes 10 + 5 + 5 records per partition (running with 2 partitions) // the app is supposed to copy all 40 records into the output topic - // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes + // + // the app first commits after each 10 records per partition(total 20 records), and thus will have 2 * 5 uncommitted writes // // the failure gets inject after 20 committed and 30 uncommitted records got received // -> the failure only kills one thread // after fail over, we should read 40 committed records (even if 50 record got written) - try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig)) { + try (final KafkaStreams streams = getKafkaStreams("dummy", false, "appDir", 2, eosConfig, MAX_POLL_INTERVAL_MS)) { startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS); final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L); - final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>(); + final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>( + committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size()); Review comment: Ok to have, but we don't really need to optimize this in a test... (sounds like a case of premature optimization). In tests, we usually prefer simple code over optimized code. 
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ########## @@ -403,13 +408,25 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception { () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit."); - writeInputData(uncommittedDataBeforeFailure); + // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes): + // + // p-0: ---> 10 rec + C + // p-1: ---> 10 rec + C - final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null); final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID); + checkResultPerKey(committedRecords, committedDataBeforeFailure, + "The committed records before failure do not match what expected"); - checkResultPerKey(committedRecords, committedDataBeforeFailure); - checkResultPerKey(uncommittedRecords, dataBeforeFailure); + writeInputData(uncommittedDataBeforeFailure); + + // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes): + // + // p-0: ---> 10 rec + C + 5 rec (pending) + // p-1: ---> 10 rec + C + 5 rec (pending) + + final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null); + checkResultPerKey(uncommittedRecords, dataBeforeFailure, Review comment: as above: formatting (more of those below -- won't comment again). 
########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ########## @@ -403,13 +408,25 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception { () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS, "StreamsTasks did not request commit."); - writeInputData(uncommittedDataBeforeFailure); + // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes): + // + // p-0: ---> 10 rec + C + // p-1: ---> 10 rec + C - final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeFailure.size(), null); final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeFailure.size(), CONSUMER_GROUP_ID); + checkResultPerKey(committedRecords, committedDataBeforeFailure, + "The committed records before failure do not match what expected"); Review comment: Nit (as above): if we break the line, we should break per parameter: ``` checkResultPerKey( committedRecords, committedDataBeforeFailure, "The committed records before failure do not match what expected" ); ``` ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ########## @@ -267,18 +269,19 @@ private void runSimpleCopyTest(final int numberOfRestarts, inputData.size() ); - checkResultPerKey(committedRecords, inputData); + checkResultPerKey(committedRecords, inputData, "The committed records do not match what expected"); } } } - private void checkResultPerKey(final List<KeyValue<Long, Long>> result, final List<KeyValue<Long, Long>> expectedResult) { + private void checkResultPerKey(final List<KeyValue<Long, Long>> result, final List<KeyValue<Long, Long>> expectedResult, + final String reason) { Review comment: nit: formatting (if we break the line, we should break for all parameters): ``` private void checkResultPerKey(final List<KeyValue<Long, Long>> result, final List<KeyValue<Long, Long>> expectedResult, final String reason) { ``` ########## File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ########## @@ -121,6 +121,8 @@ private static final AtomicInteger TEST_NUMBER = new AtomicInteger(0); + private volatile boolean hasUnexpectedError = false; Review comment: Thanks for this fix! We had the same bug in `EosBetaUpgradeIntegrationTest` and fixed it there already -- we missed that the same issue is in this test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org