C0urante commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614208924



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws 
Exception {
 
         expectSendRecordProducerCallbackFail();
 
+        EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
         assertThrows(ConnectException.class, () -> 
Whitebox.invokeMethod(workerTask, "sendRecords"));
+        assertFalse(workerTask.shouldCommitOffsets());
     }
 
     @Test
-    public void testSendRecordsProducerSendFailsImmediately() {
-        if (!enableTopicCreation)
-            // should only test with topic creation enabled
-            return;
+    public void testSendRecordsProducerCallbackFailInBacklog() throws 
Exception {
+        /*
+            1. A record is sent successfully
+            2. Flush for offset commit begins
+            3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+            4. The producer fails to send that record and notifies the worker 
via producer callback
+            5. The first offset commit succeeds as the first record has been 
sent successfully
+            6. No further offset commits are attempted as the only remaining 
record has failed with a non-retriable error
+         */
+        createWorkerTask();

Review comment:
       Whoops! Thanks for catching this.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
##########
@@ -662,20 +666,94 @@ public void testSendRecordsProducerCallbackFail() throws 
Exception {
 
         expectSendRecordProducerCallbackFail();
 
+        EasyMock.expect(offsetWriter.willFlush()).andReturn(true);
+
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
         assertThrows(ConnectException.class, () -> 
Whitebox.invokeMethod(workerTask, "sendRecords"));
+        assertFalse(workerTask.shouldCommitOffsets());
     }
 
     @Test
-    public void testSendRecordsProducerSendFailsImmediately() {
-        if (!enableTopicCreation)
-            // should only test with topic creation enabled
-            return;
+    public void testSendRecordsProducerCallbackFailInBacklog() throws 
Exception {
+        /*
+            1. A record is sent successfully
+            2. Flush for offset commit begins
+            3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+            4. The producer fails to send that record and notifies the worker 
via producer callback
+            5. The first offset commit succeeds as the first record has been 
sent successfully
+            6. No further offset commits are attempted as the only remaining 
record has failed with a non-retriable error
+         */
+        createWorkerTask();
 
         createWorkerTask();
 
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, "topic", 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        expectSendRecordOnce(false);
+        expectSendRecordProducerCallbackFail();
+
+        expectOffsetFlush(true);
+        EasyMock.expect(offsetWriter.willFlush()).andReturn(true).anyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", 
Collections.singletonList(record1));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        Whitebox.setInternalState(workerTask, "flushing", true);
+        Whitebox.setInternalState(workerTask, "toSend", 
Collections.singletonList(record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertTrue(workerTask.shouldCommitOffsets());
+        assertTrue(workerTask.commitOffsets());
+        assertFalse(workerTask.shouldCommitOffsets());
+    }
+
+
+    @Test
+    public void 
testSendRecordsProducerCallbackFailInBacklogWithNonRetriedOffsetCommit() throws 
Exception {
+        /*
+            1. A record is sent successfully
+            2. Flush for offset commit begins
+            3. Another record is dispatched to the producer and, because of 
the active offset commit, added to the backlog
+            4. The producer fails to send that record and notifies the worker 
via producer callback
+            5. The first offset commit fails even though first record has been 
sent successfully, (possibly from failure to produce offsets to Kafka)
+            6. No further offset commits are attempted as the new record batch 
contains the second record, which has failed with a non-retriable error
+         */
+        createWorkerTask();

Review comment:
       Whoops! Thanks for catching this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to