rhauch commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r436411235



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            throw new ConnectException("Tolerance exceeded in the errant record reporter", error);
+        }
+

Review comment:
       This only marks the failure when a new error arrives after a previous
error has already put us over the limit. IOW, if this is the first error
reported, `totalFailures` will be 0 when this method is called, so
`withinToleranceLimits()` will return `true` (we haven't recorded any errors
yet) and, because of the negation, we will *not* enter the if block.
   
   Seems like we should actually do this check *after* we record the error. 
That would be something like:
   ```
           // This is what increments `totalFailures`, and we were missing this as well
           markAsFailed();
           context.consumerRecord(consumerRecord);
           context.currentContext(stage, executingClass);
           context.error(error);
           errorHandlingMetrics.recordError();
           if (!withinToleranceLimits()) {
               errorHandlingMetrics.recordFailure();
               throw new ConnectException("Tolerance exceeded in error handler", error);
           }
           return context.report();
   ```
   
   IIUC, when we reach the if-block on the first reported error, `markAsFailed()`
will already have incremented `totalFailures` (this method did not do that
before this PR), so if `errors.tolerance=NONE` is used we will fail on the
*first* error, which is what we want.
   
   Also, it'd be great to have unit tests that verify this behavior.
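
To make the ordering issue concrete, here is a minimal sketch using a simplified stand-in class. `ToleranceModel`, `reportCheckFirst`, `reportRecordFirst`, and `failsOnFirstError` are hypothetical names for illustration only and are not part of Connect. The model assumes that with tolerance `NONE` any recorded failure puts us over the limit, and it throws `IllegalStateException` where the real code throws `ConnectException`.

```java
// Simplified stand-in for the operator discussed above; NOT the real Kafka class.
class ToleranceModel {
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;
    private int totalFailures = 0;

    ToleranceModel(Tolerance tolerance) {
        this.tolerance = tolerance;
    }

    // Assumed semantics: ALL tolerates everything, NONE tolerates zero failures.
    boolean withinToleranceLimits() {
        return tolerance == Tolerance.ALL || totalFailures == 0;
    }

    void markAsFailed() {
        totalFailures++;
    }

    // Ordering from the diff: check first, count only when already over the limit.
    void reportCheckFirst() {
        if (!withinToleranceLimits()) {
            markAsFailed();
            throw new IllegalStateException("Tolerance exceeded");
        }
    }

    // Suggested ordering: count the failure first, then check the limit.
    void reportRecordFirst() {
        markAsFailed();
        if (!withinToleranceLimits()) {
            throw new IllegalStateException("Tolerance exceeded");
        }
    }

    // Returns true if reporting the very first error raises a failure.
    static boolean failsOnFirstError(Tolerance tolerance, boolean recordFirst) {
        ToleranceModel model = new ToleranceModel(tolerance);
        try {
            if (recordFirst) {
                model.reportRecordFirst();
            } else {
                model.reportCheckFirst();
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NONE, check first:  " + failsOnFirstError(Tolerance.NONE, false));
        System.out.println("NONE, record first: " + failsOnFirstError(Tolerance.NONE, true));
        System.out.println("ALL,  record first: " + failsOnFirstError(Tolerance.ALL, true));
    }
}
```

In this model the check-first ordering lets the first error through under `NONE`, the record-first ordering fails on it, and `ALL` never fails. A unit test for the real operator could assert the same three outcomes.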





