[
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax reassigned KAFKA-6566:
--------------------------------------
Assignee: Matthias J. Sax (was: Robert Yokota)
> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.0.0
> Reporter: Gunnar Morling
> Assignee: Matthias J. Sax
> Priority: Blocker
> Fix For: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>
>
> Having discussed this with [~rhauch], it has been my assumption that
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case
> an exception has been raised in {{poll()}}. That's not the case, though. As
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful
> action to take, as it'll allow the task to clean up any resources such as
> releasing any database connections, right after that failure and not only
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo",
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)