wicknicks commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r438452151

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -556,6 +556,9 @@ private void deliverMessages() {
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             long start = time.milliseconds();
             task.put(new ArrayList<>(messageBatch));
+            if (workerErrantRecordReporter != null && workerErrantRecordReporter.mustThrowException()) {
+                throw workerErrantRecordReporter.getExceptionToThrow();
+            }

Review comment:
Instead, you can just check:

```
if (retryWithToleranceOperator.failed()) {
    throw retryWithToleranceOperator.error();
}
```

because we are already storing the error in the processing context. You can expose that through the operator.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -99,8 +102,15 @@ public WorkerErrantRecordReporter(
                 valLength, key, value, headers);
         }

-        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-            SinkTask.class, consumerRecord, error);
+        Future<Void> future;
+        try {
+            future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+                SinkTask.class, consumerRecord, error);
+        } catch (ConnectException e) {
+            mustThrowException = true;
+            exceptionToThrow = e;
+            throw e;
+        }

Review comment:
We don't need these variables; the errors are already stored in the ProcessingContext. See the comment above.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
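
To make the suggestion in the review comments above concrete, here is a minimal, self-contained sketch of the idea: the operator delegates `failed()` and `error()` to the context that already holds the error, so the reporter needs no `mustThrowException` / `exceptionToThrow` fields of its own. All class names below (`SketchProcessingContext`, `SketchRetryOperator`, `SketchSinkTaskDriver`) are hypothetical stand-ins, not the actual Kafka Connect classes, and the sketch assumes every reported error is fatal (no retries, no tolerance limits).

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the processing context: it remembers the last error.
final class SketchProcessingContext {
    private Throwable error;

    void error(Throwable error) { this.error = error; }
    boolean failed() { return error != null; }
    Throwable error() { return error; }
}

// Hypothetical stand-in for the operator: it owns the context and exposes its state,
// so callers do not need to track the failure in separate variables.
final class SketchRetryOperator {
    private final SketchProcessingContext context = new SketchProcessingContext();

    // Assumption: every reported error is recorded as a failure (no tolerance check).
    synchronized void executeFailed(Throwable error) { context.error(error); }
    synchronized boolean failed() { return context.failed(); }
    synchronized Throwable error() { return context.error(); }
}

// Hypothetical stand-in for the delivery loop in the sink task.
final class SketchSinkTaskDriver {
    private final SketchRetryOperator operator;

    SketchSinkTaskDriver(SketchRetryOperator operator) { this.operator = operator; }

    void deliver(List<String> batch, Consumer<List<String>> put) {
        put.accept(batch);
        // Single check after put(): the operator already knows whether a failure was recorded.
        if (operator.failed()) {
            throw new RuntimeException("task reported a failed record", operator.error());
        }
    }
}

final class ErrorPropagationSketch {
    public static void main(String[] args) {
        SketchRetryOperator operator = new SketchRetryOperator();
        SketchSinkTaskDriver driver = new SketchSinkTaskDriver(operator);
        try {
            // The "task" reports a bad record through the operator during put().
            driver.deliver(List.of("record-1"),
                batch -> operator.executeFailed(new IllegalStateException("bad record")));
        } catch (RuntimeException e) {
            System.out.println("propagated: " + e.getCause().getMessage());
        }
    }
}
```

The sketch wraps the stored `Throwable` in a `RuntimeException` only to keep the example compilable without a `throws` clause; how the real operator types and rethrows its error is outside what this thread shows.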