[ https://issues.apache.org/jira/browse/KAFKA-6963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664471#comment-16664471 ]
ASF GitHub Bot commented on KAFKA-6963: --------------------------------------- rhysmccaig closed pull request #5438: KAFKA-6963: KIP-310: Add a Kafka Source Connector to Kafka Connect URL: https://github.com/apache/kafka/pull/5438 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 3e8558d4739..f883eb54658 100644 --- a/build.gradle +++ b/build.gradle @@ -516,7 +516,7 @@ for ( sv in availableScalaVersions ) { } } -def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension'] +def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension', 'connect:kafka' ] def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs /** Create one task per default Scala version */ @@ -742,6 +742,8 @@ project(':core') { from(project(':connect:file').configurations.runtime) { into("libs/") } from(project(':connect:basic-auth-extension').jar) { into("libs/") } from(project(':connect:basic-auth-extension').configurations.runtime) { into("libs/") } + from(project(':connect:kafka').jar) { into("libs/") } + from(project(':connect:kafka').configurations.runtime) { into("libs/") } from(project(':streams').jar) { into("libs/") } from(project(':streams').configurations.runtime) { into("libs/") } from(project(':streams:streams-scala').jar) { into("libs/") } @@ -1501,6 +1503,44 @@ project(':connect:basic-auth-extension') { } } +project(':connect:kafka') { + archivesBaseName = "connect-kafka" + + dependencies { + compile project(':connect:api') + compile project(':clients') + compile libs.slf4jApi + + testCompile libs.bcpkix + testCompile libs.easymock + testCompile libs.junit + testCompile libs.powermockJunit4 + testCompile libs.powermockEasymock + testCompile project(':clients').sourceSets.test.output + } + + javadoc { + enabled = false + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.testRuntime) { + include('slf4j-log4j12*') + include('log4j*jar') + } + from (configurations.runtime) { + exclude('kafka-clients*') + exclude('connect-*') + } + into "$buildDir/dependant-libs" + duplicatesStrategy 'exclude' + } + + jar { + dependsOn copyDependantLibs + } +} + task aggregatedJavadoc(type: Javadoc) { def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled } source = projectsWithJavadoc.collect { it.sourceSets.main.allJava } diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 35f42e3a700..d23bc2bb8bc 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -370,6 +370,15 @@ <allow pkg="org.powermock" /> </subpackage> + <subpackage name="kafka"> + <allow pkg="org.apache.kafka.connect" /> + <allow pkg="org.apache.kafka.clients.consumer" /> + <!-- for tests --> + <allow pkg="org.easymock" /> + <allow pkg="org.powermock" /> + </subpackage> + + <subpackage name="tools"> <allow pkg="org.apache.kafka.connect" /> <allow pkg="org.apache.kafka.tools" /> diff --git a/config/connect-kafka-source.properties b/config/connect-kafka-source.properties new file mode 100644 index 00000000000..7564478e447 --- /dev/null +++ 
b/config/connect-kafka-source.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name=kafka-source +tasks.max=1 +key.converter=org.apache.kafka.connect.converters.ByteArrayConverter +value.converter=org.apache.kafka.connect.converters.ByteArrayConverter +connector.class=org.apache.kafka.connect.kafka.KafkaSourceConnector +source.bootstrap.server=kafka.bootstrap.server1:9092,kafka.bootstrap.server2:9093 +source.topic.whitelist=test.topic.* +source.auto.offset.reset=earliest +source.group.id=kafka-connect-testing +destination.topics.prefix=aggregate. diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnector.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnector.java new file mode 100644 index 00000000000..4ef9dc18e7a --- /dev/null +++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnector.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.kafka; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.util.ConnectorUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.stream.Collectors; + + +/** + * KafkaConnector is a Kafka Connect Connector implementation that generates tasks + * to ingest messages from a source kafka cluster + */ + +public class KafkaSourceConnector extends SourceConnector { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceConnector.class); + private KafkaSourceConnectorConfig connectorConfig; + + private PartitionMonitor partitionMonitor; + + @Override + public String version() { + return AppInfoParser.getVersion(); + } + + @Override + public void start(Map<String, String> config) throws ConfigException { + LOG.info("Connector starting"); + connectorConfig = new KafkaSourceConnectorConfig(config); + LOG.info("Starting Partition Monitor to monitor source kafka cluster partitions"); + partitionMonitor = new PartitionMonitor(context, connectorConfig); + partitionMonitor.start(); + } + + + @Override + public Class<? extends Task> taskClass() { + return KafkaSourceTask.class; + } + + + @Override + public List<Map<String, String>> taskConfigs(int maxTasks) { + List<String> leaderTopicPartitions = partitionMonitor.getCurrentLeaderTopicPartitions() + .stream() + .map(LeaderTopicPartition::toString) + .sorted() // Potential task performance/overhead improvement by roughly grouping tasks and leaders + .collect(Collectors.toList()); + int taskCount = Math.min(maxTasks, leaderTopicPartitions.size()); + if (taskCount < 1) { + LOG.warn("No tasks to start."); + return new ArrayList<>(); + } + return ConnectorUtils.groupPartitions(leaderTopicPartitions, taskCount) + .stream() + .map(leaderTopicPartitionsGroup -> { + Map<String, String> taskConfig = new HashMap<>(); + taskConfig.putAll(connectorConfig.allAsStrings()); + taskConfig.put(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG, String.join(",", leaderTopicPartitionsGroup)); + return taskConfig; + }) + .collect(Collectors.toList()); + } + + @Override + public void stop() { + LOG.info("Connector received stop(). Cleaning Up."); + partitionMonitor.shutdown(); + LOG.info("Connector stopped."); + } + + @Override + public ConfigDef config() { + return KafkaSourceConnectorConfig.CONFIG; + } + +} diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorConfig.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorConfig.java new file mode 100644 index 00000000000..8baca3cf41d --- /dev/null +++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorConfig.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.kafka; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Validator; +import org.apache.kafka.common.config.ConfigDef.ValidString; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Properties; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class KafkaSourceConnectorConfig extends AbstractConfig { + + private static final Validator NON_EMPTY_LIST_VALIDATOR = new ConfigDef.Validator() { + @Override + @SuppressWarnings("unchecked") + public void ensureValid(String name, Object value) { + if (((List<String>) value).isEmpty()) { + throw new ConfigException("At least one bootstrap server must be configured in " + name); + } + } + }; + + private static final Validator TOPIC_WHITELIST_REGEX_VALIDATOR = new ConfigDef.Validator() { + @Override + public void ensureValid(String name, Object value) { + getTopicWhitelistPattern((String) value); + } + }; + + // Config Prefixes + public static final String SOURCE_PREFIX = "source."; + public static final String DESTINATION_PREFIX = "destination."; + + // Any CONFIG beginning with this prefix will set the CONFIG parameters for the kafka consumer used in this connector + public static final String CONSUMER_PREFIX = "connector.consumer."; + // Any CONFIG beginning with this prefix will set the CONFIG parameters for the admin client used by the partition monitor + public static final String ADMIN_CLIENT_PREFIX = "connector.admin."; + + public static final String TASK_PREFIX = "task."; + + // Topic partition list we send to each task. Not user configurable. + public static final String TASK_LEADER_TOPIC_PARTITION_CONFIG = TASK_PREFIX.concat("leader.topic.partitions"); + + // General Connector CONFIG + // Topics + public static final String SOURCE_TOPIC_WHITELIST_CONFIG = SOURCE_PREFIX.concat("topic.whitelist"); + public static final String SOURCE_TOPIC_WHITELIST_DOC = "Regular expressions indicating the topics to consume from the source cluster. " + + "Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. 
" + + "For convenience, comma (',') is interpreted as interpreted as the regex-choice symbol ('|')."; + public static final Object SOURCE_TOPIC_WHITELIST_DEFAULT = ConfigDef.NO_DEFAULT_VALUE; + public static final String DESTINATION_TOPIC_PREFIX_CONFIG = DESTINATION_PREFIX.concat("topics.prefix"); + public static final String DESTINATION_TOPIC_PREFIX_DOC = "Prefix to add to source topic names when delivering messages to destination server"; + public static final String DESTINATION_TOPIC_PREFIX_DEFAULT = ""; + + // Message headers + public static final String INCLUDE_MESSAGE_HEADERS_CONFIG = "include.message.headers"; + public static final String INCLUDE_MESSAGE_HEADERS_DOC = "Indicates whether message headers from source records should be included in output"; + public static final boolean INCLUDE_MESSAGE_HEADERS_DEFAULT = true; + + // Partition Monitor + public static final String TOPIC_LIST_TIMEOUT_MS_CONFIG = "topic.list.timeout.ms"; + public static final String TOPIC_LIST_TIMEOUT_MS_DOC = "Amount of time the partition monitor thread should wait for kafka to return topic information before logging a timeout error."; + public static final int TOPIC_LIST_TIMEOUT_MS_DEFAULT = 60000; + public static final String TOPIC_LIST_POLL_INTERVAL_MS_CONFIG = "topic.list.poll.interval.ms"; + public static final String TOPIC_LIST_POLL_INTERVAL_MS_DOC = "How long to wait before re-querying the source cluster for a change in the partitions to be consumed"; + public static final int TOPIC_LIST_POLL_INTERVAL_MS_DEFAULT = 300000; + public static final String RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG = "reconfigure.tasks.on.partition.leader.change"; + public static final String RECONFIGURE_TASKS_ON_LEADER_CHANGE_DOC = "Indicates whether the partition monitor should request a task reconfiguration when partition leaders have changed"; + public static final boolean RECONFIGURE_TASKS_ON_LEADER_CHANGE_DEFAULT = false; + // Internal Connector Timing + public static final String POLL_LOOP_TIMEOUT_MS_CONFIG = "poll.loop.timeout.ms"; + public static final String POLL_LOOP_TIMEOUT_MS_DOC = "Maximum amount of time to wait in each poll loop without data before cancelling the poll and returning control to the worker task"; + public static final int POLL_LOOP_TIMEOUT_MS_DEFAULT = 1000; + public static final String MAX_SHUTDOWN_WAIT_MS_CONFIG = "max.shutdown.wait.ms"; + public static final String MAX_SHUTDOWN_WAIT_MS_DOC = "Maximum amount of time to wait before forcing the consumer to close"; + public static final int MAX_SHUTDOWN_WAIT_MS_DEFAULT = 2000; + + // General Source Kafka Config - Applies to Consumer and Admin Client if not overridden by CONSUMER_PREFIX or ADMIN_CLIENT_PREFIX + public static final String SOURCE_BOOTSTRAP_SERVERS_CONFIG = SOURCE_PREFIX.concat(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + public static final String SOURCE_BOOTSTRAP_SERVERS_DOC = "list of kafka brokers to use to bootstrap the source cluster"; + public static final Object SOURCE_BOOTSTRAP_SERVERS_DEFAULT = ConfigDef.NO_DEFAULT_VALUE; + + // These are the kafka consumer configs we override defaults for + // Note that *any* kafka consumer CONFIG can be set by adding the + // CONSUMER_PREFIX in front of the standard consumer CONFIG strings + public static final String CONSUMER_MAX_POLL_RECORDS_CONFIG = SOURCE_PREFIX.concat(ConsumerConfig.MAX_POLL_RECORDS_CONFIG); + public static final String CONSUMER_MAX_POLL_RECORDS_DOC = "Maximum number of records to return from each poll of the consumer"; + public static final int 
CONSUMER_MAX_POLL_RECORDS_DEFAULT = 500; + public static final String CONSUMER_AUTO_OFFSET_RESET_CONFIG = SOURCE_PREFIX.concat(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + public static final String CONSUMER_AUTO_OFFSET_RESET_DOC = "If there is no stored offset for a partition, where to reset from [earliest|latest]."; + public static final String CONSUMER_AUTO_OFFSET_RESET_DEFAULT = "earliest"; + public static final ValidString CONSUMER_AUTO_OFFSET_RESET_VALIDATOR = ConfigDef.ValidString.in("earliest", "latest"); + public static final String CONSUMER_KEY_DESERIALIZER_CONFIG = SOURCE_PREFIX.concat(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + public static final String CONSUMER_KEY_DESERIALIZER_DOC = "Key deserializer to use for the kafka consumers connecting to the source cluster."; + public static final String CONSUMER_KEY_DESERIALIZER_DEFAULT = ByteArrayDeserializer.class.getName(); + public static final String CONSUMER_VALUE_DESERIALIZER_CONFIG = SOURCE_PREFIX.concat(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + public static final String CONSUMER_VALUE_DESERIALIZER_DOC = "Value deserializer to use for the kafka consumers connecting to the source cluster."; + public static final String CONSUMER_VALUE_DESERIALIZER_DEFAULT = ByteArrayDeserializer.class.getName(); + public static final String CONSUMER_ENABLE_AUTO_COMMIT_CONFIG = SOURCE_PREFIX.concat(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); + public static final String CONSUMER_ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed to the source cluster in the background. " + + "Note that these offsets are not used to resume the connector (They are stored in the Kafka Connect offset store), but may be useful in monitoring the current offset lag " + + "of this connector on the source cluster"; + public static final Boolean CONSUMER_ENABLE_AUTO_COMMIT_DEFAULT = true; + + public static final String CONSUMER_GROUP_ID_CONFIG = SOURCE_PREFIX.concat(ConsumerConfig.GROUP_ID_CONFIG); + public static final String CONSUMER_GROUP_ID_DOC = "Source Kafka Consumer group id. 
This must be set if source.enable.auto.commit is set as a group id is required for offset tracking on the source cluster"; + public static final Object CONSUMER_GROUP_ID_DEFAULT = ConfigDef.NO_DEFAULT_VALUE; + + + // Config definition + public static final ConfigDef CONFIG = new ConfigDef() + .define(SOURCE_TOPIC_WHITELIST_CONFIG, Type.STRING, SOURCE_TOPIC_WHITELIST_DEFAULT, TOPIC_WHITELIST_REGEX_VALIDATOR, Importance.HIGH, SOURCE_TOPIC_WHITELIST_DOC) + .define(DESTINATION_TOPIC_PREFIX_CONFIG, Type.STRING, DESTINATION_TOPIC_PREFIX_DEFAULT, Importance.MEDIUM, DESTINATION_TOPIC_PREFIX_DOC) + .define(INCLUDE_MESSAGE_HEADERS_CONFIG, Type.BOOLEAN, INCLUDE_MESSAGE_HEADERS_DEFAULT, Importance.MEDIUM, INCLUDE_MESSAGE_HEADERS_DOC) + .define(TOPIC_LIST_TIMEOUT_MS_CONFIG, Type.INT, TOPIC_LIST_TIMEOUT_MS_DEFAULT, Importance.LOW, TOPIC_LIST_TIMEOUT_MS_DOC) + .define(TOPIC_LIST_POLL_INTERVAL_MS_CONFIG, Type.INT, TOPIC_LIST_POLL_INTERVAL_MS_DEFAULT, Importance.MEDIUM, TOPIC_LIST_POLL_INTERVAL_MS_DOC) + .define(RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG, Type.BOOLEAN, RECONFIGURE_TASKS_ON_LEADER_CHANGE_DEFAULT, Importance.MEDIUM, RECONFIGURE_TASKS_ON_LEADER_CHANGE_DOC) + .define(POLL_LOOP_TIMEOUT_MS_CONFIG, Type.INT, POLL_LOOP_TIMEOUT_MS_DEFAULT, Importance.LOW, POLL_LOOP_TIMEOUT_MS_DOC) + .define(MAX_SHUTDOWN_WAIT_MS_CONFIG, Type.INT, MAX_SHUTDOWN_WAIT_MS_DEFAULT, Importance.LOW, MAX_SHUTDOWN_WAIT_MS_DOC) + .define(SOURCE_BOOTSTRAP_SERVERS_CONFIG, Type.LIST, SOURCE_BOOTSTRAP_SERVERS_DEFAULT, NON_EMPTY_LIST_VALIDATOR, Importance.HIGH, SOURCE_BOOTSTRAP_SERVERS_DOC) + .define(CONSUMER_MAX_POLL_RECORDS_CONFIG, Type.INT, CONSUMER_MAX_POLL_RECORDS_DEFAULT, Importance.LOW, CONSUMER_MAX_POLL_RECORDS_DOC) + .define(CONSUMER_AUTO_OFFSET_RESET_CONFIG, Type.STRING, CONSUMER_AUTO_OFFSET_RESET_DEFAULT, CONSUMER_AUTO_OFFSET_RESET_VALIDATOR, Importance.MEDIUM, CONSUMER_AUTO_OFFSET_RESET_DOC) + .define(CONSUMER_KEY_DESERIALIZER_CONFIG, Type.STRING, CONSUMER_KEY_DESERIALIZER_DEFAULT, Importance.LOW, CONSUMER_KEY_DESERIALIZER_DOC) + .define(CONSUMER_VALUE_DESERIALIZER_CONFIG, Type.STRING, CONSUMER_VALUE_DESERIALIZER_DEFAULT, Importance.LOW, CONSUMER_VALUE_DESERIALIZER_DOC) + .define(CONSUMER_ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, CONSUMER_ENABLE_AUTO_COMMIT_DEFAULT, Importance.LOW, CONSUMER_ENABLE_AUTO_COMMIT_DOC) + .define(CONSUMER_GROUP_ID_CONFIG, Type.STRING, CONSUMER_GROUP_ID_DEFAULT, new ConfigDef.NonEmptyString(), Importance.MEDIUM, CONSUMER_GROUP_ID_DOC); + + public KafkaSourceConnectorConfig(Map<String, String> props) { + super(CONFIG, props); + } + + + // Returns all values with a specified prefix with the prefix stripped from the key + public Map<String, Object> allWithPrefix(String prefix) { + return allWithPrefix(prefix, true); + } + + // Returns all values with a specified prefix with the prefix stripped from the key if desired + // Original input is set first, then overwritten (if applicable) with the parsed values + public Map<String, Object> allWithPrefix(String prefix, boolean stripPrefix) { + Map<String, Object> result = originalsWithPrefix(prefix, stripPrefix); + for (Map.Entry<String, ?> entry : values().entrySet()) { + if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) { + if (stripPrefix) + result.put(entry.getKey().substring(prefix.length()), entry.getValue()); + else + result.put(entry.getKey(), entry.getValue()); + } + } + return result; + } + + // Returns all values (part of definition or original strings) as strings so they can be used with functions accepting 
Map<String,String> configs + public Map<String, String> allAsStrings() { + Map<String, String> result = new HashMap<>(); + result.put(DESTINATION_TOPIC_PREFIX_CONFIG, getString(DESTINATION_TOPIC_PREFIX_CONFIG)); + result.put(INCLUDE_MESSAGE_HEADERS_CONFIG, String.valueOf(getBoolean(INCLUDE_MESSAGE_HEADERS_CONFIG))); + result.put(TOPIC_LIST_TIMEOUT_MS_CONFIG, String.valueOf(getInt(TOPIC_LIST_TIMEOUT_MS_CONFIG))); + result.put(TOPIC_LIST_POLL_INTERVAL_MS_CONFIG, String.valueOf(getInt(TOPIC_LIST_POLL_INTERVAL_MS_CONFIG))); + result.put(RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG, String.valueOf(getBoolean(RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG))); + result.put(POLL_LOOP_TIMEOUT_MS_CONFIG, String.valueOf(getInt(POLL_LOOP_TIMEOUT_MS_CONFIG))); + result.put(MAX_SHUTDOWN_WAIT_MS_CONFIG, String.valueOf(getInt(MAX_SHUTDOWN_WAIT_MS_CONFIG))); + result.put(CONSUMER_MAX_POLL_RECORDS_CONFIG, String.valueOf(getInt(CONSUMER_MAX_POLL_RECORDS_CONFIG))); + result.put(CONSUMER_AUTO_OFFSET_RESET_CONFIG, getString(CONSUMER_AUTO_OFFSET_RESET_CONFIG)); + result.put(CONSUMER_KEY_DESERIALIZER_CONFIG, getString(CONSUMER_KEY_DESERIALIZER_CONFIG)); + result.put(CONSUMER_VALUE_DESERIALIZER_CONFIG, getString(CONSUMER_VALUE_DESERIALIZER_CONFIG)); + result.putAll(originalsStrings()); // Will set any values without defaults and will capture additional configs like consumer settings if supplied + return result; + } + + // Return a Properties Object that can be passed to AdminClient.create to configure a Kafka AdminClient instance + public Properties getAdminClientProperties() { + Properties adminClientProps = new Properties(); + // By Default use any settings under SOURCE_PREFIX + adminClientProps.putAll(allWithPrefix(SOURCE_PREFIX)); + // But override with anything under ADMIN_CLIENT_PREFIX + adminClientProps.putAll(allWithPrefix(ADMIN_CLIENT_PREFIX)); + return adminClientProps; + } + + // Return a Properties Object that can be passed to KafkaConsumer + public Properties getKafkaConsumerProperties() { + Properties kafkaConsumerProps = new Properties(); + // By Default use any settings under SOURCE_PREFIX + kafkaConsumerProps.putAll(allWithPrefix(SOURCE_PREFIX)); + // But override with anything under CONSUMER_PREFIX + kafkaConsumerProps.putAll(allWithPrefix(CONSUMER_PREFIX)); + return kafkaConsumerProps; + } + + public Pattern getTopicWhitelistPattern() { + return getTopicWhitelistPattern(getString(SOURCE_TOPIC_WHITELIST_CONFIG)); + } + + // Returns a java regex pattern that can be used to match kafka topics + private static Pattern getTopicWhitelistPattern(String rawRegex) { + String regex = rawRegex + .trim() + .replace(',', '|') + .replace(" ", "") + .replaceAll("^[\"']+", "") + .replaceAll("[\"']+$", ""); // property files may bring quotes + try { + return Pattern.compile(regex); + } catch (PatternSyntaxException e) { + throw new ConfigException(regex + " is an invalid regex for CONFIG " + SOURCE_TOPIC_WHITELIST_CONFIG); + } + } + +} \ No newline at end of file diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceTask.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceTask.java new file mode 100644 index 00000000000..3b8cbb1f7ff --- /dev/null +++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceTask.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.source.SourceTask; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.time.Duration; + + +public class KafkaSourceTask extends SourceTask { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceTask.class); + public static final String TOPIC_PARTITION_KEY = "topic:partition"; + public static final String OFFSET_KEY = "offset"; + + // Used to ensure we can be nice and call consumer.close() on shutdown + private final CountDownLatch stopLatch = new CountDownLatch(1); + // Flag to the poll() loop that we are awaiting shutdown so it can clean up. + private AtomicBoolean stop = new AtomicBoolean(false); + // Flag to the stop() function that it needs to wait for poll() to wrap up before trying to close the kafka consumer. 
+ private AtomicBoolean poll = new AtomicBoolean(false); + // Used to enforce synchronized access to stop and poll + private final Object stopLock = new Object(); + + // Settings + private int maxShutdownWait; + private int pollTimeout; + private String topicPrefix; + private boolean includeHeaders; + + // Consumer + private KafkaConsumer<byte[], byte[]> consumer; + + public void start(Map<String, String> opts) { + LOG.info("{}: task is starting.", this); + KafkaSourceConnectorConfig sourceConnectorConfig = new KafkaSourceConnectorConfig(opts); + maxShutdownWait = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG); + pollTimeout = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG); + topicPrefix = sourceConnectorConfig.getString(KafkaSourceConnectorConfig.DESTINATION_TOPIC_PREFIX_CONFIG); + includeHeaders = sourceConnectorConfig.getBoolean(KafkaSourceConnectorConfig.INCLUDE_MESSAGE_HEADERS_CONFIG); + String unknownOffsetResetPosition = sourceConnectorConfig.getString(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG); + // Get the leader topic partitions to work with + List<LeaderTopicPartition> leaderTopicPartitions = Arrays.asList(opts.get(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG) + .split(",")) + .stream() + .map(LeaderTopicPartition::fromString) + .collect(Collectors.toList()); + // retrieve the existing offsets (if any) for the configured partitions + List<Map<String, String>> offsetLookupPartitions = leaderTopicPartitions.stream() + .map(leaderTopicPartition -> Collections.singletonMap(TOPIC_PARTITION_KEY, leaderTopicPartition.toTopicPartitionString())) + .collect(Collectors.toList()); + Map<String, Long> topicPartitionStringsOffsets = context.offsetStorageReader().offsets(offsetLookupPartitions) + .entrySet() + .stream() + .filter(e -> e != null && e.getKey() != null && e.getKey().get(TOPIC_PARTITION_KEY) != null && e.getValue() != null && e.getValue().get(OFFSET_KEY) != null) + .collect(Collectors.toMap(e -> e.getKey().get(TOPIC_PARTITION_KEY), e -> (long) e.getValue().get(OFFSET_KEY))); + // Set up Kafka consumer + consumer = new KafkaConsumer<byte[], byte[]>(sourceConnectorConfig.getKafkaConsumerProperties()); + // Get topic partitions and offsets so we can seek() to them + Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>(); + List<TopicPartition> topicPartitionsWithUnknownOffset = new ArrayList<>(); + for (LeaderTopicPartition leaderTopicPartition : leaderTopicPartitions) { + String topicPartitionString = leaderTopicPartition.toTopicPartitionString(); + TopicPartition topicPartition = leaderTopicPartition.toTopicPartition(); + if (topicPartitionStringsOffsets.containsKey(topicPartitionString)) { + topicPartitionOffsets.put(topicPartition, topicPartitionStringsOffsets.get(topicPartitionString)); + } else { + // No stored offset? 
No worries, we will place it in the list to look up + topicPartitionsWithUnknownOffset.add(topicPartition); + } + } + // Set default offsets for partitions without stored offsets + if (topicPartitionsWithUnknownOffset.size() > 0) { + Map<TopicPartition, Long> defaultOffsets; + LOG.info("The following partitions do not have existing offset data: {}", topicPartitionsWithUnknownOffset); + if (unknownOffsetResetPosition.equals("earliest")) { + LOG.info("Using earliest offsets for partitions without existing offset data."); + defaultOffsets = consumer.beginningOffsets(topicPartitionsWithUnknownOffset); + } else if (unknownOffsetResetPosition.equals("latest")) { + LOG.info("Using latest offsets for partitions without existing offset data."); + defaultOffsets = consumer.endOffsets(topicPartitionsWithUnknownOffset); + } else { + LOG.warn("Config value {}, is set to an unknown value: {}. Partitions without existing offset data will not be consumed.", KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, unknownOffsetResetPosition); + defaultOffsets = new HashMap<>(); + } + topicPartitionOffsets.putAll(defaultOffsets); + } + // List of topic partitions to assign + List<TopicPartition> topicPartitionsToAssign = new ArrayList<>(topicPartitionOffsets.keySet()); + consumer.assign(topicPartitionsToAssign); + // Seek to desired offset for each partition + topicPartitionOffsets.forEach((key, value) -> consumer.seek(key, value)); + } + + + @Override + public List<SourceRecord> poll() { + if (LOG.isDebugEnabled()) LOG.debug("{}: poll()", this); + synchronized (stopLock) { + if (!stop.get()) + poll.set(true); + } + ArrayList<SourceRecord> records = new ArrayList<>(); + if (poll.get()) { + try { + ConsumerRecords<byte[], byte[]> krecords = consumer.poll(Duration.ofMillis(pollTimeout)); + if (LOG.isDebugEnabled()) LOG.debug("{}: Got {} records from source.", this, krecords.count()); + for (ConsumerRecord<byte[], byte[]> krecord : krecords) { + Map<String, String> sourcePartition = Collections.singletonMap(TOPIC_PARTITION_KEY, krecord.topic().concat(":").concat(Integer.toString(krecord.partition()))); + Map<String, Long> sourceOffset = Collections.singletonMap(OFFSET_KEY, krecord.offset()); + String destinationTopic = topicPrefix.concat(krecord.topic()); + if (LOG.isDebugEnabled()) { + LOG.trace( + "Task: sourceTopic:{} sourcePartition:{} sourceOffset:{} destinationTopic:{}, key:{}, valueSize:{}", + krecord.topic(), krecord.partition(), krecord.offset(), destinationTopic, krecord.key(), krecord.serializedValueSize() + ); + } + if (includeHeaders) { + // Mapping from source type: org.apache.kafka.common.header.Headers, to destination type: org.apache.kafka.connect.header.Headers + Headers sourceHeaders = krecord.headers(); + ConnectHeaders destinationHeaders = new ConnectHeaders(); + for (Header header: sourceHeaders) { + if (header != null) { + destinationHeaders.add(header.key(), header.value(), Schema.OPTIONAL_BYTES_SCHEMA); + } + } + records.add(new SourceRecord(sourcePartition, sourceOffset, destinationTopic, null, Schema.OPTIONAL_BYTES_SCHEMA, krecord.key(), Schema.OPTIONAL_BYTES_SCHEMA, krecord.value(), krecord.timestamp(), destinationHeaders)); + } else { + records.add(new SourceRecord(sourcePartition, sourceOffset, destinationTopic, null, Schema.OPTIONAL_BYTES_SCHEMA, krecord.key(), Schema.OPTIONAL_BYTES_SCHEMA, krecord.value(), krecord.timestamp())); + } + } + } catch (WakeupException e) { + LOG.info("{}: Caught WakeupException. 
Probably shutting down.", this); + } + } + poll.set(false); + // If stop has been set processing, then stop the consumer. + if (stop.get()) { + LOG.debug("{}: stop flag set during poll(), opening stopLatch", this); + stopLatch.countDown(); + } + if (LOG.isDebugEnabled()) LOG.debug("{}: Returning {} records to connect", this, records.size()); + return records; + } + + @Override + public synchronized void stop() { + long startWait = System.currentTimeMillis(); + synchronized (stopLock) { + stop.set(true); + LOG.info("{}: stop() called. Waking up consumer and shutting down", this); + consumer.wakeup(); + if (poll.get()) { + LOG.info("{}: poll() active, awaiting for consumer to wake before attempting to shut down consumer", this); + try { + stopLatch.await(Math.max(0, maxShutdownWait - (System.currentTimeMillis() - startWait)), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.warn("{}: Got InterruptedException while waiting on stopLatch", this); + } + } + LOG.info("{}: Shutting down consumer.", this); + consumer.close(Duration.ofMillis(Math.max(0, maxShutdownWait - (System.currentTimeMillis() - startWait)))); + } + LOG.info("{}: task has been stopped", this); + } + + @Override + public String version() { + return AppInfoParser.getVersion(); + } + + public String toString() { + return "KafkaSourceTask@" + Integer.toHexString(hashCode()); + } + + +} \ No newline at end of file diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/LeaderTopicPartition.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/LeaderTopicPartition.java new file mode 100644 index 00000000000..46e3397f3cf --- /dev/null +++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/LeaderTopicPartition.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.kafka; + +import org.apache.kafka.common.TopicPartition; + +public class LeaderTopicPartition extends Object { + + private static final String STRING_DELIMITER = ":"; + + private int hash = 0; + + private final int leaderId; + private final String topicName; + private final int partition; + + public LeaderTopicPartition(int leaderId, String topicName, int partition) throws IllegalArgumentException { + this.leaderId = leaderId; + if (topicName == null) + throw new IllegalArgumentException("topicName can not be null"); + this.topicName = topicName; + this.partition = partition; + } + + public static LeaderTopicPartition fromString(String leaderTopicPartitionString) { + String[] tokens = leaderTopicPartitionString.split(STRING_DELIMITER); + if (tokens.length != 3) + throw new IllegalArgumentException("leaderTopicPartitionString must be in the format <leader>:<topic>:<partition>"); + return new LeaderTopicPartition(Integer.parseInt(tokens[0], 10), tokens[1], Integer.parseInt(tokens[2], 10)); + } + + @Override + public String toString() { + return String.valueOf(leaderId) + + STRING_DELIMITER + + topicName + + STRING_DELIMITER + + String.valueOf(partition); + } + + public TopicPartition toTopicPartition() { + return new TopicPartition(topicName, partition); + } + + public String toTopicPartitionString() { + return topicName + STRING_DELIMITER + String.valueOf(partition); + } + + + @Override + public int hashCode() { + if (hash != 0) + return hash; + int result = 1; + result = result * 23 + leaderId; + result = result * 37 + (topicName == null ? 0 : topicName.hashCode()); + result = result * 11 + partition; + this.hash = result; + return result; + } + + @Override + public boolean equals(Object other) { + if (this == other) + return true; + if (!(other instanceof LeaderTopicPartition)) + return false; + LeaderTopicPartition otherLeaderTopicPartition = (LeaderTopicPartition) other; + return leaderId == otherLeaderTopicPartition.leaderId + && ((topicName == null) ? otherLeaderTopicPartition.topicName == null : topicName.equals(otherLeaderTopicPartition.topicName)) + && partition == otherLeaderTopicPartition.partition; + } + +} diff --git a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/PartitionMonitor.java b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/PartitionMonitor.java new file mode 100644 index 00000000000..c6c08ebd479 --- /dev/null +++ b/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/PartitionMonitor.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.DescribeTopicsOptions; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.errors.ConnectException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + + +public class PartitionMonitor { + + private static final Logger LOG = LoggerFactory.getLogger(PartitionMonitor.class); + + // No need to make these options configurable. + + private AtomicBoolean shutdown = new AtomicBoolean(false); + private AdminClient partitionMonitorClient; + private Pattern topicWhitelistPattern; + private volatile Set<LeaderTopicPartition> currentLeaderTopicPartitions = new HashSet<>(); + + private int maxShutdownWaitMs; + private int topicRequestTimeoutMs; + private boolean reconfigureTasksOnLeaderChange; + private Runnable pollThread; + private int topicPollIntervalMs; + private ScheduledExecutorService pollExecutorService; + private ScheduledFuture<?> pollHandle; + + PartitionMonitor(ConnectorContext connectorContext, KafkaSourceConnectorConfig sourceConnectorConfig) { + topicWhitelistPattern = sourceConnectorConfig.getTopicWhitelistPattern(); + reconfigureTasksOnLeaderChange = sourceConnectorConfig.getBoolean(KafkaSourceConnectorConfig.RECONFIGURE_TASKS_ON_LEADER_CHANGE_CONFIG); + topicPollIntervalMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_POLL_INTERVAL_MS_CONFIG); + maxShutdownWaitMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG); + topicRequestTimeoutMs = sourceConnectorConfig.getInt(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG); + partitionMonitorClient = AdminClient.create(sourceConnectorConfig.getAdminClientProperties()); + // Thread to periodically poll the kafka cluster for changes in topics or partitions + pollThread = new Runnable() { + @Override + public void run() { + if (!shutdown.get()) { + LOG.info("Fetching latest topic partitions."); + try { + Set<LeaderTopicPartition> retrievedLeaderTopicPartitions = retrieveLeaderTopicPartitions(topicRequestTimeoutMs); + if (LOG.isDebugEnabled()) { + LOG.debug("retrievedLeaderTopicPartitions: {}", retrievedLeaderTopicPartitions); + LOG.debug("currentLeaderTopicPartitions: {}", getCurrentLeaderTopicPartitions()); + } + boolean requestTaskReconfiguration = false; + if (reconfigureTasksOnLeaderChange) { + if (!retrievedLeaderTopicPartitions.equals(getCurrentLeaderTopicPartitions())) { + LOG.info("Retrieved leaders and topic partitions do not match currently stored leaders and topic partitions, will request task reconfiguration"); + requestTaskReconfiguration = true; + } + } else { + Set<TopicPartition> retrievedTopicPartitions = retrievedLeaderTopicPartitions.stream() + .map(LeaderTopicPartition::toTopicPartition) + 
.collect(Collectors.toSet()); + if (LOG.isDebugEnabled()) + LOG.debug("retrievedTopicPartitions: {}", retrievedTopicPartitions); + Set<TopicPartition> currentTopicPartitions = getCurrentLeaderTopicPartitions().stream() + .map(LeaderTopicPartition::toTopicPartition) + .collect(Collectors.toSet()); + if (LOG.isDebugEnabled()) + LOG.debug("currentTopicPartitions: {}", currentTopicPartitions); + if (!retrievedTopicPartitions.equals(currentTopicPartitions)) { + LOG.info("Retrieved topic partitions do not match currently stored topic partitions, will request task reconfiguration"); + requestTaskReconfiguration = true; + } + } + setCurrentLeaderTopicPartitions(retrievedLeaderTopicPartitions); + if (requestTaskReconfiguration) + connectorContext.requestTaskReconfiguration(); + else + LOG.info("No partition changes which require reconfiguration have been detected."); + } catch (TimeoutException e) { + LOG.error("Timeout while waiting for AdminClient to return topic list. This indicates a (possibly transient) connection issue, or is an indicator that the timeout is set too low. {}", e); + } catch (ExecutionException e) { + LOG.error("Unexpected ExecutionException. {}", e); + } catch (InterruptedException e) { + LOG.error("InterruptedException. Probably shutting down.", e); + } + } + } + }; + } + + public void start() { + // On start, block until we retrieve the initial list of topic partitions (or at least until timeout) + try { + // This will block while waiting to retrieve data from kafka. Timeout is set so that we don't hang the kafka connect herder if an invalid configuration causes us to retry infinitely. + LOG.info("Retrieving initial topic list from kafka."); + setCurrentLeaderTopicPartitions(retrieveLeaderTopicPartitions(topicRequestTimeoutMs)); + } catch (TimeoutException e) { + LOG.error("Timeout while waiting for AdminClient to return topic list. This likely indicates a (possibly transient) connection issue, but could be an indicator that the timeout is set too low. {}", e); + throw new ConnectException("Timeout while waiting for AdminClient to return topic list. This likely indicates a (possibly transient) connection issue, but could be an indicator that the timeout is set too low."); + } catch (ExecutionException e) { + LOG.error("Unexpected ExecutionException. {}", e); + throw new ConnectException("Unexpected ExecutionException while starting PartitionMonitor."); + } catch (InterruptedException e) { + LOG.error("InterruptedException.", e);
 + throw new ConnectException("Unexpected InterruptedException while starting PartitionMonitor."); + } + // Schedule a task to periodically run to poll for new data + pollExecutorService = Executors.newSingleThreadScheduledExecutor(); + pollHandle = pollExecutorService.scheduleWithFixedDelay(pollThread, topicPollIntervalMs, topicPollIntervalMs, TimeUnit.MILLISECONDS); + } + + private boolean matchedTopicFilter(String topic) { + return topicWhitelistPattern.matcher(topic).matches(); + } + + private synchronized void setCurrentLeaderTopicPartitions(Set<LeaderTopicPartition> leaderTopicPartitions) { + currentLeaderTopicPartitions = leaderTopicPartitions; + } + + public synchronized Set<LeaderTopicPartition> getCurrentLeaderTopicPartitions() { + return currentLeaderTopicPartitions; + } + + // Allow the main thread a chance to shut down gracefully + public void shutdown() { + LOG.info("Shutdown called."); + long startWait = System.currentTimeMillis(); + shutdown.set(true); + partitionMonitorClient.close(maxShutdownWaitMs - (System.currentTimeMillis() - startWait), TimeUnit.MILLISECONDS); + // Cancel our scheduled task, but wait for an existing task to complete if running + pollHandle.cancel(false); + // Ask nicely to shut down the partition monitor executor service if it hasn't already + if (!pollExecutorService.isShutdown()) { + try { + pollExecutorService.awaitTermination(maxShutdownWaitMs - (System.currentTimeMillis() - startWait), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.warn("Got InterruptedException while waiting for pollExecutorService to shutdown, shutdown will be forced."); + } + } + if (!pollExecutorService.isShutdown()) { + pollExecutorService.shutdownNow(); + } + LOG.info("Shutdown Complete."); + } + + + // Retrieve a list of LeaderTopicPartitions that match our topic filter + private synchronized Set<LeaderTopicPartition> retrieveLeaderTopicPartitions(int requestTimeoutMs) throws InterruptedException, ExecutionException, TimeoutException { + long startWait = System.currentTimeMillis(); + + ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(false).timeoutMs((int) (requestTimeoutMs - (System.currentTimeMillis() - startWait))); + Set<String> retrievedTopicSet = partitionMonitorClient.listTopics(listTopicsOptions).names().get(requestTimeoutMs - (System.currentTimeMillis() - startWait), TimeUnit.MILLISECONDS); + LOG.debug("Server topic list: {}", retrievedTopicSet); + Set<String> matchedTopicSet = retrievedTopicSet.stream() + .filter(topic -> matchedTopicFilter(topic)) + .collect(Collectors.toSet()); + LOG.debug("Matched topic list: {}", matchedTopicSet); + + DescribeTopicsOptions describeTopicsOptions = new DescribeTopicsOptions().timeoutMs((int) (requestTimeoutMs - (System.currentTimeMillis() - startWait))); + Map<String, TopicDescription> retrievedTopicDescriptions = partitionMonitorClient.describeTopics(matchedTopicSet, describeTopicsOptions).all().get(requestTimeoutMs - (System.currentTimeMillis() - startWait), TimeUnit.MILLISECONDS); + return retrievedTopicDescriptions.values().stream() + .map(topicDescription -> + topicDescription.partitions().stream() + .map(partitionInfo -> new LeaderTopicPartition(partitionInfo.leader().id(), topicDescription.name(), partitionInfo.partition())) + ) + .flatMap(Function.identity()) + .collect(Collectors.toSet()); + } + +} diff --git a/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorTest.java 
b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorTest.java new file mode 100644 index 00000000000..cb825570148 --- /dev/null +++ b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceConnectorTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.kafka; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectorContext; + +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.HashSet; + +import static org.junit.Assert.assertEquals; +import static org.powermock.api.support.membermodification.MemberMatcher.method; +import static org.powermock.api.support.membermodification.MemberModifier.suppress; + +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; +import org.junit.After; +import org.junit.runner.RunWith; + +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.api.easymock.PowerMock; +import org.easymock.EasyMockSupport; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({KafkaSourceConnector.class, PartitionMonitor.class}) +@PowerMockIgnore("javax.management.*") +public class KafkaSourceConnectorTest extends EasyMockSupport { + + private PartitionMonitor partitionMonitorMock; + private ConnectorContext connectorContextMock; + + private Set<LeaderTopicPartition> stubLeaderTopicPartitions; + + private KafkaSourceConnector connector; + + private Map<String, String> sourceProperties; + + private static final String SOURCE_TOPICS_VALUE = "test.topic"; + private static final String SOURCE_BOOTSTRAP_SERVERS_CONFIG = "localhost:6000"; + private static final String POLL_LOOP_TIMEOUT_MS_VALUE = "2000"; + private static final String TOPIC_LIST_TIMEOUT_MS_VALUE = "5000"; + private static final String CONSUMER_GROUP_ID_VALUE = "test-consumer-group"; + + @Before + public void setUp() throws Exception { + connector = new KafkaSourceConnector(); + connectorContextMock = PowerMock.createMock(ConnectorContext.class); + partitionMonitorMock = PowerMock.createMock(PartitionMonitor.class); + connector.initialize(connectorContextMock); + + // Default test settings + sourceProperties = new HashMap<>(); + sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG, SOURCE_TOPICS_VALUE); + sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG, SOURCE_BOOTSTRAP_SERVERS_CONFIG); + sourceProperties.put(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG, POLL_LOOP_TIMEOUT_MS_VALUE); + 
sourceProperties.put(KafkaSourceConnectorConfig.TOPIC_LIST_TIMEOUT_MS_CONFIG, TOPIC_LIST_TIMEOUT_MS_VALUE); + sourceProperties.put(KafkaSourceConnectorConfig.CONSUMER_GROUP_ID_CONFIG, CONSUMER_GROUP_ID_VALUE); + + // Default leader topic partitions to return (just one) + stubLeaderTopicPartitions = new HashSet<>(); + LeaderTopicPartition leaderTopicPartition = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0); + stubLeaderTopicPartitions.add(leaderTopicPartition); + } + + @After + public void tearDown() { + } + + + @Test(expected = ConfigException.class) + public void testStartMissingBootstrapServers() { + suppress(method(PartitionMonitor.class, "start")); + PowerMock.replayAll(); + + sourceProperties.remove(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG); + connector.start(sourceProperties); + + PowerMock.verifyAll(); + } + + @Test(expected = ConfigException.class) + public void testStartBlankBootstrapServers() { + suppress(method(PartitionMonitor.class, "start")); + PowerMock.replayAll(); + + sourceProperties.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG, ""); + connector.start(sourceProperties); + + PowerMock.verifyAll(); + } + + @Test(expected = ConfigException.class) + public void testStartTopicWhitelistMissing() { + suppress(method(PartitionMonitor.class, "start")); + replayAll(); + + sourceProperties.remove(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG); + connector.start(sourceProperties); + + PowerMock.verifyAll(); + } + + @Test + public void testStartCorrectConfig() throws Exception { + PowerMock.expectNew( + PartitionMonitor.class, + new Class<?>[] {ConnectorContext.class, KafkaSourceConnectorConfig.class}, + EasyMock.anyObject(ConnectorContext.class), + EasyMock.anyObject(KafkaSourceConnectorConfig.class) + ).andStubReturn(partitionMonitorMock); + partitionMonitorMock.start(); + PowerMock.expectLastCall().andVoid(); + PowerMock.replayAll(); + + connector.start(sourceProperties); + + verifyAll(); + } + + @Test + public void testTaskConfigsReturns1TaskOnOneTopicPartition() throws Exception { + PowerMock.expectNew( + PartitionMonitor.class, + new Class<?>[] {ConnectorContext.class, KafkaSourceConnectorConfig.class}, + EasyMock.anyObject(ConnectorContext.class), + EasyMock.anyObject(KafkaSourceConnectorConfig.class) + ).andStubReturn(partitionMonitorMock); + partitionMonitorMock.start(); + PowerMock.expectLastCall().andVoid(); + EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions); + PowerMock.replayAll(); + + connector.start(sourceProperties); + List<Map<String, String>> taskConfigs = connector.taskConfigs(2); + + assertEquals(1, taskConfigs.size()); + assertEquals("0:test.topic:0", + taskConfigs.get(0).get("task.leader.topic.partitions")); + assertEquals(SOURCE_TOPICS_VALUE, + taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG)); + assertEquals(SOURCE_BOOTSTRAP_SERVERS_CONFIG, + taskConfigs.get(0).get(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG)); + + verifyAll(); + } + + @Test + public void testTaskConfigsReturns1TaskOnTwoTopicPartitions() throws Exception { + // Default leader topic partitions to return (just one) + stubLeaderTopicPartitions = new HashSet<>(); + LeaderTopicPartition leaderTopicPartition1 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0); + stubLeaderTopicPartitions.add(leaderTopicPartition1); + LeaderTopicPartition leaderTopicPartition2 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1); + 
stubLeaderTopicPartitions.add(leaderTopicPartition2); + + PowerMock.expectNew( + PartitionMonitor.class, + new Class<?>[] {ConnectorContext.class, KafkaSourceConnectorConfig.class}, + EasyMock.anyObject(ConnectorContext.class), + EasyMock.anyObject(KafkaSourceConnectorConfig.class) + ).andStubReturn(partitionMonitorMock); + + partitionMonitorMock.start(); + PowerMock.expectLastCall().andVoid(); + EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions); + PowerMock.replayAll(); + + connector.start(sourceProperties); + List<Map<String, String>> taskConfigs = connector.taskConfigs(1); + + assertEquals(1, taskConfigs.size()); + + PowerMock.verifyAll(); + } + + @Test + public void testTaskConfigsReturns2TasksOnTwoTopicPartitions() throws Exception { + // Default leader topic partitions to return (just one) + stubLeaderTopicPartitions = new HashSet<>(); + LeaderTopicPartition leaderTopicPartition1 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 0); + stubLeaderTopicPartitions.add(leaderTopicPartition1); + LeaderTopicPartition leaderTopicPartition2 = new LeaderTopicPartition(0, SOURCE_TOPICS_VALUE, 1); + stubLeaderTopicPartitions.add(leaderTopicPartition2); + + PowerMock.expectNew( + PartitionMonitor.class, + new Class<?>[] {ConnectorContext.class, KafkaSourceConnectorConfig.class}, + EasyMock.anyObject(ConnectorContext.class), + EasyMock.anyObject(KafkaSourceConnectorConfig.class) + ).andStubReturn(partitionMonitorMock); + + partitionMonitorMock.start(); + PowerMock.expectLastCall().andVoid(); + EasyMock.expect(partitionMonitorMock.getCurrentLeaderTopicPartitions()).andReturn(stubLeaderTopicPartitions); + PowerMock.replayAll(); + + connector.start(sourceProperties); + List<Map<String, String>> taskConfigs = connector.taskConfigs(2); + + assertEquals(2, taskConfigs.size()); + + PowerMock.verifyAll(); + } + + + @Test + public void testTaskClass() { + replayAll(); + + assertEquals(KafkaSourceTask.class, connector.taskClass()); + + verifyAll(); + } + + +} diff --git a/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceTaskTest.java b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceTaskTest.java new file mode 100644 index 00000000000..b8269102115 --- /dev/null +++ b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/KafkaSourceTaskTest.java @@ -0,0 +1,399 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTaskContext; +import org.apache.kafka.connect.storage.OffsetStorageReader; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +import static org.apache.kafka.connect.kafka.KafkaSourceTask.OFFSET_KEY; +import static org.apache.kafka.connect.kafka.KafkaSourceTask.TOPIC_PARTITION_KEY; +import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ + KafkaSourceTask.class, + OffsetStorageReader.class, + SourceTaskContext.class, + KafkaConsumer.class +}) +@PowerMockIgnore("javax.management") + +public class KafkaSourceTaskTest { + + private KafkaSourceTask objectUnderTest; + + private Map<String, String> opts; + private Properties props; + private KafkaSourceConnectorConfig config; + + private static final String MAX_SHUTDOWN_WAIT_MS_VALUE = "2000"; + private static final int POLL_LOOP_TIMEOUT_MS_VALUE = 25; + private static final String DESTINATION_TOPIC_PREFIX_VALUE = "test.destination"; + private static final String INCLUDE_MESSAGE_HEADERS_VALUE = "false"; + private static final String CONSUMER_AUTO_OFFSET_RESET_VALUE = "0"; + private static final String SOURCE_BOOTSTRAP_SERVERS_VALUE = "localhost:6000"; + private static final String TASK_LEADER_TOPIC_PARTITION_VALUE = "0:test.topic:1"; + private static final String AUTO_OFFSET_RESET_VALUE = "latest"; + private static final String SOURCE_TOPICS_WHITELIST_VALUE = "test*"; + private static final String CONSUMER_GROUP_ID_VALUE = "test-consumer-group"; + + private static final String FIRST_TOPIC = "test.topic"; + private static final int FIRST_PARTITION = 1; + private static final long FIRST_OFFSET = 123L; + private static final String SECOND_TOPIC = "another.test.topic"; + private static final int SECOND_PARTITION = 0; + private static final long SECOND_OFFSET = 456L; + + + private OffsetStorageReader offsetStorageReader; + private SourceTaskContext context; + private KafkaConsumer consumer; + + @Before + public void setup() { + + opts = new HashMap<>(); + opts.put(KafkaSourceConnectorConfig.SOURCE_TOPIC_WHITELIST_CONFIG, SOURCE_TOPICS_WHITELIST_VALUE); + opts.put(KafkaSourceConnectorConfig.MAX_SHUTDOWN_WAIT_MS_CONFIG, 
MAX_SHUTDOWN_WAIT_MS_VALUE); + opts.put(KafkaSourceConnectorConfig.POLL_LOOP_TIMEOUT_MS_CONFIG, String.valueOf(POLL_LOOP_TIMEOUT_MS_VALUE)); + opts.put(KafkaSourceConnectorConfig.DESTINATION_TOPIC_PREFIX_CONFIG, DESTINATION_TOPIC_PREFIX_VALUE); + opts.put(KafkaSourceConnectorConfig.INCLUDE_MESSAGE_HEADERS_CONFIG, INCLUDE_MESSAGE_HEADERS_VALUE); + opts.put(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, CONSUMER_AUTO_OFFSET_RESET_VALUE); + opts.put(KafkaSourceConnectorConfig.SOURCE_BOOTSTRAP_SERVERS_CONFIG, SOURCE_BOOTSTRAP_SERVERS_VALUE); + opts.put(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG, TASK_LEADER_TOPIC_PARTITION_VALUE); + opts.put(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_VALUE); + opts.put(KafkaSourceConnectorConfig.CONSUMER_GROUP_ID_CONFIG, CONSUMER_GROUP_ID_VALUE); + + config = new KafkaSourceConnectorConfig(opts); + props = new Properties(); + props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX)); + + objectUnderTest = new KafkaSourceTask(); + + offsetStorageReader = createMock(OffsetStorageReader.class); + context = createMock(SourceTaskContext.class); + consumer = createMock(KafkaConsumer.class); + objectUnderTest.initialize(context); + } + + @After + public void teardown() { + objectUnderTest = null; + } + + private ConsumerRecords createTestRecordsWithHeaders() { + RecordHeader header = new RecordHeader("testHeader", new byte[0]); + RecordHeaders headers = new RecordHeaders(); + headers.add(header); + TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE; + + byte testByte = 0; + byte[] testKey = {testByte}; + byte[] testValue = {testByte}; + + ConnectHeaders destinationHeaders = new ConnectHeaders(); + destinationHeaders.add(header.key(), header.value(), Schema.OPTIONAL_BYTES_SCHEMA); + ConsumerRecord<byte[], byte[]> testConsumerRecord = new ConsumerRecord<byte[], byte[]>( + FIRST_TOPIC, + FIRST_PARTITION, + FIRST_OFFSET, + System.currentTimeMillis(), + timestampType, + 0L, + 0, + 0, + testKey, + testValue, + headers + ); + + TopicPartition topicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>(); + consumerRecords.add(testConsumerRecord); + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumerRecordMap = new HashMap<>(1); + consumerRecordMap.put(topicPartition, consumerRecords); + ConsumerRecords testRecords = new ConsumerRecords<>(consumerRecordMap); + return testRecords; + } + + private ConsumerRecords createTestRecords() { + byte testByte = 0; + byte[] testKey = {testByte}; + byte[] testValue = {testByte}; + ConsumerRecord<byte[], byte[]> testConsumerRecord = new ConsumerRecord<byte[], byte[]>(FIRST_TOPIC, FIRST_PARTITION, FIRST_OFFSET, testKey, testValue); + TopicPartition topicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>(); + consumerRecords.add(testConsumerRecord); + + Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumerRecordMap = new HashMap<>(1); + consumerRecordMap.put(topicPartition, consumerRecords); + ConsumerRecords testRecords = new ConsumerRecords<>(consumerRecordMap); + return testRecords; + } + + private void mockConsumerInitialization() throws Exception { + TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + Collection<TopicPartition> topicPartitions = new ArrayList<>(); + topicPartitions.add(firstTopicPartition); 
+ Map<TopicPartition, Long> endOffsets = Collections.singletonMap(firstTopicPartition, FIRST_OFFSET); + + EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); + EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, String>>>anyObject())).andReturn(new HashMap<>()); + PowerMock.expectNew(KafkaConsumer.class, new Class[]{Properties.class}, config.getKafkaConsumerProperties()).andReturn(consumer); + EasyMock.expect(consumer.endOffsets(topicPartitions)).andReturn(endOffsets); + consumer.assign(topicPartitions); + EasyMock.expectLastCall(); + consumer.seek(firstTopicPartition, FIRST_OFFSET); + EasyMock.expectLastCall(); + } + + @Test + public void testStartNoStoredPartitionsStartEnd() throws Exception { + TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + Collection<TopicPartition> topicPartitions = new ArrayList<>(); + topicPartitions.add(firstTopicPartition); + Map<TopicPartition, Long> endOffsets = Collections.singletonMap(firstTopicPartition, FIRST_OFFSET); + + EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); + EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, String>>>anyObject())).andReturn(new HashMap<>()); + PowerMock.expectNew(KafkaConsumer.class, new Class[]{Properties.class}, config.getKafkaConsumerProperties()).andReturn(consumer); + EasyMock.expect(consumer.endOffsets(topicPartitions)).andReturn(endOffsets); + consumer.assign(topicPartitions); + EasyMock.expectLastCall(); + consumer.seek(firstTopicPartition, FIRST_OFFSET); + EasyMock.expectLastCall(); + replayAll(); + + objectUnderTest.start(opts); + + verifyAll(); + } + + @Test + public void testStartNoStoredPartitionsStartBeginning() throws Exception { + opts.put(KafkaSourceConnectorConfig.CONSUMER_AUTO_OFFSET_RESET_CONFIG, "earliest"); + config = new KafkaSourceConnectorConfig(opts); + props = new Properties(); + props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX)); + + TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + Collection<TopicPartition> topicPartitions = new ArrayList<>(); + topicPartitions.add(firstTopicPartition); + Map<TopicPartition, Long> endOffsets = Collections.singletonMap(firstTopicPartition, FIRST_OFFSET); + + EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); + EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, String>>>anyObject())).andReturn(new HashMap<>()); + PowerMock.expectNew(KafkaConsumer.class, new Class[]{Properties.class}, config.getKafkaConsumerProperties()).andReturn(consumer); + EasyMock.expect(consumer.beginningOffsets(topicPartitions)).andReturn(endOffsets); + consumer.assign(topicPartitions); + EasyMock.expectLastCall(); + consumer.seek(firstTopicPartition, FIRST_OFFSET); + EasyMock.expectLastCall(); + replayAll(); + + objectUnderTest.start(opts); + + verifyAll(); + } + + @Test + public void testStartAllStoredPartitions() throws Exception { + TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + Collection<TopicPartition> topicPartitions = new ArrayList<>(); + topicPartitions.add(firstTopicPartition); + Map<Map<String, String>, Map<String, Object>> storedOffsets = Collections.singletonMap( + Collections.singletonMap(TOPIC_PARTITION_KEY, String.format("%s:%d", FIRST_TOPIC, FIRST_PARTITION)), + Collections.singletonMap(OFFSET_KEY, FIRST_OFFSET) + ); + + 
EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); + EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, String>>>anyObject())).andReturn(storedOffsets); + PowerMock.expectNew(KafkaConsumer.class, new Class[]{Properties.class}, config.getKafkaConsumerProperties()).andReturn(consumer); + consumer.assign(topicPartitions); + EasyMock.expectLastCall(); + consumer.seek(firstTopicPartition, FIRST_OFFSET); + EasyMock.expectLastCall(); + replayAll(); + + objectUnderTest.start(opts); + + verifyAll(); + } + + @Test + public void testStartSomeStoredPartitions() throws Exception { + opts.put(KafkaSourceConnectorConfig.TASK_LEADER_TOPIC_PARTITION_CONFIG, TASK_LEADER_TOPIC_PARTITION_VALUE + "," + "0:" + SECOND_TOPIC + ":" + SECOND_PARTITION); + config = new KafkaSourceConnectorConfig(opts); + props = new Properties(); + props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX)); + + TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + TopicPartition secondTopicPartition = new TopicPartition(SECOND_TOPIC, SECOND_PARTITION); + Collection<TopicPartition> topicPartitions = new ArrayList<>(); + topicPartitions.add(firstTopicPartition); + topicPartitions.add(secondTopicPartition); + Map<TopicPartition, Long> endOffsets = Collections.singletonMap(firstTopicPartition, FIRST_OFFSET); + Map<Map<String, String>, Map<String, Object>> storedOffsets = Collections.singletonMap( + Collections.singletonMap(TOPIC_PARTITION_KEY, String.format("%s:%d", SECOND_TOPIC, SECOND_PARTITION)), + Collections.singletonMap(OFFSET_KEY, SECOND_OFFSET) + ); + + EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); + EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, String>>>anyObject())).andReturn(storedOffsets); + PowerMock.expectNew(KafkaConsumer.class, new Class[]{Properties.class}, config.getKafkaConsumerProperties()).andReturn(consumer); + EasyMock.expect(consumer.endOffsets(Collections.singletonList(firstTopicPartition))).andReturn(endOffsets); + consumer.assign(topicPartitions); + EasyMock.expectLastCall(); + consumer.seek(firstTopicPartition, FIRST_OFFSET); + EasyMock.expectLastCall(); + consumer.seek(secondTopicPartition, SECOND_OFFSET); + EasyMock.expectLastCall(); + + replayAll(); + + objectUnderTest.start(opts); + + verifyAll(); + } + + + @Test + public void testPollNoRecords() throws Exception { + mockConsumerInitialization(); + EasyMock.expect(consumer.poll(Duration.ofMillis(POLL_LOOP_TIMEOUT_MS_VALUE))).andReturn(new ConsumerRecords<>(Collections.EMPTY_MAP)); + replayAll(); + + objectUnderTest.start(opts); + List<SourceRecord> records = objectUnderTest.poll(); + + assertEquals(0, records.size()); + + verifyAll(); + } + + + @Test + public void testPollRecordReturnedNoIncludeHeaders() throws Exception { + mockConsumerInitialization(); + EasyMock.expect(consumer.poll(Duration.ofMillis(POLL_LOOP_TIMEOUT_MS_VALUE))).andReturn(createTestRecords()); + replayAll(); + + objectUnderTest.start(opts); + List<SourceRecord> records = objectUnderTest.poll(); + + SourceRecord testRecord = records.get(0); + assertEquals(String.format("%s:%d", FIRST_TOPIC, FIRST_PARTITION), testRecord.sourcePartition().get(TOPIC_PARTITION_KEY)); + assertEquals(FIRST_OFFSET, testRecord.sourceOffset().get(OFFSET_KEY)); + assertEquals(0, testRecord.headers().size()); + + verifyAll(); + } + + @Test + public void testPollRecordReturnedIncludeHeaders() throws Exception { + 
opts.put(KafkaSourceConnectorConfig.INCLUDE_MESSAGE_HEADERS_CONFIG, "true"); + config = new KafkaSourceConnectorConfig(opts); + props = new Properties(); + props.putAll(config.allWithPrefix(KafkaSourceConnectorConfig.CONSUMER_PREFIX)); + + objectUnderTest = new KafkaSourceTask(); + offsetStorageReader = createMock(OffsetStorageReader.class); + context = createMock(SourceTaskContext.class); + consumer = createMock(KafkaConsumer.class); + objectUnderTest.initialize(context); + + TopicPartition firstTopicPartition = new TopicPartition(FIRST_TOPIC, FIRST_PARTITION); + Collection<TopicPartition> topicPartitions = new ArrayList<>(); + topicPartitions.add(firstTopicPartition); + Map<TopicPartition, Long> endOffsets = Collections.singletonMap(firstTopicPartition, FIRST_OFFSET); + + EasyMock.expect(context.offsetStorageReader()).andReturn(offsetStorageReader); + EasyMock.expect(offsetStorageReader.offsets(EasyMock.<List<Map<String, String>>>anyObject())).andReturn(new HashMap<>()); + PowerMock.expectNew(KafkaConsumer.class, new Class[]{Properties.class}, config.getKafkaConsumerProperties()).andReturn(consumer); + EasyMock.expect(consumer.endOffsets(topicPartitions)).andReturn(endOffsets); + consumer.assign(topicPartitions); + EasyMock.expectLastCall(); + consumer.seek(firstTopicPartition, FIRST_OFFSET); + EasyMock.expectLastCall(); + + + // expectation for poll + EasyMock.expect(consumer.poll(Duration.ofMillis(POLL_LOOP_TIMEOUT_MS_VALUE))).andReturn(createTestRecordsWithHeaders()); + replayAll(); + + objectUnderTest.start(opts); + List<SourceRecord> records = objectUnderTest.poll(); + + SourceRecord testRecord = records.get(0); + assertEquals(String.format("%s:%d", FIRST_TOPIC, FIRST_PARTITION), testRecord.sourcePartition().get(TOPIC_PARTITION_KEY)); + assertEquals(FIRST_OFFSET, testRecord.sourceOffset().get(OFFSET_KEY)); + assertEquals(1, testRecord.headers().size()); + + verifyAll(); + } + + + @Test + public void testStopClosesConsumer() throws Exception { + mockConsumerInitialization(); + + consumer.wakeup(); + EasyMock.expectLastCall(); + consumer.close(EasyMock.anyObject()); + EasyMock.expectLastCall(); + + replayAll(); + + objectUnderTest.start(opts); + objectUnderTest.stop(); + + verifyAll(); + } +} diff --git a/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/LeaderTopicPartitionTest.java b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/LeaderTopicPartitionTest.java new file mode 100644 index 00000000000..4be4fa8a404 --- /dev/null +++ b/connect/kafka/src/test/java/org/apache/kafka/connect/kafka/LeaderTopicPartitionTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.kafka; + +import org.junit.Before; +import org.junit.Test; +import org.junit.After; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.junit.Assert.assertFalse; + +import static org.junit.Assert.assertEquals; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({LeaderTopicPartition.class}) +@PowerMockIgnore("javax.management.*") + +public class LeaderTopicPartitionTest { + + private LeaderTopicPartition objectUnderTest; + + private static final int LEADER_ID = 0; + private static final String TOPIC = "test.topic"; + private static final int PARTITION = 1; + + private static final String LEADER_TOPIC_PARTITION = "0:test.topic:1"; + private static final String TOPIC_PARTITION = "test.topic:1"; + + @Before + public void setup() { + objectUnderTest = new LeaderTopicPartition( + LEADER_ID, + TOPIC, + PARTITION + ); + } + + @After + public void teardown() { + objectUnderTest = null; + } + + @Test(expected = IllegalArgumentException.class) + public void testNullTopicName() { + objectUnderTest = new LeaderTopicPartition( + LEADER_ID, + null, + PARTITION + ); + } + + @Test + public void testToString() { + assertEquals(LEADER_TOPIC_PARTITION, objectUnderTest.toString()); + } + + @Test(expected = IllegalArgumentException.class) + public void testFromInvalidString() { + String invalidString = "test"; + LeaderTopicPartition.fromString(invalidString); + } + + @Test + public void testFromString() { + objectUnderTest = LeaderTopicPartition.fromString(LEADER_TOPIC_PARTITION); + assertEquals(LEADER_TOPIC_PARTITION, objectUnderTest.toString()); + } + + @Test + public void testToTopicPartitionString() { + assertEquals(TOPIC_PARTITION, objectUnderTest.toTopicPartitionString()); + } + + + @Test + public void testEquals() { + LeaderTopicPartition objectUnderTest1 = new LeaderTopicPartition( + LEADER_ID, + TOPIC, + PARTITION + ); + LeaderTopicPartition objectUnderTest2 = new LeaderTopicPartition( + LEADER_ID, + TOPIC, + PARTITION + ); + assert objectUnderTest1.equals(objectUnderTest2); + } + + @Test + public void testEqualsWithOtherInstance() { + String unexpectedString = "NOT_LEADER_TOPIC_PARTITION"; + assertFalse(objectUnderTest.equals(unexpectedString)); + } + +} diff --git a/gradle/findbugs-exclude.xml b/gradle/findbugs-exclude.xml index 09981852f95..cae57fba391 100644 --- a/gradle/findbugs-exclude.xml +++ b/gradle/findbugs-exclude.xml @@ -255,6 +255,16 @@ For a detailed description of findbugs bug categories, see http://findbugs.sourc <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> + <Match> + <!-- Ignore spurious warning about inconsistent synchronization on instance variable. Some accesses of the + variable must occur in synchronized blocks - stop(), while some need not - start(). Connect framework + shouldn't be calling stop() before start() has returned for the connector. + This seems to throw the static checker off. --> + <Package name="org.apache.kafka.connect.kafka" /> + <Source name="KafkaSourceTask.java" /> + <Bug pattern="IS2_INCONSISTENT_SYNC" /> + </Match> + <Match> <!-- Suppress a spurious warning about an unreleased lock. 
--> <Class name="kafka.utils.timer.SystemTimer"/>
diff --git a/settings.gradle b/settings.gradle
index bbcdc3174b8..d3ff9e3cb72 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,4 +17,4 @@ include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala',
     'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
     'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
     'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
-    'connect:basic-auth-extension', 'jmh-benchmarks'
+    'connect:basic-auth-extension', 'connect:kafka', 'jmh-benchmarks'

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> KIP-310: Add a Kafka Source Connector to Kafka Connect
> ------------------------------------------------------
>
>                 Key: KAFKA-6963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6963
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Rhys Anthony McCaig
>            Priority: Major
>
> This proposal introduces a new Kafka Connect Source Connector.
> See the KIP at [KIP-310|https://cwiki.apache.org/confluence/display/KAFKA/KIP-310%3A+Add+a+Kafka+Source+Connector+to+Kafka+Connect]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
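For readers following the connector tests in the diff above: testTaskConfigsReturns1TaskOnOneTopicPartition and the two-partition variants assert that leader/topic/partition triples are spread across at most maxTasks task configs and serialized under the "task.leader.topic.partitions" key (for example "0:test.topic:0"). The snippet below is a minimal, hypothetical sketch of that grouping logic; the class and method names are invented for illustration and it is not the PR's actual KafkaSourceConnector.taskConfigs implementation.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: distribute "leader:topic:partition" strings over at most maxTasks configs,
// mirroring what the taskConfigs tests assert (2 partitions with maxTasks=1 -> 1 config, maxTasks=2 -> 2 configs).
final class TaskConfigSketch {
    static List<Map<String, String>> taskConfigs(Set<String> leaderTopicPartitions,
                                                 Map<String, String> connectorProps,
                                                 int maxTasks) {
        int numGroups = Math.min(leaderTopicPartitions.size(), maxTasks);
        List<List<String>> groups = new ArrayList<>(numGroups);
        for (int i = 0; i < numGroups; i++)
            groups.add(new ArrayList<>());

        int i = 0;
        for (String ltp : leaderTopicPartitions)      // round-robin assignment
            groups.get(i++ % numGroups).add(ltp);

        List<Map<String, String>> configs = new ArrayList<>(numGroups);
        for (List<String> group : groups) {
            Map<String, String> config = new HashMap<>(connectorProps);   // tasks inherit connector settings
            config.put("task.leader.topic.partitions", String.join(",", group));
            configs.add(config);
        }
        return configs;
    }
}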
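The testStart* cases in KafkaSourceTaskTest describe how a task picks starting offsets: partitions with a stored offset (keyed by "topic:partition" under TOPIC_PARTITION_KEY, with the offset under OFFSET_KEY) are seeked directly, while the remainder fall back to endOffsets or beginningOffsets depending on the auto.offset.reset setting. Below is a rough, hypothetical sketch of that behaviour, assuming a plain KafkaConsumer and a pre-built map of stored offsets; it is not the PR's KafkaSourceTask code.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: seek partitions with a stored offset directly,
// fall back to beginning/end offsets for the rest.
final class StartingOffsetsSketch {
    static void seekToStartingOffsets(KafkaConsumer<byte[], byte[]> consumer,
                                      Collection<TopicPartition> assigned,
                                      Map<TopicPartition, Long> storedOffsets,
                                      boolean resetToEarliest) {
        consumer.assign(assigned);

        // Partitions without a stored offset need a fallback position.
        List<TopicPartition> unknown = new ArrayList<>();
        for (TopicPartition tp : assigned)
            if (!storedOffsets.containsKey(tp))
                unknown.add(tp);

        Map<TopicPartition, Long> fallback = unknown.isEmpty()
                ? Collections.emptyMap()
                : (resetToEarliest ? consumer.beginningOffsets(unknown) : consumer.endOffsets(unknown));

        for (TopicPartition tp : assigned) {
            Long offset = storedOffsets.containsKey(tp) ? storedOffsets.get(tp) : fallback.get(tp);
            consumer.seek(tp, offset);
        }
    }
}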
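testPollRecordReturnedIncludeHeaders expects each Kafka record header to be copied onto the emitted SourceRecord as an optional-bytes Connect header, which is also how the test itself builds its expected ConnectHeaders. A small illustrative sketch of that mapping (again a sketch, not the PR's code):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;

// Hypothetical sketch: map each Kafka record header to an OPTIONAL_BYTES Connect header.
final class HeaderMappingSketch {
    static ConnectHeaders toConnectHeaders(ConsumerRecord<byte[], byte[]> record) {
        ConnectHeaders connectHeaders = new ConnectHeaders();
        for (Header header : record.headers())
            connectHeaders.add(header.key(), header.value(), Schema.OPTIONAL_BYTES_SCHEMA);
        return connectHeaders;
    }
}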
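Finally, LeaderTopicPartitionTest pins down the "leaderId:topic:partition" string format ("0:test.topic:1"), the shorter "topic:partition" form, and an IllegalArgumentException on malformed input. A hypothetical sketch of that round-trip, with invented names (the real LeaderTopicPartition class is defined in the PR):

// Hypothetical sketch of the string round-trip the LeaderTopicPartition tests assert.
final class LeaderTopicPartitionFormatSketch {
    static String format(int leaderId, String topic, int partition) {
        return leaderId + ":" + topic + ":" + partition;   // e.g. "0:test.topic:1"
    }

    static String topicPartition(String topic, int partition) {
        return topic + ":" + partition;                     // e.g. "test.topic:1"
    }

    static String[] parse(String value) {
        String[] parts = value.split(":");
        if (parts.length != 3)
            throw new IllegalArgumentException("Expected 'leaderId:topic:partition' but got: " + value);
        return parts;                                       // [leaderId, topic, partition]
    }
}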