aakashnshah commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r436437989



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##########
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi
     public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
+        if (!withinToleranceLimits()) {
+            errorHandlingMetrics.recordFailure();
+            markAsFailed();
+            throw new ConnectException("Tolerance exceeded in the errant record reporter", error);
+        }
+
+

Review comment:
       Thanks @rhauch for the comments; I agree with what you're saying. I 
think we should call both `errorHandlingMetrics.recordError()` and 
`errorHandlingMetrics.recordFailure()`. The two seem to represent basically 
the same thing (failed operations), and both are called at some point when 
`execute(...)` is called on the `RetryWithToleranceOperator`, albeit at 
different times.
   
   Additionally, it looks like in other circumstances the error is reported 
to the various reporters even when the error tolerance has been exceeded, so 
I'll adjust the order of operations accordingly.
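
   As a rough sketch of the ordering described above: record both metrics, mark 
the operator as failed, report the record, and only then check the tolerance. 
Everything in it is a simplified, hypothetical stand-in rather than the actual 
Connect code: `Metrics` only counts calls, the tolerance is modelled as a plain 
failure count, reporters are a list of strings, and `IllegalStateException` 
takes the place of `ConnectException` so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrantRecordSketch {

    /** Hypothetical stand-in for ErrorHandlingMetrics; it only counts calls. */
    public static class Metrics {
        public int errors;
        public int failures;
        public void recordError() { errors++; }
        public void recordFailure() { failures++; }
    }

    /** Hypothetical, heavily reduced stand-in for RetryWithToleranceOperator. */
    public static class Operator {
        public final Metrics metrics = new Metrics();
        public final List<String> reported = new ArrayList<>();
        public boolean failed;
        private final int toleratedFailures; // assumption: count-based tolerance
        private int totalFailures;

        public Operator(int toleratedFailures) {
            this.toleratedFailures = toleratedFailures;
        }

        boolean withinToleranceLimits() {
            return totalFailures <= toleratedFailures;
        }

        void markAsFailed() {
            failed = true;
            totalFailures++;
        }

        public void executeFailed(String record, Throwable error) {
            // Record both metrics for every failed record.
            metrics.recordFailure();
            metrics.recordError();
            markAsFailed();

            // Report first, so the record reaches the reporters even when
            // the tolerance has already been exceeded.
            reported.add(record);

            // Only then enforce the tolerance limit.
            if (!withinToleranceLimits()) {
                throw new IllegalStateException(
                    "Tolerance exceeded in the errant record reporter", error);
            }
        }
    }
}
```

   With a tolerance of one failure in this sketch, the second call to 
`executeFailed(...)` throws, but the second record has still been reported and 
both counters have been incremented twice.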





