C0urante commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r631842691
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -482,6 +486,22 @@ private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record } } + private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> record) { + if (outstandingMessages.containsKey(record)) { + currentBatchFailed = true; + if (flushing) { + // flush thread may be waiting on the outstanding messages to clear + this.notifyAll(); Review comment: > Is this intentional because we've failed and can't clear the outstanding messages? Yep, exactly. We only need to `notifyAll` here in order to potentially wake up an offset commit thread that's parked [here](https://github.com/C0urante/kafka/blob/01797a5004838d970db21a54bd99a29c7c63f5d4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L538) waiting for all outstanding messages to be flushed. If the failed message isn't a part of that batch, or we're not in the middle of flushing at all, no wakeup is necessary. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -482,6 +486,22 @@ private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record } } + private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> record) { + if (outstandingMessages.containsKey(record)) { + currentBatchFailed = true; + if (flushing) { + // flush thread may be waiting on the outstanding messages to clear + this.notifyAll(); Review comment: > Is this intentional because we've failed and can't clear the outstanding messages? Yep, exactly. We only need to `notifyAll` here in order to potentially wake up an offset commit thread that's parked [here](https://github.com/C0urante/kafka/blob/01797a5004838d970db21a54bd99a29c7c63f5d4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L538) waiting for all outstanding messages to be flushed. If the failed message isn't a part of that batch, or we're not in the middle of flushing at all, no wakeup is necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org