gharris1727 commented on code in PR #18146:
URL: https://github.com/apache/kafka/pull/18146#discussion_r1907854665
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java:
##########
@@ -155,4 +165,29 @@ public static void assertAssignment(boolean expectFailed,
assertEquals(expectedDelay, assignment.delay(),
"Wrong rebalance delay in " + assignment);
}
+
+ public static TransformationChain getTransformationChain(RetryWithToleranceOperator toleranceOperator, List<Object> results) {
Review Comment:
Could you add generic arguments to TransformationChain and
RetryWithToleranceOperator in this method and callers?
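As a rough illustration of what parameterizing the helper could look like, here is a minimal sketch. `Operator` and `Chain` are hypothetical stand-ins for `RetryWithToleranceOperator` and `TransformationChain` (the real classes have their own type bounds and constructors, which are not reproduced here), and the type parameter names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class GenericChainSketch {
    // Hypothetical stand-in for RetryWithToleranceOperator<T>; T is the context type.
    static class Operator<T> {
        <V> V execute(T context, Function<T, V> operation) {
            return operation.apply(context);
        }
    }

    // Hypothetical stand-in for TransformationChain; R is the record type.
    static class Chain<T, R> {
        private final Operator<T> operator;
        private final List<Object> results;

        Chain(Operator<T> operator, List<Object> results) {
            this.operator = operator;
            this.results = results;
        }

        // Runs an identity "transformation" through the operator and records the result.
        R apply(T context, R record) {
            R out = operator.execute(context, c -> record);
            results.add(out);
            return out;
        }
    }

    // Parameterized helper: no raw types in the signature, so callers get
    // compile-time checking instead of unchecked warnings.
    static <T, R> Chain<T, R> getChain(Operator<T> operator, List<Object> results) {
        return new Chain<>(operator, results);
    }

    public static void main(String[] args) {
        List<Object> results = new ArrayList<>();
        Chain<String, Integer> chain = getChain(new Operator<String>(), results);
        System.out.println(chain.apply("ctx", 42));
    }
}
```

With the type arguments in place, a caller that passes an operator for the wrong context type fails at compile time rather than at runtime.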
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -215,7 +215,12 @@ protected <V> V execAndRetry(ProcessingContext<T> context,
Operation<V> operatio
} else {
log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
context.error(e);
- return null;
+ markAsFailed();
+ if (withinToleranceLimits()) {
+ return null;
+ } else {
+ throw new ConnectException("Exceeded deadline & tolerance for retriable exception", e);
+ }
}
if (stopping) {
log.trace("Shutdown has been scheduled. Marking operation as failed.");
Review Comment:
This data loss scenario makes me wonder whether the `stopping` flag could
cause data loss as well. Running out of retries and abandoning retries because
of a shutdown should probably behave the same way, and neither should skip the
record.
But `stopping` is only set by `#triggerStop`/`WorkerTask#cancel`, which is a
hard-shutdown operation after no further data is expected from the task, and no
offsets are being committed.
I think we can probably leave this in place, and if we ever change task
shutdown we can address it then.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -215,7 +215,12 @@ protected <V> V execAndRetry(ProcessingContext<T> context,
Operation<V> operatio
} else {
log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
context.error(e);
- return null;
+ markAsFailed();
+ if (withinToleranceLimits()) {
+ return null;
+ } else {
+ throw new ConnectException("Exceeded deadline & tolerance for retriable exception", e);
+ }
Review Comment:
This duplicates (and double-wraps) the exception, once with "Tolerance
exceeded in error handler" and once with "Exceeded deadline & tolerance for
retriable exception".
I think we can reuse the existing handling for non-retriable exceptions in
`#execAndHandleError` by just rethrowing `e` right after the trace message.
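To make the suggestion concrete, here is a minimal, self-contained sketch of the rethrow approach. It does not use the real Kafka classes: `ConnectException` and `RetriableException` below are hypothetical stand-ins, the retry loop is simplified to an attempt count instead of a deadline, and the `tolerateErrors` flag is an assumed simplification of the tolerance check. The point it shows is that when the inner method rethrows `e` unchanged, the outer handler is the only place that wraps, so the exception is wrapped once.

```java
import java.util.concurrent.Callable;

public class RethrowSketch {
    // Hypothetical stand-in for Connect's ConnectException.
    static class ConnectException extends RuntimeException {
        ConnectException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for a retriable failure.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    // Inner layer: retry up to maxAttempts, then rethrow the original
    // exception unchanged instead of wrapping it here.
    static <V> V execAndRetry(Callable<V> operation, int maxAttempts) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return operation.call();
            } catch (RetriableException e) {
                attempt++;
                if (attempt >= maxAttempts) {
                    throw e; // leave tolerance handling to the caller
                }
            }
        }
    }

    // Outer layer: the single place that decides between skipping the
    // record (null) and failing with a wrapped exception.
    static <V> V execAndHandleError(Callable<V> operation, int maxAttempts, boolean tolerateErrors) {
        try {
            return execAndRetry(operation, maxAttempts);
        } catch (Exception e) {
            if (tolerateErrors) {
                return null;
            }
            throw new ConnectException("Tolerance exceeded in error handler", e);
        }
    }

    // Counts how many ConnectException layers appear in the cause chain.
    static int wrapDepth(Throwable t) {
        int depth = 0;
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof ConnectException) {
                depth++;
            }
        }
        return depth;
    }

    // Operation that always fails with a retriable error.
    static String alwaysFails() {
        throw new RetriableException("broker unavailable");
    }

    public static void main(String[] args) {
        try {
            execAndHandleError(RethrowSketch::alwaysFails, 3, false);
        } catch (ConnectException e) {
            System.out.println(e.getMessage() + " <- " + e.getCause().getMessage());
            System.out.println("ConnectException layers: " + wrapDepth(e));
        }
    }
}
```

Keeping the wrap in one place also means the tolerance decision lives in a single method, which is the reuse the comment above is asking for.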