[ https://issues.apache.org/jira/browse/KAFKA-6963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664471#comment-16664471 ]

ASF GitHub Bot commented on KAFKA-6963:
---------------------------------------

rhysmccaig closed pull request #5438: KAFKA-6963: KIP-310: Add a Kafka Source Connector to Kafka Connect
URL: https://github.com/apache/kafka/pull/5438
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 3e8558d4739..f883eb54658 100644
--- a/build.gradle
+++ b/build.gradle
@@ -516,7 +516,7 @@ for ( sv in availableScalaVersions ) {
   }
 }
 
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension', 'connect:kafka']
 def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs
 
 /** Create one task per default Scala version */
@@ -742,6 +742,8 @@ project(':core') {
     from(project(':connect:file').configurations.runtime) { into("libs/") }
     from(project(':connect:basic-auth-extension').jar) { into("libs/") }
     from(project(':connect:basic-auth-extension').configurations.runtime) { into("libs/") }
+    from(project(':connect:kafka').jar) { into("libs/") }
+    from(project(':connect:kafka').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
     from(project(':streams').configurations.runtime) { into("libs/") }
     from(project(':streams:streams-scala').jar) { into("libs/") }
@@ -1501,6 +1503,44 @@ project(':connect:basic-auth-extension') {
   }
 }
 
+project(':connect:kafka') {
+  archivesBaseName = "connect-kafka"
+
+  dependencies {
+    compile project(':connect:api')
+    compile project(':clients')
+    compile libs.slf4jApi
+
+    testCompile libs.bcpkix
+    testCompile libs.easymock
+    testCompile libs.junit
+    testCompile libs.powermockJunit4
+    testCompile libs.powermockEasymock
+    testCompile project(':clients').sourceSets.test.output
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+      include('log4j*jar')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
 task aggregatedJavadoc(type: Javadoc) {
   def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
   source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 35f42e3a700..d23bc2bb8bc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -370,6 +370,15 @@
       <allow pkg="org.powermock" />
     </subpackage>
 
+    <subpackage name="kafka">
+      <allow pkg="org.apache.kafka.connect" />
+      <allow pkg="org.apache.kafka.clients.consumer" />
+      <!-- for tests -->
+      <allow pkg="org.easymock" />
+      <allow pkg="org.powermock" />
+    </subpackage>
+
+
     <subpackage name="tools">
       <allow pkg="org.apache.kafka.connect" />
       <allow pkg="org.apache.kafka.tools" />
diff --git a/config/connect-kafka-source.properties b/config/connect-kafka-source.properties
new file mode 100644
index 00000000000..7564478e447
--- /dev/null
+++ b/config/connect-kafka-source.properties
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=kafka-source
+tasks.max=1
+key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+connector.class=org.apache.kafka.connect.kafka.KafkaSourceConnector
+source.bootstrap.servers=kafka.bootstrap.server1:9092,kafka.bootstrap.server2:9093
+source.topic.whitelist=test.topic.*
+source.auto.offset.reset=earliest
+source.group.id=kafka-connect-testing
+destination.topics.prefix=aggregate.
diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnector.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnector.java
new file mode 100644
index 00000000000..4ef9dc18e7a
--- /dev/null
+++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnector.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
+
+
+/**
+ * KafkaSourceConnector is a Kafka Connect SourceConnector implementation that generates tasks
+ * to ingest messages from a source Kafka cluster.
+ */
+
+public class KafkaSourceConnector extends SourceConnector {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSourceConnector.class);
+    private KafkaSourceConnectorConfig connectorConfig;
+
+    private PartitionMonitor partitionMonitor;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> config) throws ConfigException {
+        LOG.info("Connector starting");
+        connectorConfig = new KafkaSourceConnectorConfig(config);
+        LOG.info("Starting Partition Monitor to monitor source kafka cluster 
partitions");
+        partitionMonitor = new PartitionMonitor(context, connectorConfig);
+        partitionMonitor.start();
+    }
+
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return KafkaSourceTask.class;
+    }
+
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<String> leaderTopicPartitions = 
partitionMonitor.getCurrentLeaderTopicPartitions()
+            .stream()
+            .map(LeaderTopicPartition::toString)
+            .sorted() // Potential task performance/overhead improvement by 
roughly grouping tasks and leaders
+            .collect(Collectors.toList());
+        int taskCount = Math.min(maxTasks, leaderTopicPartitions.size());
+        if (taskCount < 1) {
+            LOG.warn("No tasks to start.");
+            return new ArrayList<>();
+        }
+        return ConnectorUtils.groupPartitions(leaderTopicPartitions, taskCount)
+            .stream()
+            .map(leaderTopicPartitionsGroup -> {
+                Map<String, String> taskConfig = new HashMap<>();
+                taskConfig.putAll(connectorConfig.allAsStrings());
+                
taskConfig.put(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG, 
String.join(",", leaderTopicPartitionsGroup));
+                return taskConfig;
+            })
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public void stop() {
+        LOG.info("Connector received stop(). Cleaning Up.");
+        partitionMonitor.shutdown();
+        LOG.info("Connector stopped.");
+    }
+
+    @Override
+    public ConfigDef config() {
+        return KafkaSourceConnectorConfig.CONFIG;
+    }
+    
+}
diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorConfig.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorConfig.java
new file mode 100644
index 00000000000..8baca3cf41d
--- /dev/null
+++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorConfig.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigDef.ValidString;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class KafkaSourceConnectorConfig extends AbstractConfig {
+
+    private static final Validator NON_EMPTY_LIST_VALIDATOR =  new 
ConfigDef.Validator() {
+        @Override
+        @SuppressWarnings("unchecked")
+        public void ensureValid(String name, Object value) {
+            if (((List<String>) value).isEmpty()) {
+                throw new ConfigException("At least one bootstrap server must 
be configured in " + name);
+            }
+        }
+    };
+
+    private static final Validator TOPIC_WHITELIST_REGEX_VALIDATOR =  new 
ConfigDef.Validator() {
+        @Override
+        public void ensureValid(String name, Object value) {
+            getTopicWhitelistPattern((String) value);
+        }
+    };
+
+    // Config Prefixes
+    public static final String SOURCE_PREFIX =              "source.";
+    public static final String DESTINATION_PREFIX =         "destination.";
+
+    // Any CONFIG beginning with this prefix will set the CONFIG parameters 
for the kafka consumer used in this connector
+    public static final String CONSUMER_PREFIX =            
"connector.consumer.";
+    // Any CONFIG beginning with this prefix will set the CONFIG parameters 
for the admin client used by the partition monitor
+    public static final String ADMIN_CLIENT_PREFIX =        "connector.admin.";
+
+    public static final String TASK_PREFIX =                "task.";
+
+    // Topic partition list we send to each task. Not user configurable.
+    public static final String TASK_LEADER_TOPIC_PARTITION_CONFIG = 
TASK_PREFIX.concat("leader.topic.partitions");
+
+    // General Connector CONFIG
+    // Topics
+    public static final String SOURCE_TOPIC_WHITELIST_CONFIG =         
SOURCE_PREFIX.concat("topic.whitelist");
+    public static final String SOURCE_TOPIC_WHITELIST_DOC =            "Regular expressions indicating the topics to consume from the source cluster. " +
+            "Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
+            "For convenience, comma (',') is interpreted as the regex-choice symbol ('|').";
+    public static final Object SOURCE_TOPIC_WHITELIST_DEFAULT =         
ConfigDef.NO_DEFAULT_VALUE;
+    public static final String DESTINATION_TOPIC_PREFIX_CONFIG =        
DESTINATION_PREFIX.concat("topics.prefix");
+    public static final String DESTINATION_TOPIC_PREFIX_DOC =           
"Prefix to add to source topic names when delivering messages to destination 
server";
+    public static final String DESTINATION_TOPIC_PREFIX_DEFAULT =       "";
+
+    // Message headers
+    public static final String INCLUDE_MESSAGE_HEADERS_CONFIG =         
"include.message.headers";
+    public static final String INCLUDE_MESSAGE_HEADERS_DOC =            
"Indicates whether message headers from source records should be included in 
output";
+    public static final boolean INCLUDE_MESSAGE_HEADERS_DEFAULT =       true;
+
+    // Partition Monitor
+    public static final String TOPIC_LIST_TIMEOUT_MS_CONFIG =               
"topic.list.timeout.ms";
+    public static final String TOPIC_LIST_TIMEOUT_MS_DOC  =                 
"Amount of time the partition monitor thread should wait for kafka to return 
topic information before logging a timeout error.";
+    public static final int TOPIC_LIST_TIMEOUT_MS_DEFAULT =                 
60000;
+    public static final String TOPIC_LIST_POLL_INTERVAL_MS_CONFIG =         
"topic.list.poll.interval.ms";
+    public static final String TOPIC_LIST_POLL_INTERVAL_MS_DOC =            
"How long to wait before re-querying the source cluster for a change in the 
partitions to be consumed";
+    public static final int TOPIC_LIST_POLL_INTERVAL_MS_DEFAULT =           
300000;
+    public static final String RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG =  
"reconfigure.tasks.on.partition.leader.change";
+    public static final String RECONFIGURE_TASKS_ON_LEADER_CHANGE_DOC =     
"Indicates whether the partition monitor should request a task reconfiguration 
when partition leaders have changed";
+    public static final boolean RECONFIGURE_TASKS_ON_LEADER_CHANGE_DEFAULT = 
false;
+    // Internal Connector Timing
+    public static final String POLL_LOOP_TIMEOUT_MS_CONFIG =    
"poll.loop.timeout.ms";
+    public static final String POLL_LOOP_TIMEOUT_MS_DOC =       "Maximum 
amount of time to wait in each poll loop without data before cancelling the 
poll and returning control to the worker task";
+    public static final int POLL_LOOP_TIMEOUT_MS_DEFAULT =      1000;
+    public static final String MAX_SHUTDOWN_WAIT_MS_CONFIG =    
"max.shutdown.wait.ms";
+    public static final String MAX_SHUTDOWN_WAIT_MS_DOC =       "Maximum 
amount of time to wait before forcing the consumer to close";
+    public static final int MAX_SHUTDOWN_WAIT_MS_DEFAULT =      2000;
+
+    // General Source Kafka Config - Applies to Consumer and Admin Client if 
not overridden by CONSUMER_PREFIX or ADMIN_CLIENT_PREFIX
+    public static final String SOURCE_BOOTSTRAP_SERVERS_CONFIG =          
SOURCE_PREFIX.concat(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+    public static final String SOURCE_BOOTSTRAP_SERVERS_DOC =             
"list of kafka brokers to use to bootstrap the source cluster";
+    public static final Object SOURCE_BOOTSTRAP_SERVERS_DEFAULT =         
ConfigDef.NO_DEFAULT_VALUE;
+
+    // These are the kafka consumer configs we override defaults for
+    // Note that *any* kafka consumer CONFIG can be set by adding the
+    // CONSUMER_PREFIX in front of the standard consumer CONFIG strings
+    public static final String CONSUMER_MAX_POLL_RECORDS_CONFIG =           
SOURCE_PREFIX.concat(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+    public static final String CONSUMER_MAX_POLL_RECORDS_DOC =              
"Maximum number of records to return from each poll of the consumer";
+    public static final int CONSUMER_MAX_POLL_RECORDS_DEFAULT =             
500;
+    public static final String CONSUMER_AUTO_OFFSET_RESET_CONFIG =          
SOURCE_PREFIX.concat(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+    public static final String CONSUMER_AUTO_OFFSET_RESET_DOC =             
"If there is no stored offset for a partition, where to reset from 
[earliest|latest].";
+    public static final String CONSUMER_AUTO_OFFSET_RESET_DEFAULT =         
"earliest";
+    public static final ValidString CONSUMER_AUTO_OFFSET_RESET_VALIDATOR =  
ConfigDef.ValidString.in("earliest", "latest");
+    public static final String CONSUMER_KEY_DESERIALIZER_CONFIG =           
SOURCE_PREFIX.concat(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+    public static final String CONSUMER_KEY_DESERIALIZER_DOC =              
"Key deserializer to use for the kafka consumers connecting to the source 
cluster.";
+    public static final String CONSUMER_KEY_DESERIALIZER_DEFAULT =          
ByteArrayDeserializer.class.getName();
+    public static final String CONSUMER_VALUE_DESERIALIZER_CONFIG =         
SOURCE_PREFIX.concat(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+    public static final String CONSUMER_VALUE_DESERIALIZER_DOC =            
"Value deserializer to use for the kafka consumers connecting to the source 
cluster.";
+    public static final String CONSUMER_VALUE_DESERIALIZER_DEFAULT =        
ByteArrayDeserializer.class.getName();
+    public static final String CONSUMER_ENABLE_AUTO_COMMIT_CONFIG =         
SOURCE_PREFIX.concat(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+    public static final String CONSUMER_ENABLE_AUTO_COMMIT_DOC =            "If true, the consumer's offsets will be periodically committed to the source cluster in the background. " +
+            "Note that these offsets are not used to resume the connector (the offsets used for resuming are stored in the Kafka Connect offset store), but they may be useful for monitoring the current offset lag " +
+            "of this connector on the source cluster";
+    public static final Boolean CONSUMER_ENABLE_AUTO_COMMIT_DEFAULT =        
true;
+
+    public static final String CONSUMER_GROUP_ID_CONFIG =         
SOURCE_PREFIX.concat(ConsumerConfig.GROUP_ID_CONFIG);
+    public static final String CONSUMER_GROUP_ID_DOC =            "Source 
Kafka Consumer group id. This must be set if source.enable.auto.commit is set 
as a group id is required for offset tracking on the source cluster";
+    public static final Object CONSUMER_GROUP_ID_DEFAULT =        
ConfigDef.NO_DEFAULT_VALUE;
+
+
+    // Config definition
+    public static final ConfigDef CONFIG = new ConfigDef()
+        .define(SOURCE_TOPIC_WHITELIST_CONFIG, Type.STRING, 
SOURCE_TOPIC_WHITELIST_DEFAULT, TOPIC_WHITELIST_REGEX_VALIDATOR, 
Importance.HIGH, SOURCE_TOPIC_WHITELIST_DOC)
+        .define(DESTINATION_TOPIC_PREFIX_CONFIG, Type.STRING, 
DESTINATION_TOPIC_PREFIX_DEFAULT, Importance.MEDIUM, 
DESTINATION_TOPIC_PREFIX_DOC)
+        .define(INCLUDE_MESSAGE_HEADERS_CONFIG, Type.BOOLEAN, 
INCLUDE_MESSAGE_HEADERS_DEFAULT, Importance.MEDIUM, INCLUDE_MESSAGE_HEADERS_DOC)
+        .define(TOPIC_LIST_TIMEOUT_MS_CONFIG, Type.INT, 
TOPIC_LIST_TIMEOUT_MS_DEFAULT, Importance.LOW, TOPIC_LIST_TIMEOUT_MS_DOC)
+        .define(TOPIC_LIST_POLL_INTERVAL_MS_CONFIG, Type.INT, 
TOPIC_LIST_POLL_INTERVAL_MS_DEFAULT, Importance.MEDIUM, 
TOPIC_LIST_POLL_INTERVAL_MS_DOC)
+        .define(RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG, Type.BOOLEAN, 
RECONFIGURE_TASKS_ON_LEADER_CHANGE_DEFAULT, Importance.MEDIUM, 
RECONFIGURE_TASKS_ON_LEADER_CHANGE_DOC)
+        .define(POLL_LOOP_TIMEOUT_MS_CONFIG, Type.INT, 
POLL_LOOP_TIMEOUT_MS_DEFAULT, Importance.LOW, POLL_LOOP_TIMEOUT_MS_DOC)
+        .define(MAX_SHUTDOWN_WAIT_MS_CONFIG, Type.INT, 
MAX_SHUTDOWN_WAIT_MS_DEFAULT, Importance.LOW, MAX_SHUTDOWN_WAIT_MS_DOC)
+        .define(SOURCE_BOOTSTRAP_SERVERS_CONFIG, Type.LIST, 
SOURCE_BOOTSTRAP_SERVERS_DEFAULT, NON_EMPTY_LIST_VALIDATOR, Importance.HIGH, 
SOURCE_BOOTSTRAP_SERVERS_DOC)
+        .define(CONSUMER_MAX_POLL_RECORDS_CONFIG, Type.INT, 
CONSUMER_MAX_POLL_RECORDS_DEFAULT, Importance.LOW, 
CONSUMER_MAX_POLL_RECORDS_DOC)
+        .define(CONSUMER_AUTO_OFFSET_RESET_CONFIG, Type.STRING, 
CONSUMER_AUTO_OFFSET_RESET_DEFAULT, CONSUMER_AUTO_OFFSET_RESET_VALIDATOR, 
Importance.MEDIUM, CONSUMER_AUTO_OFFSET_RESET_DOC)
+        .define(CONSUMER_KEY_DESERIALIZER_CONFIG, Type.STRING, 
CONSUMER_KEY_DESERIALIZER_DEFAULT, Importance.LOW, 
CONSUMER_KEY_DESERIALIZER_DOC)
+        .define(CONSUMER_VALUE_DESERIALIZER_CONFIG, Type.STRING, 
CONSUMER_VALUE_DESERIALIZER_DEFAULT, Importance.LOW, 
CONSUMER_VALUE_DESERIALIZER_DOC)
+        .define(CONSUMER_ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, 
CONSUMER_ENABLE_AUTO_COMMIT_DEFAULT, Importance.LOW, 
CONSUMER_ENABLE_AUTO_COMMIT_DOC)
+        .define(CONSUMER_GROUP_ID_CONFIG, Type.STRING, 
CONSUMER_GROUP_ID_DEFAULT, new ConfigDef.NonEmptyString(), Importance.MEDIUM, 
CONSUMER_GROUP_ID_DOC);
+
+    public KafkaSourceConnectorConfig(Map<String, String> props) {
+        super(CONFIG, props);
+    }
+
+
+    // Returns all values with a specified prefix with the prefix stripped 
from the key
+    public Map<String, Object> allWithPrefix(String prefix) {
+        return allWithPrefix(prefix, true);
+    }
+
+    // Returns all values with a specified prefix with the prefix stripped 
from the key if desired
+    // Original input is set first, then overwritten (if applicable) with the 
parsed values
+    public Map<String, Object> allWithPrefix(String prefix, boolean 
stripPrefix) {
+        Map<String, Object> result = originalsWithPrefix(prefix, stripPrefix);
+        for (Map.Entry<String, ?> entry : values().entrySet()) {
+            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > 
prefix.length()) {
+                if (stripPrefix)
+                    result.put(entry.getKey().substring(prefix.length()), 
entry.getValue());
+                else
+                    result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+    }
+
+    // Returns all values (part of definition or original strings) as strings 
so they can be used with functions accepting Map<String,String> configs
+    public Map<String, String> allAsStrings() {
+        Map<String, String> result = new HashMap<>();
+        result.put(DESTINATION_TOPIC_PREFIX_CONFIG, 
getString(DESTINATION_TOPIC_PREFIX_CONFIG));
+        result.put(INCLUDE_MESSAGE_HEADERS_CONFIG, 
String.valueOf(getBoolean(INCLUDE_MESSAGE_HEADERS_CONFIG)));
+        result.put(TOPIC_LIST_TIMEOUT_MS_CONFIG, 
String.valueOf(getInt(TOPIC_LIST_TIMEOUT_MS_CONFIG)));
+        result.put(TOPIC_LIST_POLL_INTERVAL_MS_CONFIG, 
String.valueOf(getInt(TOPIC_LIST_POLL_INTERVAL_MS_CONFIG)));
+        result.put(RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG, 
String.valueOf(getBoolean(RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG)));
+        result.put(POLL_LOOP_TIMEOUT_MS_CONFIG, 
String.valueOf(getInt(POLL_LOOP_TIMEOUT_MS_CONFIG)));
+        result.put(MAX_SHUTDOWN_WAIT_MS_CONFIG, 
String.valueOf(getInt(MAX_SHUTDOWN_WAIT_MS_CONFIG)));
+        result.put(CONSUMER_MAX_POLL_RECORDS_CONFIG, 
String.valueOf(getInt(CONSUMER_MAX_POLL_RECORDS_CONFIG)));
+        result.put(CONSUMER_AUTO_OFFSET_RESET_CONFIG, 
getString(CONSUMER_AUTO_OFFSET_RESET_CONFIG));
+        result.put(CONSUMER_KEY_DESERIALIZER_CONFIG, 
getString(CONSUMER_KEY_DESERIALIZER_CONFIG));
+        result.put(CONSUMER_VALUE_DESERIALIZER_CONFIG, 
getString(CONSUMER_VALUE_DESERIALIZER_CONFIG));
+        result.putAll(originalsStrings()); // Will set any values without 
defaults and will capture additional configs like consumer settings if supplied
+        return result;
+    }
+
+    // Return a Properties Object that can be passed to AdminClient.create to 
configure a Kafka AdminClient instance
+    public Properties getAdminClientProperties() {
+        Properties adminClientProps = new Properties();
+        // By Default use any settings under SOURCE_PREFIX
+        adminClientProps.putAll(allWithPrefix(SOURCE_PREFIX));
+        // But override with anything under ADMIN_CLIENT_PREFIX
+        adminClientProps.putAll(allWithPrefix(ADMIN_CLIENT_PREFIX));
+        return adminClientProps;
+    }
+
+    // Return a Properties Object that can be passed to KafkaConsumer
+    public Properties getKafkaConsumerProperties() {
+        Properties kafkaConsumerProps = new Properties();
+        // By Default use any settings under SOURCE_PREFIX
+        kafkaConsumerProps.putAll(allWithPrefix(SOURCE_PREFIX));
+        // But override with anything under CONSUMER_PREFIX
+        kafkaConsumerProps.putAll(allWithPrefix(CONSUMER_PREFIX));
+        return kafkaConsumerProps;
+    }
+
+    public Pattern getTopicWhitelistPattern() {
+        return 
getTopicWhitelistPattern(getString(SOURCE_TOPIC_WHITELIST_CONFIG));
+    }
+
+    // Returns a java regex pattern that can be used to match kafka topics
+    private static Pattern getTopicWhitelistPattern(String rawRegex) {
+        String regex = rawRegex
+                .trim()
+                .replace(',', '|')
+                .replace(" ", "")
+                .replaceAll("^[\"']+", "")
+                .replaceAll("[\"']+$", ""); // property files may bring quotes
+        try {
+            return Pattern.compile(regex);
+        } catch (PatternSyntaxException e) {
+            throw new ConfigException(regex + " is an invalid regex for CONFIG 
" + SOURCE_TOPIC_WHITELIST_CONFIG);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceTask.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceTask.java
new file mode 100644
index 00000000000..3b8cbb1f7ff
--- /dev/null
+++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceTask.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.time.Duration;
+
+
+public class KafkaSourceTask extends SourceTask {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSourceTask.class);
+    public static final String TOPIC_PARTITION_KEY = "topic:partition";
+    public static final String OFFSET_KEY = "offset";
+
+    // Used to ensure we can be nice and call consumer.close() on shutdown
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+    // Flag to the poll() loop that we are awaiting shutdown so it can clean 
up.
+    private AtomicBoolean stop = new AtomicBoolean(false);
+    // Flag to the stop() function that it needs to wait for poll() to wrap up 
before trying to close the kafka consumer.
+    private AtomicBoolean poll = new AtomicBoolean(false);
+    // Used to enforce synchronized access to stop and poll
+    private final Object stopLock = new Object();
+
+    // Settings
+    private int maxShutdownWait;
+    private int pollTimeout;
+    private String topicPrefix;
+    private boolean includeHeaders;
+
+    // Consumer
+    private KafkaConsumer<byte[], byte[]> consumer;
+
+    public void start(Map<String, String> opts) {
+        LOG.info("{}: task is starting.", this);
+        KafkaSourceConnectorConfig sourceConnectorConfig = new 
KafkaSourceConnectorConfig(opts);
+        maxShutdownWait = 
sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG);
+        pollTimeout = 
sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG);
+        topicPrefix = 
sourceConnectorConfig.getString(KafkaSourceConnectorConfig.DESTINATION_TOPIC_PREFIX_CONFIG);
+        includeHeaders = 
sourceConnectorConfig.getBoolean(KafkaSourceConnectorConfig.INCLUDE_MESSAGE_HEADERS_CONFIG);
+        String unknownOffsetResetPosition = 
sourceConnectorConfig.getString(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG);
+        // Get the leader topic partitions to work with
+        List<LeaderTopicPartition> leaderTopicPartitions = 
Arrays.asList(opts.get(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG)
+            .split(","))
+            .stream()
+            .map(LeaderTopicPartition::fromString)
+            .collect(Collectors.toList());
+        // retrieve the existing offsets (if any) for the configured partitions
+        List<Map<String, String>> offsetLookupPartitions = 
leaderTopicPartitions.stream()
+                .map(leaderTopicPartition -> 
Collections.singletonMap(TOPIC_PARTITION_KEY, 
leaderTopicPartition.toTopicPartitionString()))
+                .collect(Collectors.toList());
+        Map<String, Long> topicPartitionStringsOffsets = 
context.offsetStorageReader().offsets(offsetLookupPartitions)
+            .entrySet()
+            .stream()
+            .filter(e -> e != null && e.getKey() != null && 
e.getKey().get(TOPIC_PARTITION_KEY) != null && e.getValue() != null && 
e.getValue().get(OFFSET_KEY) != null)
+            .collect(Collectors.toMap(e -> 
e.getKey().get(TOPIC_PARTITION_KEY), e -> (long) e.getValue().get(OFFSET_KEY)));
+        // Set up Kafka consumer
+        consumer = new KafkaConsumer<byte[], 
byte[]>(sourceConnectorConfig.getKafkaConsumerProperties());
+        // Get topic partitions and offsets so we can seek() to them
+        Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
+        List<TopicPartition> topicPartitionsWithUnknownOffset = new 
ArrayList<>();
+        for (LeaderTopicPartition leaderTopicPartition : 
leaderTopicPartitions) {
+            String topicPartitionString = 
leaderTopicPartition.toTopicPartitionString();
+            TopicPartition topicPartition = 
leaderTopicPartition.toTopicPartition();
+            if 
(topicPartitionStringsOffsets.containsKey(topicPartitionString)) {
+                topicPartitionOffsets.put(topicPartition, 
topicPartitionStringsOffsets.get(topicPartitionString));
+            } else {
+                // No stored offset? No worries, we will place it in the list to look up
+                topicPartitionsWithUnknownOffset.add(topicPartition);
+            }
+        }
+        // Set default offsets for partitions without stored offsets
+        if (topicPartitionsWithUnknownOffset.size() > 0) {
+            Map<TopicPartition, Long> defaultOffsets;
+            LOG.info("The following partitions do not have existing offset 
data: {}", topicPartitionsWithUnknownOffset);
+            if (unknownOffsetResetPosition.equals("earliest")) {
+                LOG.info("Using earliest offsets for partitions without 
existing offset data.");
+                defaultOffsets = 
consumer.beginningOffsets(topicPartitionsWithUnknownOffset);
+            } else if (unknownOffsetResetPosition.equals("latest")) {
+                LOG.info("Using latest offsets for partitions without existing 
offset data.");
+                defaultOffsets = 
consumer.endOffsets(topicPartitionsWithUnknownOffset);
+            } else {
+                LOG.warn("Config value {}, is set to an unknown value: {}. 
Partitions without existing offset data will not be consumed.", 
KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, 
unknownOffsetResetPosition);
+                defaultOffsets = new HashMap<>();
+            }
+            topicPartitionOffsets.putAll(defaultOffsets);
+        }
+        // List of topic partitions to assign
+        List<TopicPartition> topicPartitionsToAssign = new 
ArrayList<>(topicPartitionOffsets.keySet());
+        consumer.assign(topicPartitionsToAssign);
+        // Seek to desired offset for each partition
+        topicPartitionOffsets.forEach((key, value) -> consumer.seek(key, 
value));
+    }
+
+
+    @Override
+    public List<SourceRecord> poll() {
+        if (LOG.isDebugEnabled()) LOG.debug("{}: poll()", this);
+        synchronized (stopLock) {
+            if (!stop.get())
+                poll.set(true);
+        }
+        ArrayList<SourceRecord> records = new ArrayList<>();
+        if (poll.get()) {
+            try {
+                ConsumerRecords<byte[], byte[]> krecords = 
consumer.poll(Duration.ofMillis(pollTimeout));
+                if (LOG.isDebugEnabled()) LOG.debug("{}: Got {} records from 
source.", this, krecords.count());
+                for (ConsumerRecord<byte[], byte[]> krecord : krecords) {
+                    Map<String, String> sourcePartition = 
Collections.singletonMap(TOPIC_PARTITION_KEY, 
krecord.topic().concat(":").concat(Integer.toString(krecord.partition())));
+                    Map<String, Long> sourceOffset = 
Collections.singletonMap(OFFSET_KEY, krecord.offset());
+                    String destinationTopic = 
topicPrefix.concat(krecord.topic());
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(
+                                "Task: sourceTopic:{} sourcePartition:{} sourceOffset:{} destinationTopic:{}, key:{}, valueSize:{}",
+                                krecord.topic(), krecord.partition(), 
krecord.offset(), destinationTopic, krecord.key(), krecord.serializedValueSize()
+                        );
+                    }
+                    if (includeHeaders) {
+                        // Mapping from source type: 
org.apache.kafka.common.header.Headers, to destination type: 
org.apache.kafka.connect.Headers
+                        Headers sourceHeaders = krecord.headers();
+                        ConnectHeaders destinationHeaders = new 
ConnectHeaders();
+                        for (Header header: sourceHeaders) {
+                            if (header != null) {
+                                destinationHeaders.add(header.key(), 
header.value(), Schema.OPTIONAL_BYTES_SCHEMA);
+                            }
+                        }
+                        records.add(new SourceRecord(sourcePartition, 
sourceOffset, destinationTopic, null, Schema.OPTIONAL_BYTES_SCHEMA, 
krecord.key(), Schema.OPTIONAL_BYTES_SCHEMA, krecord.value(), 
krecord.timestamp(), destinationHeaders));
+                    } else {
+                        records.add(new SourceRecord(sourcePartition, 
sourceOffset, destinationTopic, null, Schema.OPTIONAL_BYTES_SCHEMA, 
krecord.key(), Schema.OPTIONAL_BYTES_SCHEMA, krecord.value(), 
krecord.timestamp()));
+                    }
+                }
+            } catch (WakeupException e) {
+                LOG.info("{}: Caught WakeupException. Probably shutting 
down.", this);
+            }
+        }
+        poll.set(false);
+        // If stop was requested while we were polling, open the stop latch so stop() can close the consumer.
+        if (stop.get()) {
+            LOG.debug("{}: stop flag set during poll(), opening stopLatch", 
this);
+            stopLatch.countDown();
+        }
+        if (LOG.isDebugEnabled()) LOG.debug("{}: Returning {} records to 
connect", this, records.size());
+        return records;
+    }
+
+    @Override
+    public synchronized void stop() {
+        long startWait = System.currentTimeMillis();
+        synchronized (stopLock) {
+            stop.set(true);
+            LOG.info("{}: stop() called. Waking up consumer and shutting 
down", this);
+            consumer.wakeup();
+            if (poll.get()) {
+                LOG.info("{}: poll() active, awaiting for consumer to wake 
before attempting to shut down consumer", this);
+                try {
+                    stopLatch.await(Math.max(0, maxShutdownWait - 
(System.currentTimeMillis() - startWait)), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    LOG.warn("{}: Got InterruptedException while waiting on 
stopLatch", this);
+                }
+            }
+            LOG.info("{}: Shutting down consumer.", this);
+            consumer.close(Duration.ofMillis(Math.max(0, maxShutdownWait - 
(System.currentTimeMillis() - startWait))));
+        }
+        LOG.info("{}: task has been stopped", this);
+    }
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    public String toString() {
+        return "KafkaSourceTask@" + Integer.toHexString(hashCode());
+    }
+
+
+}
\ No newline at end of file
diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/LeaderTopicPartition.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/LeaderTopicPartition.java
new file mode 100644
index 00000000000..46e3397f3cf
--- /dev/null
+++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/LeaderTopicPartition.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.apache.kafka.common.TopicPartition;
+
+public class LeaderTopicPartition {
+
+    private static final String STRING_DELIMITER = ":";
+
+    private int hash = 0;
+
+    private final int leaderId;
+    private final String topicName;
+    private final int partition;
+
+    public LeaderTopicPartition(int leaderId, String topicName, int partition) 
throws IllegalArgumentException {
+        this.leaderId = leaderId;
+        if (topicName == null)
+            throw new IllegalArgumentException("topicName can not be null");
+        this.topicName = topicName;
+        this.partition = partition;
+    }
+
+    public static LeaderTopicPartition fromString(String 
leaderTopicPartitionString) {
+        String[] tokens = leaderTopicPartitionString.split(STRING_DELIMITER);
+        if (tokens.length != 3)
+            throw new IllegalArgumentException("leaderTopicPartitionString 
must be in the format <leader>:<topic>:<partition>");
+        return new LeaderTopicPartition(Integer.parseInt(tokens[0], 10), 
tokens[1], Integer.parseInt(tokens[2], 10));
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(leaderId) +
+                STRING_DELIMITER +
+                topicName +
+                STRING_DELIMITER +
+                String.valueOf(partition);
+    }
+
+    public TopicPartition toTopicPartition() {
+        return new TopicPartition(topicName, partition);
+    }
+
+    public String toTopicPartitionString() {
+        return topicName + STRING_DELIMITER + String.valueOf(partition);
+    }
+
+
+    @Override
+    public int hashCode() {
+        if (hash != 0)
+            return hash;
+        int result = 1;
+        result = result * 23 + leaderId;
+        result = result * 37 + (topicName == null ? 0 : topicName.hashCode());
+        result = result * 11 + partition;
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+        if (!(other instanceof LeaderTopicPartition))
+            return false;
+        LeaderTopicPartition otherLeaderTopicPartition = 
(LeaderTopicPartition) other;
+        return leaderId == otherLeaderTopicPartition.leaderId
+                && ((topicName == null) ? otherLeaderTopicPartition.topicName 
== null : topicName.equals(otherLeaderTopicPartition.topicName))
+                && partition == otherLeaderTopicPartition.partition;
+    }
+
+}
diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/PartitionMonitor.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/PartitionMonitor.java
new file mode 100644
index 00000000000..c6c08ebd479
--- /dev/null
+++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/PartitionMonitor.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+public class PartitionMonitor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionMonitor.class);
+
+    // No need to make these options configurable.
+
+    private AtomicBoolean shutdown = new AtomicBoolean(false);
+    private AdminClient partitionMonitorClient;
+    private Pattern topicWhitelistPattern;
+    private volatile Set<LeaderTopicPartition> currentLeaderTopicPartitions = 
new HashSet<>();
+
+    private int maxShutdownWaitMs;
+    private int topicRequestTimeoutMs;
+    private boolean reconfigureTasksOnLeaderChange;
+    private Runnable pollThread;
+    private int topicPollIntervalMs;
+    private ScheduledExecutorService pollExecutorService;
+    private ScheduledFuture<?> pollHandle;
+
+    PartitionMonitor(ConnectorContext connectorContext, 
KafkaSourceConnectorConfig sourceConnectorConfig) {
+        topicWhitelistPattern = 
sourceConnectorConfig.getTopicWhitelistPattern();
+        reconfigureTasksOnLeaderChange = 
sourceConnectorConfig.getBoolean(KafkaSourceConnectorConfig.RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG);
+        topicPollIntervalMs = 
sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_POLL_INTERVAL_MS_CONFIG);
+        maxShutdownWaitMs = 
sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG);
+        topicRequestTimeoutMs = 
sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG);
+        partitionMonitorClient = 
AdminClient.create(sourceConnectorConfig.getAdminClientProperties());
+        // Thread to periodically poll the kafka cluster for changes in topics 
or partitions
+        pollThread = new Runnable() {
+            @Override
+            public void run() {
+                if (!shutdown.get()) {
+                    LOG.info("Fetching latest topic partitions.");
+                    try {
+                        Set<LeaderTopicPartition> 
retrievedLeaderTopicPartitions = 
retrieveLeaderTopicPartitions(topicRequestTimeoutMs);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("retrievedLeaderTopicPartitions: {}", 
retrievedLeaderTopicPartitions);
+                            LOG.debug("currentLeaderTopicPartitions: {}", 
getCurrentLeaderTopicPartitions());
+                        }
+                        boolean requestTaskReconfiguration = false;
+                        if (reconfigureTasksOnLeaderChange) {
+                            if 
(!retrievedLeaderTopicPartitions.equals(getCurrentLeaderTopicPartitions())) {
+                                LOG.info("Retrieved leaders and topic 
partitions do not match currently stored leaders and topic partitions, will 
request task reconfiguration");
+                                requestTaskReconfiguration = true;
+                            }
+                        } else {
+                            Set<TopicPartition> retrievedTopicPartitions = 
retrievedLeaderTopicPartitions.stream()
+                                    
.map(LeaderTopicPartition::toTopicPartition)
+                                    .collect(Collectors.toSet());
+                            if (LOG.isDebugEnabled())
+                                LOG.debug("retrievedTopicPartitions: {}", 
retrievedTopicPartitions);
+                            Set<TopicPartition> currentTopicPartitions = 
getCurrentLeaderTopicPartitions().stream()
+                                    
.map(LeaderTopicPartition::toTopicPartition)
+                                    .collect(Collectors.toSet());
+                            if (LOG.isDebugEnabled())
+                                LOG.debug("currentTopicPartitions: {}", 
currentTopicPartitions);
+                            if 
(!retrievedTopicPartitions.equals(currentTopicPartitions)) {
+                                LOG.info("Retrieved topic partitions do not 
match currently stored topic partitions, will request task reconfiguration");
+                                requestTaskReconfiguration = true;
+                            }
+                        }
+                        
setCurrentLeaderTopicPartitions(retrievedLeaderTopicPartitions);
+                        if (requestTaskReconfiguration)
+                            connectorContext.requestTaskReconfiguration();
+                        else
+                            LOG.info("No partition changes which require 
reconfiguration have been detected.");
+                    } catch (TimeoutException e) {
+                        LOG.error("Timeout while waiting for AdminClient to 
return topic list. This indicates a (possibly transient) connection issue, or 
is an indicator that the timeout is set too low. {}", e);
+                    } catch (ExecutionException e) {
+                        LOG.error("Unexpected ExecutionException. {}", e);
+                    } catch (InterruptedException e) {
+                        LOG.error("InterruptedException. Probably shutting 
down. {}, e");
+                    }
+                }
+            }
+        };
+    }
+
+    public void start() {
+        // On start, block until we retrieve the initial list of topic 
partitions (or at least until timeout)
+        try {
+            // This will block while waiting to retrieve data from kafka. Timeout is set so that we don't hang the kafka connect herder if an invalid configuration causes us to retry infinitely.
+            LOG.info("Retrieving initial topic list from kafka.");
+            
setCurrentLeaderTopicPartitions(retrieveLeaderTopicPartitions(topicRequestTimeoutMs));
+        } catch (TimeoutException e) {
+            LOG.error("Timeout while waiting for AdminClient to return topic 
list. This likely indicates a (possibly transient) connection issue, but could 
be an indicator that the timeout is set too low. {}", e);
+            throw new ConnectException("Timeout while waiting for AdminClient 
to return topic list. This likely indicates a (possibly transient) connection 
issue, but could be an indicator that the timeout is set too low.");
+        } catch (ExecutionException e) {
+            LOG.error("Unexpected ExecutionException. {}", e);
+            throw new ConnectException("Unexpected  while starting 
PartitionMonitor.");
+        } catch (InterruptedException e) {
+            LOG.error("InterruptedException. {}, e");
+            throw new ConnectException("Unexpected InterruptedException while 
starting PartitionMonitor.");
+        }
+        // Schedule a task to periodically run to poll for new data
+        pollExecutorService = Executors.newSingleThreadScheduledExecutor();
+        pollHandle = pollExecutorService.scheduleWithFixedDelay(pollThread, 
topicPollIntervalMs, topicPollIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    private boolean matchedTopicFilter(String topic) {
+        return topicWhitelistPattern.matcher(topic).matches();
+    }
+
+    private synchronized void 
setCurrentLeaderTopicPartitions(Set<LeaderTopicPartition> 
leaderTopicPartitions) {
+        currentLeaderTopicPartitions = leaderTopicPartitions;
+    }
+
+    public synchronized Set<LeaderTopicPartition> 
getCurrentLeaderTopicPartitions() {
+        return currentLeaderTopicPartitions;
+    }
+
+    // Allow the main thread a chance to shut down gracefully
+    public void shutdown() {
+        LOG.info("Shutdown called.");
+        long startWait = System.currentTimeMillis();
+        shutdown.set(true);
+        partitionMonitorClient.close(maxShutdownWaitMs - 
(System.currentTimeMillis() - startWait), TimeUnit.MILLISECONDS);
+        // Cancel our scheduled task, but wait for an existing task to 
complete if running
+        pollHandle.cancel(false);
+        // Ask nicely to shut down the partition monitor executor service if 
it hasn't already
+        if (!pollExecutorService.isShutdown()) {
+            try {
+                pollExecutorService.awaitTermination(maxShutdownWaitMs - 
(System.currentTimeMillis() - startWait), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.warn("Got InterruptedException while waiting for 
pollExecutorService to shutdown, shutdown will be forced.");
+            }
+        }
+        if (!pollExecutorService.isShutdown()) {
+            pollExecutorService.shutdownNow();
+        }
+        LOG.info("Shutdown Complete.");
+    }
+
+
+    // Retrieve a list of LeaderTopicPartitions that match our topic filter
+    private synchronized Set<LeaderTopicPartition> 
retrieveLeaderTopicPartitions(int requestTimeoutMs) throws 
InterruptedException, ExecutionException, TimeoutException {
+        long startWait = System.currentTimeMillis();
+
+        ListTopicsOptions listTopicsOptions = new 
ListTopicsOptions().listInternal(false).timeoutMs((int) (requestTimeoutMs - 
(System.currentTimeMillis() - startWait)));
+        Set<String> retrievedTopicSet = 
partitionMonitorClient.listTopics(listTopicsOptions).names().get(requestTimeoutMs
 - (System.currentTimeMillis() - startWait), TimeUnit.MILLISECONDS);
+        LOG.debug("Server topic list: {}", retrievedTopicSet);
+        Set<String> matchedTopicSet = retrievedTopicSet.stream()
+            .filter(topic -> matchedTopicFilter(topic))
+            .collect(Collectors.toSet());
+        LOG.debug("Matched topic list: {}", matchedTopicSet);
+
+        DescribeTopicsOptions describeTopicsOptions = new 
DescribeTopicsOptions().timeoutMs((int) (requestTimeoutMs - 
(System.currentTimeMillis() - startWait)));
+        Map<String, TopicDescription> retrievedTopicDescriptions = 
partitionMonitorClient.describeTopics(matchedTopicSet, 
describeTopicsOptions).all().get(requestTimeoutMs - (System.currentTimeMillis() 
- startWait), TimeUnit.MILLISECONDS);
+        return retrievedTopicDescriptions.values().stream()
+            .map(topicDescription ->
+                topicDescription.partitions().stream()
+                .map(partitionInfo -> new 
LeaderTopicPartition(partitionInfo.leader().id(), topicDescription.name(), 
partitionInfo.partition()))
+            )
+            .flatMap(Function.identity())
+            .collect(Collectors.toSet());
+    }
+
+}
diff --git a/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorTest.java b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorTest.java
new file mode 100644
index 00000000000..cb825570148
--- /dev/null
+++ b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectorContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+import static 
org.powermock.api.support.membermodification.MemberMatcher.method;
+import static 
org.powermock.api.support.membermodification.MemberModifier.suppress;
+
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.After;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.easymock.PowerMock;
+import org.easymock.EasyMockSupport;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KafkaSourceConnector.class, PartitionMonitor.class})
+@PowerMockIgnore("javax.management.*")
+public class KafkaSourceConnectorTest extends EasyMockSupport {
+
+    private PartitionMonitor partitionMonitorMock;
+    private ConnectorContext connectorContextMock;
+
+    private Set<LeaderTopicPartition> stubLeaderTopicPartitions;
+
+    private KafkaSourceConnector connector;
+
+    private Map<String, String> sourceProperties;
+
+    private static final String SOURCE_TOPICS_VALUE = "test.topic";
+    private static final String SOURCE_BOOTSTRAP_SERVERS_CONFIG = 
"localhost:6000";
+    private static final String POLL_LOOP_TIMEOUT_MS_VALUE = "2000";
+    private static final String TOPIC_LIST_TIMEOUT_MS_VALUE = "5000";
+    private static final String CONSUMER_GROUP_ID_VALUE = 
"test-consumer-group";
+
+    @Before
+    public void setUp() throws Exception {
+        connector = new KafkaSourceConnector();
+        connectorContextMock = PowerMock.createMock(ConnectorContext.class);
+        partitionMonitorMock = PowerMock.createMock(PartitionMonitor.class);
+        connector.initialize(connectorContextMock);
+
+        // Default test settings
+        sourceProperties = new HashMap<>();
+        
sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG, 
SOURCE_TOPICS_VALUE);
+        
sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG,
 SOURCE_BOOTSTRAP_SERVERS_CONFIG);
+        
sourceProperties.put(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG, 
POLL_LOOP_TIMEOUT_MS_VALUE);
+        
sourceProperties.put(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG, 
TOPIC_LIST_TIMEOUT_MS_VALUE);
+        
sourceProperties.put(KafkaSourceConnectorConfig.CONSUMER_GROUP_ID_CONFIG, 
CONSUMER_GROUP_ID_VALUE);
+
+        // Default leader topic partitions to return (just one)
+        stubLeaderTopicPartitions = new HashSet<>();
+        LeaderTopicPartition leaderTopicPartition = new 
LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
+        stubLeaderTopicPartitions.add(leaderTopicPartition);
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+
+    @Test(expected = ConfigException.class)
+    public void testStartMissingBootstrapServers() {
+        suppress(method(PartitionMonitor.class, "start"));
+        PowerMock.replayAll();
+
+        
sourceProperties.remove(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG);
+        connector.start(sourceProperties);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testStartBlankBootstrapServers() {
+        suppress(method(PartitionMonitor.class, "start"));
+        PowerMock.replayAll();
+
+        
sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG,
 "");
+        connector.start(sourceProperties);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testStartTopicWhitelistMissing() {
+        suppress(method(PartitionMonitor.class, "start"));
+        replayAll();
+
+        
sourceProperties.remove(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG);
+        connector.start(sourceProperties);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testStartCorrectConfig() throws Exception {
+        PowerMock.expectNew(
+                PartitionMonitor.class,
+                new Class<?>[] {ConnectorContext.class, 
KafkaSourceConnectorConfig.class},
+                EasyMock.anyObject(ConnectorContext.class),
+                EasyMock.anyObject(KafkaSourceConnectorConfig.class)
+        ).andStubReturn(partitionMonitorMock);
+        partitionMonitorMock.start();
+        PowerMock.expectLastCall().andVoid();
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+
+        verifyAll();
+    }
+
+    @Test
+    public void testTaskConfigsReturns1TaskOnOneTopicPartition() throws 
Exception {
+        PowerMock.expectNew(
+                PartitionMonitor.class,
+                new Class<?>[] {ConnectorContext.class, 
KafkaSourceConnectorConfig.class},
+                EasyMock.anyObject(ConnectorContext.class),
+                EasyMock.anyObject(KafkaSourceConnectorConfig.class)
+        ).andStubReturn(partitionMonitorMock);
+        partitionMonitorMock.start();
+        PowerMock.expectLastCall().andVoid();
+        
EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
+
+        assertEquals(1, taskConfigs.size());
+        assertEquals("0:test.topic:0",
+                taskConfigs.get(0).get("task.leader.topic.partitions"));
+        assertEquals(SOURCE_TOPICS_VALUE,
+                
taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG));
+        assertEquals(SOURCE_BOOTSTRAP_SERVERS_CONFIG,
+                
taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG));
+
+        verifyAll();
+    }
+
+    @Test
+    public void testTaskConfigsReturns1TaskOnTwoTopicPartitions() throws 
Exception {
+        // Leader topic partitions to return (two partitions on the same topic)
+        stubLeaderTopicPartitions = new HashSet<>();
+        LeaderTopicPartition leaderTopicPartition1 = new 
LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
+        stubLeaderTopicPartitions.add(leaderTopicPartition1);
+        LeaderTopicPartition leaderTopicPartition2 = new 
LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1);
+        stubLeaderTopicPartitions.add(leaderTopicPartition2);
+
+        PowerMock.expectNew(
+                PartitionMonitor.class,
+                new Class<?>[] {ConnectorContext.class, 
KafkaSourceConnectorConfig.class},
+                EasyMock.anyObject(ConnectorContext.class),
+                EasyMock.anyObject(KafkaSourceConnectorConfig.class)
+        ).andStubReturn(partitionMonitorMock);
+
+        partitionMonitorMock.start();
+        PowerMock.expectLastCall().andVoid();
+        
EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
+
+        assertEquals(1, taskConfigs.size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTaskConfigsReturns2TasksOnTwoTopicPartitions() throws 
Exception {
+        // Leader topic partitions to return (two partitions on the same topic)
+        stubLeaderTopicPartitions = new HashSet<>();
+        LeaderTopicPartition leaderTopicPartition1 = new 
LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0);
+        stubLeaderTopicPartitions.add(leaderTopicPartition1);
+        LeaderTopicPartition leaderTopicPartition2 = new 
LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1);
+        stubLeaderTopicPartitions.add(leaderTopicPartition2);
+
+        PowerMock.expectNew(
+                PartitionMonitor.class,
+                new Class<?>[] {ConnectorContext.class, 
KafkaSourceConnectorConfig.class},
+                EasyMock.anyObject(ConnectorContext.class),
+                EasyMock.anyObject(KafkaSourceConnectorConfig.class)
+        ).andStubReturn(partitionMonitorMock);
+
+        partitionMonitorMock.start();
+        PowerMock.expectLastCall().andVoid();
+        
EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions);
+        PowerMock.replayAll();
+
+        connector.start(sourceProperties);
+        List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
+
+        assertEquals(2, taskConfigs.size());
+
+        PowerMock.verifyAll();
+    }
+
+
+    @Test
+    public void testTaskClass() {
+        replayAll();
+
+        assertEquals(KafkaSourceTask.class, connector.taskClass());
+
+        verifyAll();
+    }
+
+
+}
diff --git 
a/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceTaskTest.java
 
b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceTaskTest.java
new file mode 100644
index 00000000000..b8269102115
--- /dev/null
+++ 
b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceTaskTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.kafka.connect.kafka.KafkaSourceTask.OFFSET_KEY;
+import static 
org.apache.kafka.connect.kafka.KafkaSourceTask.TOPIC_PARTITION_KEY;
+import static org.powermock.api.easymock.PowerMock.createMock;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+        KafkaSourceTask.class,
+        OffsetStorageReader.class,
+        SourceTaskContext.class,
+        KafkaConsumer.class
+})
+@PowerMockIgnore("javax.management")
+
+public class KafkaSourceTaskTest {
+
+    private KafkaSourceTask objectUnderTest;
+
+    private Map<String, String> opts;
+    private Properties props;
+    private KafkaSourceConnectorConfig config;
+
+    private static final String MAX_SHUTDOWN_WAIT_MS_VALUE = "2000";
+    private static final int POLL_LOOP_TIMEOUT_MS_VALUE = 25;
+    private static final String DESTINATION_TOPIC_PREFIX_VALUE = 
"test.destination";
+    private static final String INCLUDE_MESSAGE_HEADERS_VALUE = "false";
+    private static final String CONSUMER_AUTO_OFFSET_RESET_VALUE = "0";
+    private static final String SOURCE_BOOTSTRAP_SERVERS_VALUE = 
"localhost:6000";
+    private static final String TASK_LEADER_TOPIC_PARTITION_VALUE = 
"0:test.topic:1";
+    private static final String AUTO_OFFSET_RESET_VALUE = "latest";
+    private static final String SOURCE_TOPICS_WHITELIST_VALUE = "test*";
+    private static final String CONSUMER_GROUP_ID_VALUE = 
"test-consumer-group";
+
+    private static final String FIRST_TOPIC = "test.topic";
+    private static final int FIRST_PARTITION = 1;
+    private static final long FIRST_OFFSET = 123L;
+    private static final String SECOND_TOPIC = "another.test.topic";
+    private static final int SECOND_PARTITION = 0;
+    private static final long SECOND_OFFSET = 456L;
+
+
+    private OffsetStorageReader offsetStorageReader;
+    private SourceTaskContext context;
+    private KafkaConsumer consumer;
+
+    @Before
+    public void setup() {
+
+        opts = new HashMap<>();
+        opts.put(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG, 
SOURCE_TOPICS_WHITELIST_VALUE);
+        opts.put(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG, 
MAX_SHUTDOWN_WAIT_MS_VALUE);
+        opts.put(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG, 
String.valueOf(POLL_LOOP_TIMEOUT_MS_VALUE));
+        opts.put(KafkaSourceConnectorConfig.DESTINATION_TOPIC_PREFIX_CONFIG, 
DESTINATION_TOPIC_PREFIX_VALUE);
+        opts.put(KafkaSourceConnectorConfig.INCLUDE_MESSAGE_HEADERS_CONFIG, 
INCLUDE_MESSAGE_HEADERS_VALUE);
+        opts.put(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, 
CONSUMER_AUTO_OFFSET_RESET_VALUE);
+        opts.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG, 
SOURCE_BOOTSTRAP_SERVERS_VALUE);
+        
opts.put(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG, 
TASK_LEADER_TOPIC_PARTITION_VALUE);
+        opts.put(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, 
AUTO_OFFSET_RESET_VALUE);
+        opts.put(KafkaSourceConnectorConfig.CONSUMER_GROUP_ID_CONFIG, 
CONSUMER_GROUP_ID_VALUE);
+
+        config = new KafkaSourceConnectorConfig(opts);
+        props = new Properties();
+        
props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX));
+
+        objectUnderTest = new KafkaSourceTask();
+
+        offsetStorageReader = createMock(OffsetStorageReader.class);
+        context = createMock(SourceTaskContext.class);
+        consumer = createMock(KafkaConsumer.class);
+        objectUnderTest.initialize(context);
+    }
+
+    @After
+    public void teardown() {
+        objectUnderTest = null;
+    }
+
+    private ConsumerRecords createTestRecordsWithHeaders() {
+        RecordHeader header = new RecordHeader("testHeader", new byte[0]);
+        RecordHeaders headers = new RecordHeaders();
+        headers.add(header);
+        TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
+
+        byte testByte = 0;
+        byte[] testKey = {testByte};
+        byte[] testValue = {testByte};
+
+        ConnectHeaders destinationHeaders = new ConnectHeaders();
+        destinationHeaders.add(header.key(), header.value(), 
Schema.OPTIONAL_BYTES_SCHEMA);
+        ConsumerRecord<byte[], byte[]> testConsumerRecord = new 
ConsumerRecord<byte[], byte[]>(
+                FIRST_TOPIC,
+                FIRST_PARTITION,
+                FIRST_OFFSET,
+                System.currentTimeMillis(),
+                timestampType,
+                0L,
+                0,
+                0,
+                testKey,
+                testValue,
+                headers
+        );
+
+        TopicPartition topicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        List<ConsumerRecord<byte[], byte[]>> consumerRecords = new 
ArrayList<>();
+        consumerRecords.add(testConsumerRecord);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
consumerRecordMap = new HashMap<>(1);
+        consumerRecordMap.put(topicPartition, consumerRecords);
+        ConsumerRecords testRecords = new ConsumerRecords<>(consumerRecordMap);
+        return testRecords;
+    }
+
+    private ConsumerRecords createTestRecords() {
+        byte testByte = 0;
+        byte[] testKey = {testByte};
+        byte[] testValue = {testByte};
+        ConsumerRecord<byte[], byte[]> testConsumerRecord = new 
ConsumerRecord<byte[], byte[]>(FIRST_TOPIC, FIRST_PARTITION, FIRST_OFFSET, 
testKey, testValue);
+        TopicPartition topicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        List<ConsumerRecord<byte[], byte[]>> consumerRecords = new 
ArrayList<>();
+        consumerRecords.add(testConsumerRecord);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
consumerRecordMap = new HashMap<>(1);
+        consumerRecordMap.put(topicPartition, consumerRecords);
+        ConsumerRecords testRecords = new ConsumerRecords<>(consumerRecordMap);
+        return testRecords;
+    }
+
+    private void mockConsumerInitialization() throws Exception {
+        TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        Collection<TopicPartition> topicPartitions = new ArrayList<>();
+        topicPartitions.add(firstTopicPartition);
+        Map<TopicPartition, Long> endOffsets = 
Collections.singletonMap(firstTopicPartition, FIRST_OFFSET);
+
+        
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+        EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, 
String>>>anyObject())).andReturn(new HashMap<>());
+        PowerMock.expectNew(KafkaConsumer.class, new 
Class[]{Properties.class}, 
config.getKafkaConsumerProperties()).andReturn(consumer);
+        
EasyMock.expect(consumer.endOffsets(topicPartitions)).andReturn(endOffsets);
+        consumer.assign(topicPartitions);
+        EasyMock.expectLastCall();
+        consumer.seek(firstTopicPartition, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+    }
+
+    @Test
+    public void testStartNoStoredPartitionsStartEnd() throws Exception {
+        TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        Collection<TopicPartition> topicPartitions = new ArrayList<>();
+        topicPartitions.add(firstTopicPartition);
+        Map<TopicPartition, Long> endOffsets = 
Collections.singletonMap(firstTopicPartition, FIRST_OFFSET);
+
+        
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+        EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, 
String>>>anyObject())).andReturn(new HashMap<>());
+        PowerMock.expectNew(KafkaConsumer.class, new 
Class[]{Properties.class}, 
config.getKafkaConsumerProperties()).andReturn(consumer);
+        
EasyMock.expect(consumer.endOffsets(topicPartitions)).andReturn(endOffsets);
+        consumer.assign(topicPartitions);
+        EasyMock.expectLastCall();
+        consumer.seek(firstTopicPartition, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+        replayAll();
+
+        objectUnderTest.start(opts);
+
+        verifyAll();
+    }
+
+    @Test
+    public void testStartNoStoredPartitionsStartBeginning() throws Exception {
+        opts.put(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        config = new KafkaSourceConnectorConfig(opts);
+        props = new Properties();
+        
props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX));
+
+        TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        Collection<TopicPartition> topicPartitions = new ArrayList<>();
+        topicPartitions.add(firstTopicPartition);
+        Map<TopicPartition, Long> endOffsets = 
Collections.singletonMap(firstTopicPartition, FIRST_OFFSET);
+
+        
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+        EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, 
String>>>anyObject())).andReturn(new HashMap<>());
+        PowerMock.expectNew(KafkaConsumer.class, new 
Class[]{Properties.class}, 
config.getKafkaConsumerProperties()).andReturn(consumer);
+        
EasyMock.expect(consumer.beginningOffsets(topicPartitions)).andReturn(endOffsets);
+        consumer.assign(topicPartitions);
+        EasyMock.expectLastCall();
+        consumer.seek(firstTopicPartition, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+        replayAll();
+
+        objectUnderTest.start(opts);
+
+        verifyAll();
+    }
+
+    @Test
+    public void testStartAllStoredPartitions() throws Exception {
+        TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        Collection<TopicPartition> topicPartitions = new ArrayList<>();
+        topicPartitions.add(firstTopicPartition);
+        Map<Map<String, String>, Map<String, Object>> storedOffsets = 
Collections.singletonMap(
+                Collections.singletonMap(TOPIC_PARTITION_KEY, 
String.format("%s:%d", FIRST_TOPIC, FIRST_PARTITION)),
+                Collections.singletonMap(OFFSET_KEY, FIRST_OFFSET)
+        );
+
+        
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+        EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, 
String>>>anyObject())).andReturn(storedOffsets);
+        PowerMock.expectNew(KafkaConsumer.class, new 
Class[]{Properties.class}, 
config.getKafkaConsumerProperties()).andReturn(consumer);
+        consumer.assign(topicPartitions);
+        EasyMock.expectLastCall();
+        consumer.seek(firstTopicPartition, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+        replayAll();
+
+        objectUnderTest.start(opts);
+
+        verifyAll();
+    }
+
+    @Test
+    public void testStartSomeStoredPartitions() throws Exception {
+        
opts.put(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG, 
TASK_LEADER_TOPIC_PARTITION_VALUE + "," + "0:" + SECOND_TOPIC + ":" + 
SECOND_PARTITION);
+        config = new KafkaSourceConnectorConfig(opts);
+        props = new Properties();
+        
props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX));
+
+        TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        TopicPartition secondTopicPartition = new TopicPartition(SECOND_TOPIC, 
SECOND_PARTITION);
+        Collection<TopicPartition> topicPartitions = new ArrayList<>();
+        topicPartitions.add(firstTopicPartition);
+        topicPartitions.add(secondTopicPartition);
+        Map<TopicPartition, Long> endOffsets = 
Collections.singletonMap(firstTopicPartition, FIRST_OFFSET);
+        Map<Map<String, String>, Map<String, Object>> storedOffsets = 
Collections.singletonMap(
+                Collections.singletonMap(TOPIC_PARTITION_KEY, 
String.format("%s:%d", SECOND_TOPIC, SECOND_PARTITION)),
+                Collections.singletonMap(OFFSET_KEY, SECOND_OFFSET)
+        );
+
+        
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+        EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, 
String>>>anyObject())).andReturn(storedOffsets);
+        PowerMock.expectNew(KafkaConsumer.class, new 
Class[]{Properties.class}, 
config.getKafkaConsumerProperties()).andReturn(consumer);
+        
EasyMock.expect(consumer.endOffsets(Collections.singletonList(firstTopicPartition))).andReturn(endOffsets);
+        consumer.assign(topicPartitions);
+        EasyMock.expectLastCall();
+        consumer.seek(firstTopicPartition, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+        consumer.seek(secondTopicPartition, SECOND_OFFSET);
+        EasyMock.expectLastCall();
+
+        replayAll();
+
+        objectUnderTest.start(opts);
+
+        verifyAll();
+    }
+
+
+    @Test
+    public void testPollNoRecords() throws Exception {
+        mockConsumerInitialization();
+        
EasyMock.expect(consumer.poll(Duration.ofMillis(POLL_LOOP_TIMEOUT_MS_VALUE))).andReturn(new
 ConsumerRecords<>(Collections.EMPTY_MAP));
+        replayAll();
+
+        objectUnderTest.start(opts);
+        List<SourceRecord> records = objectUnderTest.poll();
+
+        assertEquals(0, records.size());
+
+        verifyAll();
+    }
+
+
+    @Test
+    public void testPollRecordReturnedNoIncludeHeaders() throws Exception {
+        mockConsumerInitialization();
+        
EasyMock.expect(consumer.poll(Duration.ofMillis(POLL_LOOP_TIMEOUT_MS_VALUE))).andReturn(createTestRecords());
+        replayAll();
+
+        objectUnderTest.start(opts);
+        List<SourceRecord> records = objectUnderTest.poll();
+
+        SourceRecord testRecord = records.get(0);
+        assertEquals(String.format("%s:%d", FIRST_TOPIC, FIRST_PARTITION), 
testRecord.sourcePartition().get(TOPIC_PARTITION_KEY));
+        assertEquals(FIRST_OFFSET, testRecord.sourceOffset().get(OFFSET_KEY));
+        assertEquals(0, testRecord.headers().size());
+
+        verifyAll();
+    }
+
+    @Test
+    public void testPollRecordReturnedIncludeHeaders() throws Exception {
+        opts.put(KafkaSourceConnectorConfig.INCLUDE_MESSAGE_HEADERS_CONFIG, 
"true");
+        config = new KafkaSourceConnectorConfig(opts);
+        props = new Properties();
+        
props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX));
+
+        objectUnderTest = new KafkaSourceTask();
+        offsetStorageReader = createMock(OffsetStorageReader.class);
+        context = createMock(SourceTaskContext.class);
+        consumer = createMock(KafkaConsumer.class);
+        objectUnderTest.initialize(context);
+
+        TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, 
FIRST_PARTITION);
+        Collection<TopicPartition> topicPartitions = new ArrayList<>();
+        topicPartitions.add(firstTopicPartition);
+        Map<TopicPartition, Long> endOffsets = 
Collections.singletonMap(firstTopicPartition, FIRST_OFFSET);
+
+        
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader);
+        EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, 
String>>>anyObject())).andReturn(new HashMap<>());
+        PowerMock.expectNew(KafkaConsumer.class, new 
Class[]{Properties.class}, 
config.getKafkaConsumerProperties()).andReturn(consumer);
+        
EasyMock.expect(consumer.endOffsets(topicPartitions)).andReturn(endOffsets);
+        consumer.assign(topicPartitions);
+        EasyMock.expectLastCall();
+        consumer.seek(firstTopicPartition, FIRST_OFFSET);
+        EasyMock.expectLastCall();
+
+
+        // expectation for poll
+        
EasyMock.expect(consumer.poll(Duration.ofMillis(POLL_LOOP_TIMEOUT_MS_VALUE))).andReturn(createTestRecordsWithHeaders());
+        replayAll();
+
+        objectUnderTest.start(opts);
+        List<SourceRecord> records = objectUnderTest.poll();
+
+        SourceRecord testRecord = records.get(0);
+        assertEquals(String.format("%s:%d", FIRST_TOPIC, FIRST_PARTITION), 
testRecord.sourcePartition().get(TOPIC_PARTITION_KEY));
+        assertEquals(FIRST_OFFSET, testRecord.sourceOffset().get(OFFSET_KEY));
+        assertEquals(1, testRecord.headers().size());
+
+        verifyAll();
+    }
+
+
+    @Test
+    public void testStopClosesConsumer() throws Exception {
+        mockConsumerInitialization();
+
+        consumer.wakeup();
+        EasyMock.expectLastCall();
+        consumer.close(EasyMock.anyObject());
+        EasyMock.expectLastCall();
+
+        replayAll();
+
+        objectUnderTest.start(opts);
+        objectUnderTest.stop();
+
+        verifyAll();
+    }
+}
diff --git 
a/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/LeaderTopicPartitionTest.java
 
b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/LeaderTopicPartitionTest.java
new file mode 100644
index 00000000000..4be4fa8a404
--- /dev/null
+++ 
b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/LeaderTopicPartitionTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.kafka;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.After;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertFalse;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LeaderTopicPartition.class})
+@PowerMockIgnore("javax.management.*")
+
+public class LeaderTopicPartitionTest {
+
+    private LeaderTopicPartition objectUnderTest;
+
+    private static final int LEADER_ID = 0;
+    private static final String TOPIC = "test.topic";
+    private static final int PARTITION = 1;
+
+    private static final String LEADER_TOPIC_PARTITION = "0:test.topic:1";
+    private static final String TOPIC_PARTITION = "test.topic:1";
+
+    @Before
+    public void setup() {
+        objectUnderTest = new LeaderTopicPartition(
+                LEADER_ID,
+                TOPIC,
+                PARTITION
+        );
+    }
+
+    @After
+    public void teardown() {
+        objectUnderTest = null;
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNullTopicName() {
+        objectUnderTest = new LeaderTopicPartition(
+                LEADER_ID,
+                null,
+                PARTITION
+        );
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals(LEADER_TOPIC_PARTITION, objectUnderTest.toString());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromInvalidString() {
+        String invalidString = "test";
+        LeaderTopicPartition.fromString(invalidString);
+    }
+
+    @Test
+    public void testFromString() {
+        objectUnderTest = 
LeaderTopicPartition.fromString(LEADER_TOPIC_PARTITION);
+        assertEquals(LEADER_TOPIC_PARTITION, objectUnderTest.toString());
+    }
+
+    @Test
+    public void testToTopicPartitionString() {
+        assertEquals(TOPIC_PARTITION, 
objectUnderTest.toTopicPartitionString());
+    }
+
+
+    @Test
+    public void testEquals() {
+        LeaderTopicPartition objectUnderTest1 = new LeaderTopicPartition(
+                LEADER_ID,
+                TOPIC,
+                PARTITION
+        );
+        LeaderTopicPartition objectUnderTest2 = new LeaderTopicPartition(
+                LEADER_ID,
+                TOPIC,
+                PARTITION
+        );
+        assertEquals(objectUnderTest1, objectUnderTest2);
+    }
+
+    @Test
+    public void testEqualsWithOtherInstance() {
+        String unexpectedString = "NOT_LEADER_TOPIC_PARTITION";
+        assertFalse(objectUnderTest.equals(unexpectedString));
+    }
+
+}
diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml
index 09981852f95..cae57fba391 100644
--- a/gradle/findbugs-exclude.xml
+++ b/gradle/findbugs-exclude.xml
@@ -255,6 +255,16 @@ For a detailed description of findbugs bug categories, see 
http://findbugs.sourc
         <Bug pattern="IS2_INCONSISTENT_SYNC" />
     </Match>
 
+    <Match>
+        <!-- Ignore spurious warning about inconsistent synchronization on 
instance variable. Some accesses of the
+             variable must occur in synchronized blocks - stop(), while some 
need not - start(). Connect framework
+             shouldn't be calling stop() before start() has returned for the 
connector.
+             This seems to throw the static checker off. -->
+        <Package name="org.apache.kafka.connect.kafka" />
+        <Source name="KafkaSourceTask.java" />
+        <Bug pattern="IS2_INCONSISTENT_SYNC" />
+    </Match>
+
     <Match>
         <!-- Suppress a spurious warning about an unreleased lock. -->
         <Class name="kafka.utils.timer.SystemTimer"/>
diff --git a/settings.gradle b/settings.gradle
index bbcdc3174b8..d3ff9e3cb72 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,4 +17,4 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 
'streams:streams-scal
         'streams:upgrade-system-tests-0100', 
'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
         'streams:upgrade-system-tests-0110', 
'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
         'log4j-appender', 'connect:api', 'connect:transforms', 
'connect:runtime', 'connect:json', 'connect:file',
-        'connect:basic-auth-extension', 'jmh-benchmarks'
+        'connect:basic-auth-extension', 'connect:kafka', 'jmh-benchmarks'
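
As an aside for reviewers (not part of the diff above): the tests show that the connector encodes each
leader/topic/partition assignment as the string "leaderId:topic:partition" and passes it to tasks via the
"task.leader.topic.partitions" property. Below is a minimal sketch of that round trip, assuming the
LeaderTopicPartition class added by this patch is on the classpath; the class name, method names, and
expected output values mirror LeaderTopicPartitionTest above, and nothing else is implied about the
implementation.

    import org.apache.kafka.connect.kafka.LeaderTopicPartition;

    public class LeaderTopicPartitionExample {
        public static void main(String[] args) {
            // Build an assignment for partition 1 of "test.topic" whose leader is broker 0.
            LeaderTopicPartition ltp = new LeaderTopicPartition(0, "test.topic", 1);

            // Serialized form used in the task config value, e.g. "0:test.topic:1".
            String encoded = ltp.toString();
            System.out.println(encoded);

            // "topic:partition" form, as used for the task's source-partition key, e.g. "test.topic:1".
            System.out.println(ltp.toTopicPartitionString());

            // A task can reconstruct the assignment from the encoded string.
            LeaderTopicPartition decoded = LeaderTopicPartition.fromString(encoded);
            System.out.println(decoded.equals(ltp));  // expected: true
        }
    }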


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-310: Add a Kafka Source Connector to Kafka Connect
> ------------------------------------------------------
>
>                 Key: KAFKA-6963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6963
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Rhys Anthony McCaig
>            Priority: Major
>
> This proposal introduces a new Kafka Connect Source Connector.
> See the KIP at 
> [KIP-310|https://cwiki.apache.org/confluence/display/KAFKA/KIP-310%3A+Add+a+Kafka+Source+Connector+to+Kafka+Connect]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
