C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r631842691



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -482,6 +486,22 @@ private synchronized void recordSent(final 
ProducerRecord<byte[], byte[]> record
         }
     }
 
+    private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> 
record) {
+        if (outstandingMessages.containsKey(record)) {
+            currentBatchFailed = true;
+            if (flushing) {
+                // flush thread may be waiting on the outstanding messages to 
clear
+                this.notifyAll();

Review comment:
       > Is this intentional because we've failed and can't clear the 
outstanding messages?
   Yep, exactly. We only need to `notifyAll` here in order to potentially wake 
up an offset commit thread that's parked 
[here](https://github.com/C0urante/kafka/blob/01797a5004838d970db21a54bd99a29c7c63f5d4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L538)
 waiting for all outstanding messages to be flushed. If the failed message 
isn't a part of that batch, or we're not in the middle of flushing at all, no 
wakeup is necessary.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -482,6 +486,22 @@ private synchronized void recordSent(final 
ProducerRecord<byte[], byte[]> record
         }
     }
 
+    private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> 
record) {
+        if (outstandingMessages.containsKey(record)) {
+            currentBatchFailed = true;
+            if (flushing) {
+                // flush thread may be waiting on the outstanding messages to 
clear
+                this.notifyAll();

Review comment:
       > Is this intentional because we've failed and can't clear the 
outstanding messages?
   
   Yep, exactly. We only need to `notifyAll` here in order to potentially wake 
up an offset commit thread that's parked 
[here](https://github.com/C0urante/kafka/blob/01797a5004838d970db21a54bd99a29c7c63f5d4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L538)
 waiting for all outstanding messages to be flushed. If the failed message 
isn't a part of that batch, or we're not in the middle of flushing at all, no 
wakeup is necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to