tombentley commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r732678989
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -506,9 +530,10 @@ public boolean commitOffsets() { while (!outstandingMessages.isEmpty()) { try { long timeoutMs = timeout - time.milliseconds(); - // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain, - // we can stop flushing immediately - if (isCancelled() || timeoutMs <= 0) { + // If the producer has failed to send a record in the current batch with a non-retriable error, we'll never be able to clear the + // outstanding messages, so we can stop flushing immediately. Similarly, if the task has been cancelled, no more records will be + // sent from the producer; in that case, if any outstanding messages remain, we can also stop flushing immediately + if (currentBatchFailed || isCancelled() || timeoutMs <= 0) { log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); Review comment: The "timed out" part of the error is a little misleading now. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ########## @@ -1771,4 +1772,20 @@ private void expectTopicCreation(String topic) { EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic)); } } + + private void assertShouldSkipCommit() { + assertFalse(workerTask.shouldCommitOffsets()); + + LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class); + LogCaptureAppender.setClassLoggerToTrace(WorkerSourceTask.class); + try (LogCaptureAppender committerAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) { + try (LogCaptureAppender taskAppender = LogCaptureAppender.createAndRegister(WorkerSourceTask.class)) { Review comment: Any reason not to use multi-resource try-with-resources? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org