mjsax commented on code in PR #16988:
URL: https://github.com/apache/kafka/pull/16988#discussion_r1828532147


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1104,16 +1100,16 @@ public void process(final Record<Long, Long> record) {
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), config);
 
-        streams.setUncaughtExceptionHandler((t, e) -> {
+        streams.setUncaughtExceptionHandler(e -> {
             if (uncaughtException != null ||
-                !(e instanceof StreamsException) ||
-                !e.getCause().getMessage().equals("Injected test exception.")) {
+                    !(e instanceof StreamsException) ||
+                    !e.getCause().getMessage().equals("Injected test exception.")) {

Review Comment:
   nit: avoid unnecessary reformatting



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -537,10 +537,6 @@ public void shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingT
             errorInjected.set(true);
             writeInputData(dataAfterFailure);
 
-            waitForCondition(
-                () -> uncaughtException != null, MAX_WAIT_TIME_MS,

Review Comment:
   Why are we removing this condition?



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1104,16 +1100,16 @@ public void process(final Record<Long, Long> record) {
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), config);
 
-        streams.setUncaughtExceptionHandler((t, e) -> {
+        streams.setUncaughtExceptionHandler(e -> {
             if (uncaughtException != null ||
-                !(e instanceof StreamsException) ||
-                !e.getCause().getMessage().equals("Injected test exception.")) {
+                    !(e instanceof StreamsException) ||
+                    !e.getCause().getMessage().equals("Injected test exception.")) {
                 e.printStackTrace(System.err);
                 hasUnexpectedError = true;
             }
             uncaughtException = e;
+            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
         });
-

Review Comment:
   nit: avoid unnecessary reformatting



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -418,7 +419,6 @@ public void shouldNotViolateEosIfOneTaskFails(final boolean processingThreadsEna
                 uncommittedRecords,
                 dataBeforeFailure,
                 "The uncommitted records before failure do not match what expected");
-

Review Comment:
   nit: avoid unnecessary reformatting



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1191,7 +1187,7 @@ private <K, V> void ensureCommittedRecordsInTopicPartition(final String topic,
             }
             if (tries >= maxTries) {
                 throw new AssertionError("No committed records in topic " + topic
-                    + ", partition " + partition + " after " + maxTries + " retries.");
+                        + ", partition " + partition + " after " + maxTries + " retries.");

Review Comment:
   nit: avoid unnecessary reformatting
   
   (This might be caused by an auto-formatting setting in your IDE; I would recommend disabling auto-formatting.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.