twthorn commented on code in PR #18146:
URL: https://github.com/apache/kafka/pull/18146#discussion_r1909190199
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java:
##########
@@ -215,7 +215,12 @@ protected <V> V execAndRetry(ProcessingContext<T> context, Operation<V> operation
             } else {
                 log.trace("Can't retry. start={}, attempt={}, deadline={}", startTime, attempt, deadline);
                 context.error(e);
-                return null;
+                markAsFailed();
+                if (withinToleranceLimits()) {
+                    return null;
+                } else {
+                    throw new ConnectException("Exceeded deadline & tolerance for retriable exception", e);
+                }
             }
             if (stopping) {
                 log.trace("Shutdown has been scheduled. Marking operation as failed.");
Review Comment:
Agreed. In both sources and sinks, the worker doesn't commit offsets once the task has been cancelled, so with that conditional check in place data loss should not be possible. Will leave as is.