C0urante commented on a change in pull request #10528: URL: https://github.com/apache/kafka/pull/10528#discussion_r614208924
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ########## @@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception { expectSendRecordProducerCallbackFail(); + EasyMock.expect(offsetWriter.willFlush()).andReturn(true); + PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); + assertFalse(workerTask.shouldCommitOffsets()); } @Test - public void testSendRecordsProducerSendFailsImmediately() { - if (!enableTopicCreation) - // should only test with topic creation enabled - return; + public void testSendRecordsProducerCallbackFailInBacklog() throws Exception { + /* + 1. A record is sent successfully + 2. Flush for offset commit begins + 3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog + 4. The producer fails to send that record and notifies the worker via producer callback + 5. The first offset commit succeeds as the first record has been sent successfully + 6. No further offset commits are attempted as the only remaining record has failed with a non-retriable error + */ + createWorkerTask(); Review comment: Whoops! Thanks for catching this. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ########## @@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception { expectSendRecordProducerCallbackFail(); + EasyMock.expect(offsetWriter.willFlush()).andReturn(true); + PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords")); + assertFalse(workerTask.shouldCommitOffsets()); } @Test - public void testSendRecordsProducerSendFailsImmediately() { - if (!enableTopicCreation) - // should only test with topic creation enabled - return; + public void testSendRecordsProducerCallbackFailInBacklog() throws Exception { + /* + 1. A record is sent successfully + 2. Flush for offset commit begins + 3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog + 4. The producer fails to send that record and notifies the worker via producer callback + 5. The first offset commit succeeds as the first record has been sent successfully + 6. No further offset commits are attempted as the only remaining record has failed with a non-retriable error + */ + createWorkerTask(); createWorkerTask(); + SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + + expectTopicCreation(TOPIC); + + expectSendRecordOnce(false); + expectSendRecordProducerCallbackFail(); + + expectOffsetFlush(true); + EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes(); + + PowerMock.replayAll(); + + Whitebox.setInternalState(workerTask, "toSend", Collections.singletonList(record1)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + Whitebox.setInternalState(workerTask, "flushing", true); + Whitebox.setInternalState(workerTask, "toSend", Collections.singletonList(record2)); + Whitebox.invokeMethod(workerTask, "sendRecords"); + assertTrue(workerTask.shouldCommitOffsets()); + assertTrue(workerTask.commitOffsets()); + assertFalse(workerTask.shouldCommitOffsets()); + } + + + @Test + public void testSendRecordsProducerCallbackFailInBacklogWithNonRetriedOffsetCommit() throws Exception { + /* + 1. A record is sent successfully + 2. Flush for offset commit begins + 3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog + 4. The producer fails to send that record and notifies the worker via producer callback + 5. The first offset commit fails even though first record has been sent successfully, (possibly from failure to produce offsets to Kafka) + 6. No further offset commits are attempted as the new record batch contains the second record, which has failed with a non-retriable error + */ + createWorkerTask(); Review comment: Whoops! Thanks for catching this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org