C0urante commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420248205



##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();

Review comment:
       If we're going to refer to this later while making assertions, a more 
descriptive name might help readability. Something like `putFailure` here and 
`closeFailure` below, maybe?

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {
+            PowerMock.verifyAll();
+            // The exception from close should not shadow the exception from put.

Review comment:
       Might be better to use this text as the message for the assertions 
instead of putting it here as a comment?
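
A minimal, self-contained sketch of what that could look like. The assertions themselves are not visible in this hunk, so the checks below are an assumption about their shape: `simulateTaskFailure` is a hypothetical stand-in for the task, which is assumed to surface the `put` failure as the cause and the `close` failure as a suppressed exception. The local `assertSame` helper only mimics JUnit 4's `Assert.assertSame(String message, Object expected, Object actual)` overload so the example runs without JUnit on the classpath, and the second message is illustrative wording.

```java
public class AssertionMessageSketch {
    // Local stand-in for JUnit 4's Assert.assertSame(String, Object, Object).
    public static void assertSame(String message, Object expected, Object actual) {
        if (expected != actual) {
            throw new AssertionError(message + " (expected: " + expected + ", actual: " + actual + ")");
        }
    }

    // Hypothetical stand-in for the task under test. ASSUMPTION: the put failure
    // becomes the cause and the close failure is attached as a suppressed exception.
    public static RuntimeException simulateTaskFailure(Throwable putFailure, Throwable closeFailure) {
        RuntimeException thrown = new RuntimeException("task failed", putFailure);
        thrown.addSuppressed(closeFailure);
        return thrown;
    }

    public static void main(String[] args) {
        Throwable putFailure = new RuntimeException("put failed");
        Throwable closeFailure = new RuntimeException("close failed");

        Throwable t = simulateTaskFailure(putFailure, closeFailure);

        // The former code comment now travels with the assertion, so it shows up
        // in the failure output instead of only in the source.
        assertSame("The exception from close should not shadow the exception from put.",
                putFailure, t.getCause());
        assertSame("The exception from close should be kept as a suppressed exception.",
                closeFailure, t.getSuppressed()[0]);
    }
}
```

This also uses the `putFailure` / `closeFailure` names suggested earlier, which keeps the assertions readable without scrolling back to the declarations.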

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##########
@@ -856,6 +857,49 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testSinkTasksHandleCloseErrors() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andVoid();
+
+        // Throw an exception on the next put to trigger shutdown behavior
+        // This exception is the true "cause" of the failure
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        Throwable a = new RuntimeException();
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(a);
+
+        // Throw another exception while closing the task's assignment
+        EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+            .andStubReturn(Collections.emptyMap());
+        Throwable b = new RuntimeException();
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall().andThrow(b);
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        try {
+            workerTask.execute();
+            fail();
+        } catch (Throwable t) {

Review comment:
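       The call to `fail()` above is going to get caught here, isn't it? 
`fail()` throws an `AssertionError`, which is a `Throwable`, so the catch block 
would swallow it. I think that might make this difficult to debug if a future 
change causes this to fail for some reason.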
   
   Based on the `deliverMessages` method where `SinkTask::put` is invoked, it 
looks like we might be able to narrow this down to a `ConnectException`.
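
To illustrate the concern, here is a small standalone sketch; it is not the actual test. The nested `ConnectException` is a hypothetical stand-in for `org.apache.kafka.connect.errors.ConnectException`, `fail` mimics JUnit's `fail()` by throwing an `AssertionError`, and the `Runnable` stands in for `workerTask.execute()`. Whether the task really wraps the failure in a `ConnectException` is an assumption taken from the comment above.

```java
public class NarrowCatchSketch {
    // Hypothetical stand-in for org.apache.kafka.connect.errors.ConnectException.
    public static class ConnectException extends RuntimeException {
        public ConnectException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for JUnit's fail(), which throws AssertionError.
    static void fail(String message) {
        throw new AssertionError(message);
    }

    // Broad catch: the AssertionError from fail() is caught like any other
    // Throwable, so "execute did not throw" is hidden from the test report.
    public static String broadCatch(Runnable execute) {
        try {
            execute.run();
            fail("expected execute() to throw");
        } catch (Throwable t) {
            return "caught " + t.getClass().getSimpleName();
        }
        return "not reached";
    }

    // Narrow catch: only the expected exception type is handled, so the
    // AssertionError from fail() propagates and fails the test directly.
    public static String narrowCatch(Runnable execute) {
        try {
            execute.run();
            fail("expected execute() to throw");
        } catch (ConnectException e) {
            return "caught " + e.getClass().getSimpleName();
        }
        return "not reached";
    }
}
```

With a no-op `execute`, `broadCatch` returns `"caught AssertionError"` and carries on to whatever assertions follow in the catch block, while `narrowCatch` lets the `AssertionError` escape with its original message.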



