gharris1727 commented on code in PR #18146:
URL: https://github.com/apache/kafka/pull/18146#discussion_r1907854665


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java:
##########
@@ -155,4 +165,29 @@ public static void assertAssignment(boolean expectFailed,
         assertEquals(expectedDelay, assignment.delay(),
                 "Wrong rebalance delay in " + assignment);
     }
+
+    public static TransformationChain 
getTransformationChain(RetryWithToleranceOperator toleranceOperator, 
List<Object> results) {

Review Comment:
   Could you add generic arguments to TransformationChain and 
RetryWithToleranceOperator in this method and callers?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -215,7 +215,12 @@ protected <V> V execAndRetry(ProcessingContext<T> context, 
Operation<V> operatio
                 } else {
                     log.trace("Can't retry. start={}, attempt={}, 
deadline={}", startTime, attempt, deadline);
                     context.error(e);
-                    return null;
+                    markAsFailed();
+                    if (withinToleranceLimits()) {
+                        return null;
+                    } else {
+                        throw new ConnectException("Exceeded deadline & 
tolerance for retriable exception", e);
+                    }
                 }
                 if (stopping) {
                     log.trace("Shutdown has been scheduled. Marking operation 
as failed.");

Review Comment:
   This data loss scenario is making me think about this `stopping` flag, and 
whether it could cause data loss. Running out of retries and stopping retries 
due to a shutdown should probably behave similarly, and probably shouldn't skip 
the record.
   
   But stopping is only set by `#triggerStop`/`WorkerTask#cancel`, which is a 
hard-shutdown operation after no further data is expected from the task, and no 
offsets are being committed.
   
   I think we can probably leave this in-place, and if we ever change task 
shutdown we can address it then.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -215,7 +215,12 @@ protected <V> V execAndRetry(ProcessingContext<T> context, 
Operation<V> operatio
                 } else {
                     log.trace("Can't retry. start={}, attempt={}, 
deadline={}", startTime, attempt, deadline);
                     context.error(e);
-                    return null;
+                    markAsFailed();
+                    if (withinToleranceLimits()) {
+                        return null;
+                    } else {
+                        throw new ConnectException("Exceeded deadline & 
tolerance for retriable exception", e);
+                    }

Review Comment:
   This duplicates (and double-wraps) the exception, once with "Tolerance 
exceeded in error handler" and once with "Exceeded deadline & tolerance for 
retriable exception".
   
   I think we can reuse the existing handling for non-retriable exceptions in 
`#execAndHandleError` by just rethrowing `e` right after the trace message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to