Gunnar Morling created KAFKA-6566:
-------------------------------------

             Summary: SourceTask#stop() not called after exception raised in 
poll()
                 Key: KAFKA-6566
                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
             Project: Kafka
          Issue Type: Bug
            Reporter: Gunnar Morling


Having discussed this with [~rhauch], it has been my assumption that 
{{SourceTask#stop()}} will be called by the Kafka Connect framework in case an 
exception has been raised in {{poll()}}. That's not the case, though. As an 
example see the connector and task below.

Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
action to take, as it'll allow the task to clean up any resources such as 
releasing any database connections, right after that failure and not only once 
the connector is stopped.

{code}
package com.example;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TestConnector extends SourceConnector {

    @Override
    public String version() {
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public Class<? extends Task> taskClass() {
        return TestTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(Collections.singletonMap("foo", 
"bar"));
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    public static class TestTask extends SourceTask {

        @Override
        public String version() {
            return null;
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            throw new RuntimeException();
        }

        @Override
        public void stop() {
            System.out.println("stop() called");
        }
    }
}

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to