gharris1727 commented on code in PR #15154:
URL: https://github.com/apache/kafka/pull/15154#discussion_r1449347346


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -236,7 +237,7 @@ protected <V> V execAndHandleError(Operation<V> operation, 
Class<? extends Excep
     }
 
     // Visible for testing
-    void markAsFailed() {
+    synchronized void markAsFailed() {
         errorHandlingMetrics.recordErrorTimestamp();
         totalFailures++;
     }

Review Comment:
   I think in your scenario, that behavior is satisfactory.
   
   1. If withinToleranceLimits is false, then we're using errors.tolerance=none 
and any single error is enough to exceed the limit
   2. The main work thread (in RetryWithToleranceOperator::execute) already 
called markAsFailed, incrementing the counter
   3. Regardless of which error occurred first, one of them has to be "seen" by 
the main thread first, and either one will cause the task to fail-safe.
   
   I see what you mean though. If someone were to call withinToleranceLimits() 
without previously calling markAsFailed(), they would have no idea what error 
to blame for the failure. That was why I needed the 
WorkerErrantRecordReporter#taskPutException. It's a bit of a sharp edge, but 
I'm not sure what to do about it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to