gharris1727 commented on a change in pull request #10016:
URL: https://github.com/apache/kafka/pull/10016#discussion_r568190708



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -202,6 +198,8 @@ public void removeMetrics() {
     public void cancel() {
         super.cancel();
         offsetReader.close();
+        // Run on a separate thread to avoid potentially blocking the herder thread
+        new Thread(() -> closeProducer(0)).start();

Review comment:
      nit: I don't want to have to guess what 0 means here.
   ```suggestion
           new Thread(() -> closeProducer(Duration.ofSeconds(0))).start();
   ```
   Also, don't we have an executor for this sort of thing?
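   For illustration, roughly what I had in mind (just a sketch, not the actual patch: it assumes `closeProducer` accepts a `Duration`, per the suggestion above, and that the task can own a small executor, hypothetical field name `closeExecutor`):
   ```java
   // imports assumed: java.time.Duration, java.util.concurrent.ExecutorService, java.util.concurrent.Executors

   // Hypothetical executor owned by the task, so the close doesn't spawn an unmanaged thread
   private final ExecutorService closeExecutor = Executors.newSingleThreadExecutor();

   @Override
   public void cancel() {
       super.cancel();
       offsetReader.close();
       // Run asynchronously so a slow producer close can't block the herder thread
       closeExecutor.execute(() -> closeProducer(Duration.ofSeconds(0)));
       closeExecutor.shutdown();
   }
   ```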

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -171,13 +171,9 @@ protected void close() {
                 log.warn("Could not stop task", t);
             }
         }
-        if (producer != null) {
-            try {
-                producer.close(Duration.ofSeconds(30));
-            } catch (Throwable t) {
-                log.warn("Could not close producer", t);
-            }
-        }
+
+        closeProducer(30);
+
         if (admin != null) {

Review comment:
      Should we also clean up the admin client in a cancel scenario? Or is it already closed by this point? Maybe this is outside the scope of this ticket, but it seems like it could leak in a similar way.
   The transformation chain and retryWithToleranceOperator could also be leaking.
   
   And at that point, is duplicating these calls in `cancel()` better than asynchronously calling `close()` (or some refactored variant without the `task.stop()`)?
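   To make that last point concrete, roughly this shape is what I'm picturing (illustrative only; the `admin`, `transformationChain`, and `retryWithToleranceOperator` fields and their `close()` methods are assumptions on my part):
   ```java
   // Sketch of a shared teardown helper: close() would call task.stop() and then
   // releaseResources(Duration.ofSeconds(30)), while cancel() could dispatch
   // releaseResources(Duration.ZERO) asynchronously instead of duplicating the calls.
   private void releaseResources(Duration producerCloseTimeout) {
       closeProducer(producerCloseTimeout);
       if (admin != null) {
           try {
               admin.close(Duration.ofSeconds(30));
           } catch (Throwable t) {
               log.warn("Could not close admin client", t);
           }
       }
       // Assumed to expose close(); if they don't, they'd need their own teardown hooks
       transformationChain.close();
       retryWithToleranceOperator.close();
   }
   ```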

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
##########
@@ -288,14 +289,42 @@ public void testTaskStatuses() throws Exception {
                decreasedNumTasks, "Connector task statuses did not update in time.");
     }
 
+    @Test
+    public void testSourceTaskOnNonExistentTopic() throws Exception {
+        connect = connectBuilder
+            .numWorkers(1)
+            .numBrokers(1)
+            .build();
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(1, "Initial group of workers did not start in time.");
+
+        Map<String, String> props = defaultSourceConnectorProps("nonexistenttopic");
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG);
+        props.remove(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG);
+        props.put("throughput", "-1");
+
+        ConnectorHandle connector = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connector.expectedRecords(NUM_TASKS * MESSAGES_PER_POLL);
+        connect.configureConnector(CONNECTOR_NAME, props);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME,
+            NUM_TASKS, "Connector tasks did not start in time");
+        connector.awaitRecords(TimeUnit.MINUTES.toMillis(1));
+
+        StartAndStopLatch stopCounter = connector.expectedStops(1);

Review comment:
      Oh, the failure strings are throwing me for a loop. I thought this was encoding the infinite hanging as intended behavior, which is ridiculous.
   
   Is this test asserting that without topic auto-creation enabled, the task stops (fails?) in a timely manner? Could you add a comment to that effect? The name of the test doesn't tell me the expected behavior, only the setup conditions.
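   If that is the intent, something like this near the top of the test would cover it (wording is only a suggestion):
   ```java
   // Expected behavior: with topic auto-creation disabled and the target topic missing,
   // the task should fail/stop in a timely manner rather than hang on producer retries.
   ```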




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

