yashmayya commented on code in PR #14762:
URL: https://github.com/apache/kafka/pull/14762#discussion_r1397042412


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -163,6 +163,12 @@ public void initialize(TaskConfig taskConfig) {
         }
     }
 
+    @Override
+    public void cancel() {
+        super.cancel();
+        Utils.closeQuietly(consumer, "consumer");

Review Comment:
   ```suggestion
           Utils.closeQuietly(consumer, "consumer for sink task: " + id);
   ```
   nit: including the task id lets the log message identify which sink task's consumer failed to close.
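To illustrate why the more specific name helps, here is a minimal sketch. It assumes a stand-in `closeQuietly` with the same shape as `Utils.closeQuietly(AutoCloseable, String)`; the stand-in, the `warnings` list (used in place of a logger), the message format, and the task id value are all hypothetical and are not Kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

class CloseQuietlySketch {
    // Collected messages stand in for a real logger (assumption for this sketch).
    public static final List<String> warnings = new ArrayList<>();

    // Hypothetical stand-in shaped like Utils.closeQuietly(AutoCloseable, String):
    // swallow anything thrown by close() and record it under the given name.
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            warnings.add("Failed to close " + name + ": " + t.getMessage());
        }
    }

    public static void main(String[] args) {
        // A resource whose close() always fails, to force a warning.
        AutoCloseable failing = () -> {
            throw new IllegalStateException("boom");
        };
        String id = "my-sink-0"; // hypothetical task id

        closeQuietly(failing, "consumer");                      // generic name
        closeQuietly(failing, "consumer for sink task: " + id); // name from the suggestion

        warnings.forEach(System.out::println);
    }
}
```

With several sink tasks on one worker, only the second message says which task's consumer could not be closed.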



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -767,11 +775,7 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
             connect.kafka().produce(TOPIC, 0, "key", "value");
         }
 
-        // Configure a sink connector whose sink task blocks in its stop method
-        Map<String, String> connectorConfigs = new HashMap<>();
-        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
-        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
-        connectorConfigs.put("block", "Task::stop");
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();

Review Comment:
   Same as in `testAlterSinkConnectorOffsetsZombieSinkTasks`: `baseSinkConnectorConfigs()` can be used directly in the call to `configureConnector`, without a separate local variable.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java:
##########
@@ -408,27 +412,30 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
             connect.kafka().produce(TOPIC, 0, "key", "value");
         }
 
-        // Configure a sink connector whose sink task blocks in its stop method
-        Map<String, String> connectorConfigs = new HashMap<>();
-        connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
-        connectorConfigs.put(TOPICS_CONFIG, TOPIC);
-        connectorConfigs.put("block", "Task::stop");
+        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();

Review Comment:
   This can be used directly now (i.e. without defining a separate local variable).
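A minimal sketch of the inlining being suggested. The helper body below is an assumption (the diff does not show it); it is taken to hold the three settings the diff removes. `configureConnector` is a hypothetical stand-in for the test cluster call, and the class name, connector name, and topic value are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

class InlineConfigsSketch {
    // Assumed contents of the helper: the three settings removed in the diff.
    public static Map<String, String> baseSinkConnectorConfigs() {
        Map<String, String> configs = new HashMap<>();
        configs.put("connector.class", "BlockingSinkConnector"); // placeholder class name
        configs.put("topics", "test-topic");                     // placeholder topic
        configs.put("block", "Task::stop");
        return configs;
    }

    // Hypothetical stand-in for the call that registers the connector.
    public static String configureConnector(String name, Map<String, String> configs) {
        return name + " configured with " + configs.size() + " settings";
    }

    public static void main(String[] args) {
        // Before: a local variable that is only passed straight through.
        Map<String, String> connectorConfigs = baseSinkConnectorConfigs();
        System.out.println(configureConnector("sink-connector", connectorConfigs));

        // After: the helper result is passed directly.
        System.out.println(configureConnector("sink-connector", baseSinkConnectorConfigs()));
    }
}
```

The local variable remains useful only if the test modifies the map before configuring the connector.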



