wicknicks commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r438452151



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -556,6 +556,9 @@ private void deliverMessages() {
             log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
             long start = time.milliseconds();
             task.put(new ArrayList<>(messageBatch));
+            if (workerErrantRecordReporter != null && workerErrantRecordReporter.mustThrowException()) {
+                throw workerErrantRecordReporter.getExceptionToThrow();
+            }

Review comment:
       Instead, you can just check:
   
   ```
           if (retryWithToleranceOperator.failed()) {
               throw retryWithToleranceOperator.error();
           }
   ```
   
   The error is already stored in the processing context, so you can expose it through the operator instead of tracking it separately in the reporter.
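
   To make the suggestion concrete, here is a minimal, self-contained sketch of the pattern. It does not use the real Connect classes: `SketchProcessingContext`, `SketchOperator`, and `SketchSinkWorker` are hypothetical stand-ins, and `failed()` / `error()` are the accessor names proposed above, not existing methods. Tolerance limits, retries, and reporting are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the context that remembers the last recorded failure.
class SketchProcessingContext {
    private RuntimeException error;

    void error(RuntimeException e) { this.error = e; }
    RuntimeException error() { return error; }
    boolean failed() { return error != null; }
}

// Hypothetical stand-in for the operator; it owns the context and exposes its state.
class SketchOperator {
    private final SketchProcessingContext context = new SketchProcessingContext();

    // Called when the task reports a bad record; the error is stored once, in the context.
    void executeFailed(RuntimeException e) { context.error(e); }

    // The two accessors suggested in the review, delegating to the context.
    boolean failed() { return context.failed(); }
    RuntimeException error() { return context.error(); }
}

// Hypothetical stand-in for the worker's delivery step.
class SketchSinkWorker {
    private final SketchOperator operator;

    SketchSinkWorker(SketchOperator operator) { this.operator = operator; }

    void deliver(List<String> batch, Consumer<List<String>> task) {
        task.accept(new ArrayList<>(batch));
        // No extra flag or exception field: ask the operator what was recorded.
        if (operator.failed()) {
            throw operator.error();
        }
    }

    public static void main(String[] args) {
        SketchOperator operator = new SketchOperator();
        SketchSinkWorker worker = new SketchSinkWorker(operator);
        try {
            worker.deliver(Arrays.asList("r1", "r2"),
                batch -> operator.executeFailed(new IllegalStateException("bad record")));
        } catch (RuntimeException e) {
            System.out.println("delivery failed: " + e.getMessage());
        }
    }
}
```

   The point of the design is that the failure lives in exactly one place (the context), so the reporter no longer needs its own `mustThrowException` / `exceptionToThrow` state.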

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -99,8 +102,15 @@ public WorkerErrantRecordReporter(
                 valLength, key, value, headers);
         }
 
-        Future<Void> future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
-            SinkTask.class, consumerRecord, error);
+        Future<Void> future;
+        try {
+            future = retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+                SinkTask.class, consumerRecord, error);
+        } catch (ConnectException e) {
+            mustThrowException = true;
+            exceptionToThrow = e;
+            throw e;
+        }

Review comment:
       We don't need these variables; the errors are already stored in the ProcessingContext. See the comment above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org