Gunnar Morling created KAFKA-6566: ------------------------------------- Summary: SourceTask#stop() not called after exception raised in poll() Key: KAFKA-6566 URL: https://issues.apache.org/jira/browse/KAFKA-6566 Project: Kafka Issue Type: Bug Reporter: Gunnar Morling
Having discussed this with [~rhauch], it has been my assumption that {{SourceTask#stop()}} will be called by the Kafka Connect framework in case an exception has been raised in {{poll()}}. That's not the case, though. As an example see the connector and task below. Calling {{stop()}} after an exception in {{poll()}} seems like a very useful action to take, as it'll allow the task to clean up any resources such as releasing any database connections, right after that failure and not only once the connector is stopped. {code} package com.example; import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; public class TestConnector extends SourceConnector { @Override public String version() { return null; } @Override public void start(Map<String, String> props) { } @Override public Class<? extends Task> taskClass() { return TestTask.class; } @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return Collections.singletonList(Collections.singletonMap("foo", "bar")); } @Override public void stop() { } @Override public ConfigDef config() { return new ConfigDef(); } public static class TestTask extends SourceTask { @Override public String version() { return null; } @Override public void start(Map<String, String> props) { } @Override public List<SourceRecord> poll() throws InterruptedException { throw new RuntimeException(); } @Override public void stop() { System.out.println("stop() called"); } } } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)