mjsax commented on code in PR #16988:
URL: https://github.com/apache/kafka/pull/16988#discussion_r1828532147
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1104,16 +1100,16 @@ public void process(final Record<Long, Long> record) {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
- streams.setUncaughtExceptionHandler((t, e) -> {
+ streams.setUncaughtExceptionHandler(e -> {
if (uncaughtException != null ||
- !(e instanceof StreamsException) ||
- !e.getCause().getMessage().equals("Injected test exception."))
{
+ !(e instanceof StreamsException) ||
+ !e.getCause().getMessage().equals("Injected test
exception.")) {
Review Comment:
nit: avoid unnecessary reformatting
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -537,10 +537,6 @@ public void
shouldNotViolateEosIfOneTaskFailsWithState(final boolean processingT
errorInjected.set(true);
writeInputData(dataAfterFailure);
- waitForCondition(
- () -> uncaughtException != null, MAX_WAIT_TIME_MS,
Review Comment:
Why are we removing this condition?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1104,16 +1100,16 @@ public void process(final Record<Long, Long> record) {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
- streams.setUncaughtExceptionHandler((t, e) -> {
+ streams.setUncaughtExceptionHandler(e -> {
if (uncaughtException != null ||
- !(e instanceof StreamsException) ||
- !e.getCause().getMessage().equals("Injected test exception."))
{
+ !(e instanceof StreamsException) ||
+ !e.getCause().getMessage().equals("Injected test
exception.")) {
e.printStackTrace(System.err);
hasUnexpectedError = true;
}
uncaughtException = e;
+ return
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
-
Review Comment:
nit: avoid unnecessary reformatting
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -418,7 +419,6 @@ public void shouldNotViolateEosIfOneTaskFails(final boolean
processingThreadsEna
uncommittedRecords,
dataBeforeFailure,
"The uncommitted records before failure do not match what
expected");
-
Review Comment:
nit: avoid unnecessary reformatting
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1191,7 +1187,7 @@ private <K, V> void
ensureCommittedRecordsInTopicPartition(final String topic,
}
if (tries >= maxTries) {
throw new AssertionError("No committed records in topic " +
topic
- + ", partition " + partition + " after " + maxTries + "
retries.");
+ + ", partition " + partition + " after " + maxTries +
" retries.");
Review Comment:
nit: avoid unnecessary reformatting
(this might be caused by an auto-formatting setting in your IDE -- I would
recommend disabling auto-formatting)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]