showuon commented on a change in pull request #10301:
URL: https://github.com/apache/kafka/pull/10301#discussion_r592065468



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -426,40 +448,51 @@ public void shouldNotViolateEosIfOneTaskFails() throws Exception {
                 uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
                 CONSUMER_GROUP_ID);
 
-            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            final int allCommittedRecordsAfterRecoverySize = committedDataBeforeFailure.size() +
+                uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+            final List<KeyValue<Long, Long>> allExpectedCommittedRecordsAfterRecovery = new ArrayList<>(allCommittedRecordsAfterRecoverySize);
             allExpectedCommittedRecordsAfterRecovery.addAll(committedDataBeforeFailure);
             allExpectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
             allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
 
-            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>();
+            final int committedRecordsAfterRecoverySize = uncommittedDataBeforeFailure.size() + dataAfterFailure.size();
+            final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRecovery = new ArrayList<>(committedRecordsAfterRecoverySize);
             expectedCommittedRecordsAfterRecovery.addAll(uncommittedDataBeforeFailure);
             expectedCommittedRecordsAfterRecovery.addAll(dataAfterFailure);
 
-            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
-            checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
+            checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery,
+                "The all committed records after recovery do not match what expected");
+            checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery,
+                "The committed records after recovery do not match what expected");
+
+            assertThat("Should only get one uncaught exception from Streams.", hasUnexpectedError, is(false));
         }
     }
 
     @Test
     public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
        // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
        // the app is supposed to emit all 40 update records into the output topic
-        // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
+        //
+        // the app first commits after each 10 records per partition (total 20 records), and thus will have 2 * 5 uncommitted writes
        // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
-        // in the uncommitted batch sending some data for the new key to validate that upon resuming they will not be shown up in the store
+        // in the uncommitted batch, sending some data for the new key to validate that upon resuming they will not be shown up in the store
         //
-        // the failure gets inject after 20 committed and 10 uncommitted records got received
+        // the failure gets inject after 20 committed and 30 uncommitted records got received
         // -> the failure only kills one thread
        // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
 
-        try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig)) {
+        // We need more processing time under "with state" situation, so increasing the max.poll.interval.ms
+        // to avoid unexpected rebalance during test, which will cause unexpected fail over triggered
+        try (final KafkaStreams streams = getKafkaStreams("dummy", true, "appDir", 2, eosConfig, 3 * MAX_POLL_INTERVAL_MS)) {

Review comment:
       This is the key fix for the flaky test: increasing `max.poll.interval.ms` for the `withState` test.
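
       To illustrate the mechanism (a sketch, not the PR's actual helper): `max.poll.interval.ms` is a standard Kafka consumer config, and exceeding it between polls makes the broker evict the consumer and trigger a rebalance, which this test would misread as a fail over. The hypothetical `consumerOverrides` method and the `MAX_POLL_INTERVAL_MS` constant value below are illustrative assumptions; in the real test the value is threaded through `getKafkaStreams(...)`.

```java
import java.util.Properties;

public class MaxPollIntervalSketch {
    // Hypothetical default poll interval used by the test class (illustrative value).
    static final int MAX_POLL_INTERVAL_MS = 30_000;

    // Builds the consumer override a test could pass into its Streams setup:
    // stateful processing takes longer per batch, so the poll interval is
    // tripled to keep the consumer from being evicted from the group mid-test.
    static Properties consumerOverrides(final int maxPollIntervalMs) {
        final Properties props = new Properties();
        // "max.poll.interval.ms" is the real consumer config key
        // (ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG in the Kafka client library).
        props.put("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = consumerOverrides(3 * MAX_POLL_INTERVAL_MS);
        // Prints the tripled interval the Streams consumer would use.
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

The design point is that the override only relaxes the liveness timeout; it does not change the EOS semantics under test, so the assertions on committed records remain meaningful.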




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

