kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614161831



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws Exception {
 
         expectSendRecordProducerCallbackFail();
 
+        EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
         assertThrows(ConnectException.class, () -> Whitebox.invokeMethod(workerTask, "sendRecords"));
+        assertFalse(workerTask.shouldCommitOffsets());
     }
 
     @Test
-    public void testSendRecordsProducerSendFailsImmediately() {
-        if (!enableTopicCreation)
-            // should only test with topic creation enabled
-            return;
+    public void testSendRecordsProducerCallbackFailInBacklog() throws Exception {
+        /*
+            1. A record is sent successfully
+            2. Flush for offset commit begins
+            3. Another record is dispatched to the producer and, because of the active offset commit, added to the backlog
+            4. The producer fails to send that record and notifies the worker via producer callback
+            5. The first offset commit succeeds as the first record has been sent successfully
+            6. No further offset commits are attempted as the only remaining record has failed with a non-retriable error
+         */
+        createWorkerTask();

Review comment:
       There are two calls to createWorkerTask here. Is this a copy-paste leftover, or am I missing something? :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

