rhauch commented on a change in pull request #8829: URL: https://github.com/apache/kafka/pull/8829#discussion_r436411235
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass, ConsumerRecord<byte[], byte[]> consumerRecord, Throwable error) {
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            throw new ConnectException("Tolerance exceeded in the errant record reporter", error);
+        }
+

Review comment:

This marks the failure when we get a new error but the previous error already put us over the limit. IOW, if this is the first error that is reported, then `totalFailures` will be 0 when this method is called and thus `withinToleranceLimits()` will return `true` (i.e., we haven't recorded any errors yet) and we will *not* enter the if block due to the negation.

Seems like we should actually do this check *after* we record the error. That would be something like:

```
markAsFailed();   // This is what increments the `totalFailures`, and we were missing this as well
context.consumerRecord(consumerRecord);
context.currentContext(stage, executingClass);
context.error(error);
errorHandlingMetrics.recordError();
if (!withinToleranceLimits()) {
    errorHandlingMetrics.recordFailure();
    throw new ConnectException("Tolerance exceeded in error handler", error);
}
return context.report();
```

IIUC, then when we get to the if-block on the first error being reported, the `markAsFailed()` method will increment the `totalFailures`, and if `errors.tolerance=NONE` is used we will fail on the first error.

Also, it'd be great to have unit tests that verify this behavior.
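To make the ordering issue concrete, here is a minimal, self-contained sketch of the two orderings. It is not the real `RetryWithToleranceOperator`: `ToleranceOrderingSketch`, `Tracker`, `reportCheckFirst`, `reportRecordFirst`, and `firstRejectedError` are hypothetical names, `IllegalStateException` stands in for `ConnectException`, and `withinToleranceLimits()` is reduced to the assumption that `NONE` tolerates zero recorded failures and `ALL` tolerates any number. A unit test for the real class could follow roughly the same shape.

```java
public class ToleranceOrderingSketch {

    /** Simplified stand-in for the `errors.tolerance` setting. */
    public enum Tolerance { NONE, ALL }

    /** Hypothetical, stripped-down failure tracker (assumption, not Kafka code). */
    static class Tracker {
        private final Tolerance tolerance;
        private long totalFailures = 0;

        Tracker(Tolerance tolerance) {
            this.tolerance = tolerance;
        }

        void markAsFailed() {
            totalFailures++;
        }

        // Assumed semantics: NONE allows no recorded failures, ALL allows any number.
        boolean withinToleranceLimits() {
            return tolerance == Tolerance.ALL || totalFailures == 0;
        }

        /** Ordering from the diff: check the limit before anything is recorded. */
        void reportCheckFirst(Throwable error) {
            if (!withinToleranceLimits()) {
                markAsFailed();
                throw new IllegalStateException("Tolerance exceeded", error);
            }
            // Accepted path: the failure is never counted, so the check above
            // still sees totalFailures == 0 on the next call.
        }

        /** Suggested ordering: record the failure first, then check the limit. */
        void reportRecordFirst(Throwable error) {
            markAsFailed();
            if (!withinToleranceLimits()) {
                throw new IllegalStateException("Tolerance exceeded", error);
            }
        }
    }

    /** Returns the 1-based index of the first rejected error, or -1 if none was rejected. */
    public static int firstRejectedError(Tolerance tolerance, boolean recordFirst, int attempts) {
        Tracker tracker = new Tracker(tolerance);
        for (int i = 1; i <= attempts; i++) {
            Throwable error = new RuntimeException("bad record " + i);
            try {
                if (recordFirst) {
                    tracker.reportRecordFirst(error);
                } else {
                    tracker.reportCheckFirst(error);
                }
            } catch (IllegalStateException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("NONE, check first:  " + firstRejectedError(Tolerance.NONE, false, 3));
        System.out.println("NONE, record first: " + firstRejectedError(Tolerance.NONE, true, 3));
        System.out.println("ALL,  record first: " + firstRejectedError(Tolerance.ALL, true, 3));
    }
}
```

Under these assumptions, the check-first ordering never rejects anything with `NONE`, because no failure is counted on the accepted path, while the record-first ordering rejects the very first error. With `ALL`, the record-first ordering accepts every error.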