kpatelatwork commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614161831
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ########## @@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception { expectSendRecordProducerCallbackFail(); + EasyMock.expect(offsetWriter.willFlush()).andReturn(true); + PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); + assertFalse(workerTask.shouldCommitOffsets()); } @Test - public void testSendRecordsProducerSendFailsImmediately() { - if (!enableTopicCreation) - // should only test with topic creation enabled - return; + public void testSendRecordsProducerCallbackFailInBacklog() throws Exception { + /* + 1. A record is sent successfully + 2. Flush for offset commit begins + 3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog + 4. The producer fails to send that record and notifies the worker via producer callback + 5. The first offset commit succeeds as the first record has been sent successfully + 6. No further offset commits are attempted as the only remaining record has failed with a non-retriable error + */ + createWorkerTask(); Review comment: Two calls to createWorkerTask: is this a copy-paste, or am I missing something? :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org