KAFKA-2371: Add distributed support for Copycat. This adds coordination between DistributedHerders using the generalized consumer support, allowing automatic balancing of connectors and tasks across workers. A few pieces that require interaction between workers (resolving config inconsistencies, forwarding of configuration changes to the leader worker) are incomplete because they require REST API support to implement properly.
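For context on the automatic balancing described above: the DistributedHerder javadoc added by this patch notes that the leader currently spreads connectors and tasks across the group members in a simple round-robin fashion. The sketch below is illustrative only (the real assignment logic lives in the new WorkerCoordinator class, and the assignRoundRobin helper here is hypothetical); it shows the general shape of that distribution using the CircularIterator utility this commit promotes into org.apache.kafka.common.utils.

    import org.apache.kafka.common.utils.CircularIterator;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundRobinAssignmentSketch {
        // Illustrative sketch only: the actual coordinator logic is in WorkerCoordinator
        // (added by this patch); this helper and its name are hypothetical.
        public static <T> Map<String, List<T>> assignRoundRobin(List<String> memberIds, List<T> items) {
            Map<String, List<T>> assignment = new HashMap<>();
            for (String memberId : memberIds)
                assignment.put(memberId, new ArrayList<T>());

            // Cycle through the members indefinitely, handing out one work item per member in turn.
            CircularIterator<String> memberIter = new CircularIterator<>(memberIds);
            for (T item : items)
                assignment.get(memberIter.next()).add(item);
            return assignment;
        }
    }

Calling assignRoundRobin(memberIds, connectorNames) and assignRoundRobin(memberIds, taskIds) yields roughly the per-worker connector and task lists that CopycatProtocol.Assignment then encodes for each member.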
Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Jason Gustafson, Gwen Shapira Closes #321 from ewencp/kafka-2371-distributed-herder Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e617735 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e617735 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e617735 Branch: refs/heads/trunk Commit: 2e61773590c0ba86cb8813e6ba17bf6ee33f4461 Parents: 21443f2 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Fri Oct 23 16:37:30 2015 -0700 Committer: Gwen Shapira <csh...@gmail.com> Committed: Fri Oct 23 16:37:30 2015 -0700 ---------------------------------------------------------------------- build.gradle | 1 + checkstyle/import-control.xml | 1 + .../clients/consumer/RoundRobinAssignor.java | 35 +- .../consumer/internals/AbstractCoordinator.java | 8 +- .../kafka/common/utils/CircularIterator.java | 54 ++ config/copycat-distributed.properties | 2 + .../kafka/copycat/file/FileStreamSinkTask.java | 12 +- .../copycat/file/FileStreamSourceTask.java | 17 +- .../kafka/copycat/cli/CopycatDistributed.java | 7 +- .../kafka/copycat/runtime/ConnectorConfig.java | 2 +- .../kafka/copycat/runtime/TaskConfig.java | 54 ++ .../apache/kafka/copycat/runtime/Worker.java | 145 +++- .../runtime/distributed/ClusterConfigState.java | 40 +- .../runtime/distributed/CopycatProtocol.java | 246 +++++++ .../runtime/distributed/DistributedHerder.java | 733 +++++++++++++------ .../distributed/DistributedHerderConfig.java | 192 +++++ .../runtime/distributed/NotLeaderException.java | 38 + .../runtime/distributed/WorkerCoordinator.java | 288 ++++++++ .../runtime/distributed/WorkerGroupMember.java | 184 +++++ .../distributed/WorkerRebalanceListener.java | 38 + .../runtime/standalone/StandaloneHerder.java | 168 ++--- .../copycat/storage/KafkaConfigStorage.java | 64 +- .../storage/KafkaOffsetBackingStore.java | 2 + .../kafka/copycat/util/ConnectorTaskId.java | 10 +- .../kafka/copycat/runtime/WorkerTest.java | 199 ++++- .../distributed/DistributedHerderTest.java | 436 ++++++----- .../distributed/WorkerCoordinatorTest.java | 436 +++++++++++ .../standalone/StandaloneHerderTest.java | 45 +- .../copycat/storage/KafkaConfigStorageTest.java | 49 +- .../apache/kafka/copycat/util/TestFuture.java | 10 +- tests/kafkatest/services/copycat.py | 67 +- .../kafkatest/tests/copycat_distributed_test.py | 67 +- tests/kafkatest/tests/copycat_test.py | 5 +- .../templates/copycat-distributed.properties | 7 +- 34 files changed, 2966 insertions(+), 696 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 16fb981..128c195 100644 --- a/build.gradle +++ b/build.gradle @@ -754,6 +754,7 @@ project(':copycat:runtime') { testCompile "$easymock" testCompile "$powermock" testCompile "$powermock_easymock" + testCompile project(':clients').sourceSets.test.output testRuntime "$slf4jlog4j" testRuntime project(":copycat:json") } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 6474865..e1ea93c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -146,6 +146,7 @@ <allow 
pkg="org.apache.kafka.copycat.data" /> <allow pkg="org.apache.kafka.copycat.errors" /> <allow pkg="org.apache.kafka.clients" /> + <allow pkg="org.apache.kafka.test"/> <subpackage name="source"> <allow pkg="org.apache.kafka.copycat.connector" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java index c5ea2bb..b8dc253 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java @@ -14,11 +14,11 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.CircularIterator; import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.SortedSet; @@ -78,37 +78,4 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor { return "roundrobin"; } - private static class CircularIterator<T> implements Iterator<T> { - int i = 0; - private List<T> list; - - public CircularIterator(List<T> list) { - if (list.isEmpty()) { - throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists"); - } - this.list = list; - } - - @Override - public boolean hasNext() { - return true; - } - - @Override - public T next() { - T next = list.get(i); - i = (i + 1) % list.size(); - return next; - } - - public T peek() { - return list.get(i); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 1ffd2bb..a2b9ec5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -199,7 +199,7 @@ public abstract class AbstractCoordinator { this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; rejoinNeeded = true; } - + private boolean needsOnLeave = true; /** * Ensure that the group is active (i.e. 
joined and synced) */ @@ -208,7 +208,10 @@ public abstract class AbstractCoordinator { return; // onLeave only invoked if we have a valid current generation - onLeave(generation, memberId); + if (needsOnLeave) { + onLeave(generation, memberId); + needsOnLeave = false; + } while (needRejoin()) { ensureCoordinatorKnown(); @@ -225,6 +228,7 @@ public abstract class AbstractCoordinator { if (future.succeeded()) { onJoin(generation, memberId, protocol, future.value()); + needsOnLeave = true; heartbeatTask.reset(); } else { if (future.exception() instanceof UnknownMemberIdException) http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java new file mode 100644 index 0000000..00be783 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.common.utils; + +import java.util.Iterator; +import java.util.List; + +public class CircularIterator<T> implements Iterator<T> { + int i = 0; + private List<T> list; + + public CircularIterator(List<T> list) { + if (list.isEmpty()) { + throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists"); + } + this.list = list; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public T next() { + T next = list.get(i); + i = (i + 1) % list.size(); + return next; + } + + public T peek() { + return list.get(i); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/config/copycat-distributed.properties ---------------------------------------------------------------------- diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties index b122413..2ea5b73 100644 --- a/config/copycat-distributed.properties +++ b/config/copycat-distributed.properties @@ -18,6 +18,8 @@ # These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=localhost:9092 +group.id=copycat-cluster + # The converters specify the format of data in Kafka and how to translate it into Copycat data. 
Every Copycat user will # need to configure these based on the format they want their data in when loaded from or stored into Kafka key.converter=org.apache.kafka.copycat.json.JsonConverter http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java index 9ea459c..6dfe4a7 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSinkTask.java @@ -38,6 +38,7 @@ import java.util.Properties; public class FileStreamSinkTask extends SinkTask { private static final Logger log = LoggerFactory.getLogger(FileStreamSinkTask.class); + private String filename; private PrintStream outputStream; public FileStreamSinkTask() { @@ -45,12 +46,13 @@ public class FileStreamSinkTask extends SinkTask { // for testing public FileStreamSinkTask(PrintStream outputStream) { + filename = null; this.outputStream = outputStream; } @Override public void start(Properties props) { - String filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG); + filename = props.getProperty(FileStreamSinkConnector.FILE_CONFIG); if (filename == null) { outputStream = System.out; } else { @@ -65,16 +67,24 @@ public class FileStreamSinkTask extends SinkTask { @Override public void put(Collection<SinkRecord> sinkRecords) { for (SinkRecord record : sinkRecords) { + log.trace("Writing line to {}: {}", logFilename(), record.value()); outputStream.println(record.value()); } } @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) { + log.trace("Flushing output stream for {}", logFilename()); outputStream.flush(); } @Override public void stop() { + if (outputStream != System.out) + outputStream.close(); + } + + private String logFilename() { + return filename == null ? 
"stdout" : filename; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java ---------------------------------------------------------------------- diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java index cf71be3..f2249d0 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java @@ -56,7 +56,7 @@ public class FileStreamSourceTask extends SourceTask { } topic = props.getProperty(FileStreamSourceConnector.TOPIC_CONFIG); if (topic == null) - throw new CopycatException("ConsoleSourceTask config missing topic setting"); + throw new CopycatException("FileStreamSourceTask config missing topic setting"); } @Override @@ -88,6 +88,7 @@ public class FileStreamSourceTask extends SourceTask { streamOffset = 0L; } reader = new BufferedReader(new InputStreamReader(stream)); + log.debug("Opened {} for reading", logFilename()); } catch (FileNotFoundException e) { log.warn("Couldn't find file for FileStreamSourceTask, sleeping to wait for it to be created"); synchronized (this) { @@ -113,6 +114,7 @@ public class FileStreamSourceTask extends SourceTask { int nread = 0; while (readerCopy.ready()) { nread = readerCopy.read(buffer, offset, buffer.length - offset); + log.trace("Read {} bytes from {}", nread, logFilename()); if (nread > 0) { offset += nread; @@ -126,6 +128,7 @@ public class FileStreamSourceTask extends SourceTask { do { line = extractLine(); if (line != null) { + log.trace("Read a line from {}", logFilename()); if (records == null) records = new ArrayList<>(); records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line)); @@ -183,10 +186,12 @@ public class FileStreamSourceTask extends SourceTask { log.trace("Stopping"); synchronized (this) { try { - stream.close(); - log.trace("Closed input stream"); + if (stream != null && stream != System.in) { + stream.close(); + log.trace("Closed input stream"); + } } catch (IOException e) { - log.error("Failed to close ConsoleSourceTask stream: ", e); + log.error("Failed to close FileStreamSourceTask stream: ", e); } this.notify(); } @@ -199,4 +204,8 @@ public class FileStreamSourceTask extends SourceTask { private Map<String, Long> offsetValue(Long pos) { return Collections.singletonMap(POSITION_FIELD, pos); } + + private String logFilename() { + return filename == null ? 
"stdin" : filename; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java index b0230b2..0ff6e81 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java @@ -48,8 +48,8 @@ public class CopycatDistributed { Properties workerProps; Properties connectorProps; - if (args.length < 2) { - log.info("Usage: CopycatDistributed worker.properties connector1.properties [connector2.properties ...]"); + if (args.length < 1) { + log.info("Usage: CopycatDistributed worker.properties [connector1.properties connector2.properties ...]"); System.exit(1); } @@ -58,8 +58,7 @@ public class CopycatDistributed { WorkerConfig workerConfig = new WorkerConfig(workerProps); Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore()); - DistributedHerder herder = new DistributedHerder(worker); - herder.configure(workerConfig.originals()); + DistributedHerder herder = new DistributedHerder(worker, workerConfig.originals()); final Copycat copycat = new Copycat(worker, herder); copycat.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java index 767c88b..2242299 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java @@ -59,7 +59,7 @@ public class ConnectorConfig extends AbstractConfig { static { config = new ConfigDef() .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC) - .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC) + .define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC) .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC) .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java new file mode 100644 index 0000000..be97879 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/TaskConfig.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.HashMap; +import java.util.Map; + +/** + * <p> + * Configuration options for Tasks. These only include Copycat system-level configuration + * options. + * </p> + */ +public class TaskConfig extends AbstractConfig { + + public static final String TASK_CLASS_CONFIG = "task.class"; + private static final String TASK_CLASS_DOC = + "Name of the class for this task. Must be a subclass of org.apache.kafka.copycat.connector.Task"; + + private static ConfigDef config; + + static { + config = new ConfigDef() + .define(TASK_CLASS_CONFIG, Type.CLASS, Importance.HIGH, TASK_CLASS_DOC); + } + + public TaskConfig() { + this(new HashMap<String, String>()); + } + + public TaskConfig(Map<String, ?> props) { + super(config, props); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java index 0fdab4c..b37e49f 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java @@ -17,12 +17,15 @@ package org.apache.kafka.copycat.runtime; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.cli.WorkerConfig; +import org.apache.kafka.copycat.connector.Connector; +import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.connector.Task; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.sink.SinkTask; @@ -33,8 +36,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; /** * <p> @@ -55,6 +60,7 @@ public class Worker { private Converter internalKeyConverter; private Converter internalValueConverter; private OffsetBackingStore offsetBackingStore; + private HashMap<String, Connector> connectors = new HashMap<>(); private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>(); private KafkaProducer<byte[], byte[]> producer; private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; @@ -106,6 +112,17 @@ public class Worker { long started = time.milliseconds(); long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); + for (Map.Entry<String, Connector> entry : connectors.entrySet()) { + 
Connector conn = entry.getValue(); + log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" + + "Worker is stopped.", conn); + try { + conn.stop(); + } catch (CopycatException e) { + log.error("Error while shutting down connector " + conn, e); + } + } + for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) { WorkerTask task = entry.getValue(); log.warn("Shutting down task {} uncleanly; herder should have shut down " @@ -134,15 +151,106 @@ public class Worker { } /** + * Add a new connector. + * @param connConfig connector configuration + * @param ctx context for the connector + */ + public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) { + String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + log.info("Creating connector {} of type {}", connName, maybeConnClass.getName()); + + Class<? extends Connector> connClass; + try { + connClass = maybeConnClass.asSubclass(Connector.class); + } catch (ClassCastException e) { + throw new CopycatException("Specified class is not a subclass of Connector: " + maybeConnClass.getName()); + } + + if (connectors.containsKey(connName)) + throw new CopycatException("Connector with name " + connName + " already exists"); + + final Connector connector = instantiateConnector(connClass); + connector.initialize(ctx); + try { + Map<String, Object> originals = connConfig.originals(); + Properties props = new Properties(); + props.putAll(originals); + connector.start(props); + } catch (CopycatException e) { + throw new CopycatException("Connector threw an exception while starting", e); + } + + connectors.put(connName, connector); + + log.info("Finished creating connector {}", connName); + } + + private static Connector instantiateConnector(Class<? 
extends Connector> connClass) { + try { + return Utils.newInstance(connClass); + } catch (Throwable t) { + // Catches normal exceptions due to instantiation errors as well as any runtime errors that + // may be caused by user code + throw new CopycatException("Failed to create connector instance", t); + } + } + + public Map<ConnectorTaskId, Map<String, String>> reconfigureConnectorTasks(String connName, int maxTasks, List<String> sinkTopics) { + log.trace("Reconfiguring connector tasks for {}", connName); + + Connector connector = connectors.get(connName); + if (connector == null) + throw new CopycatException("Connector " + connName + " not found in this worker."); + + Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>(); + String taskClassName = connector.taskClass().getName(); + int index = 0; + for (Properties taskProps : connector.taskConfigs(maxTasks)) { + ConnectorTaskId taskId = new ConnectorTaskId(connName, index); + index++; + Map<String, String> taskConfig = Utils.propsToStringMap(taskProps); + taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); + if (sinkTopics != null) + taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); + result.put(taskId, taskConfig); + } + return result; + } + + public void stopConnector(String connName) { + log.info("Stopping connector {}", connName); + + Connector connector = connectors.get(connName); + if (connector == null) + throw new CopycatException("Connector " + connName + " not found in this worker."); + + try { + connector.stop(); + } catch (CopycatException e) { + log.error("Error shutting down connector {}: ", connector, e); + } + + connectors.remove(connName); + + log.info("Stopped connector {}", connName); + } + + /** + * Get the IDs of the connectors currently running in this worker. + */ + public Set<String> connectorNames() { + return connectors.keySet(); + } + + /** * Add a new task. * @param id Globally unique ID for this task. - * @param taskClassName name of the {@link org.apache.kafka.copycat.connector.Task} - * class to instantiate. Must be a subclass of either - * {@link org.apache.kafka.copycat.source.SourceTask} or - * {@link org.apache.kafka.copycat.sink.SinkTask}. - * @param props configuration options for the task + * @param taskConfig the parsed task configuration */ - public void addTask(ConnectorTaskId id, String taskClassName, Properties props) { + public void addTask(ConnectorTaskId id, TaskConfig taskConfig) { + log.info("Creating task {}", id); + if (tasks.containsKey(id)) { String msg = "Task already exists in this worker; the herder should not have requested " + "that this : " + id; @@ -150,7 +258,7 @@ public class Worker { throw new CopycatException(msg); } - final Task task = instantiateTask(taskClassName); + final Task task = instantiateTask(taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class)); // Decide which type of worker task we need based on the type of task. 
final WorkerTask workerTask; @@ -171,20 +279,30 @@ // Start the task before adding modifying any state, any exceptions are caught higher up the // call chain and there's no cleanup to do here + Properties props = new Properties(); + props.putAll(taskConfig.originals()); workerTask.start(props); + if (task instanceof SourceTask) { + WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask; + sourceTaskOffsetCommitter.schedule(id, workerSourceTask); + } tasks.put(id, workerTask); } - private static Task instantiateTask(String taskClassName) { + private static Task instantiateTask(Class<? extends Task> taskClass) { try { - return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class)); - } catch (ClassNotFoundException e) { + return Utils.newInstance(taskClass); + } catch (KafkaException e) { throw new CopycatException("Task class not found", e); } } public void stopTask(ConnectorTaskId id) { + log.info("Stopping task {}", id); + WorkerTask task = getTask(id); + if (task instanceof WorkerSourceTask) + sourceTaskOffsetCommitter.remove(id); task.stop(); if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG))) log.error("Graceful stop of task {} failed.", task); @@ -192,6 +310,13 @@ tasks.remove(id); } + /** + * Get the IDs of the tasks currently running in this worker. + */ + public Set<ConnectorTaskId> taskIds() { + return tasks.keySet(); + } + private WorkerTask getTask(ConnectorTaskId id) { WorkerTask task = tasks.get(id); if (task == null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java index 719dd09..a46141e 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java @@ -19,10 +19,8 @@ package org.apache.kafka.copycat.runtime.distributed; import org.apache.kafka.copycat.util.ConnectorTaskId; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -30,6 +28,10 @@ import java.util.Set; * An immutable snapshot of the configuration state of connectors and tasks in a Copycat cluster. */ public class ClusterConfigState { + public static final ClusterConfigState EMPTY = new ClusterConfigState(-1, Collections.<String, Integer>emptyMap(), + Collections.<String, Map<String, String>>emptyMap(), Collections.<ConnectorTaskId, Map<String, String>>emptyMap(), + Collections.<String>emptySet()); + private final long offset; private final Map<String, Integer> connectorTaskCounts; private final Map<String, Map<String, String>> connectorConfigs; @@ -60,8 +62,8 @@ public class ClusterConfigState { /** * Get a list of the connectors in this configuration */ - public Collection<String> connectors() { - return connectorTaskCounts.keySet(); + public Set<String> connectors() { + return connectorConfigs.keySet(); } /** @@ -83,19 +85,29 @@ } /** + * Get the number of tasks assigned for the given connector.
+ * @param connectorName name of the connector to look up tasks for + * @return the number of tasks + */ + public int taskCount(String connectorName) { + Integer count = connectorTaskCounts.get(connectorName); + return count == null ? 0 : count; + } + + /** * Get the current set of task IDs for the specified connector. * @param connectorName the name of the connector to look up task configs for * @return the current set of connector task IDs */ - public Collection<ConnectorTaskId> tasks(String connectorName) { + public Set<ConnectorTaskId> tasks(String connectorName) { if (inconsistentConnectors.contains(connectorName)) - return Collections.EMPTY_LIST; + return Collections.emptySet(); Integer numTasks = connectorTaskCounts.get(connectorName); if (numTasks == null) - throw new IllegalArgumentException("Connector does not exist in current configuration."); + return Collections.emptySet(); - List<ConnectorTaskId> taskIds = new ArrayList<>(); + Set<ConnectorTaskId> taskIds = new HashSet<>(); for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) { ConnectorTaskId taskId = new ConnectorTaskId(connectorName, taskIndex); taskIds.add(taskId); @@ -119,4 +131,14 @@ public class ClusterConfigState { return inconsistentConnectors; } + @Override + public String toString() { + return "ClusterConfigState{" + + "offset=" + offset + + ", connectorTaskCounts=" + connectorTaskCounts + + ", connectorConfigs=" + connectorConfigs + + ", taskConfigs=" + taskConfigs + + ", inconsistentConnectors=" + inconsistentConnectors + + '}'; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java new file mode 100644 index 0000000..a450b1d --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.runtime.distributed; + +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.copycat.util.ConnectorTaskId; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * This class implements the protocol for Copycat workers in a group. It includes the format of worker state used when + * joining the group and distributing assignments, and the format of assignments of connectors and tasks to workers. + */ +public class CopycatProtocol { + public static final String VERSION_KEY_NAME = "version"; + public static final String CONFIG_OFFSET_KEY_NAME = "config-offset"; + public static final String CONNECTOR_KEY_NAME = "connector"; + public static final String LEADER_KEY_NAME = "leader"; + public static final String ERROR_KEY_NAME = "error"; + public static final String TASKS_KEY_NAME = "tasks"; + public static final String ASSIGNMENT_KEY_NAME = "assignment"; + public static final int CONNECTOR_TASK = -1; + + public static final short COPYCAT_PROTOCOL_V0 = 0; + public static final Schema COPYCAT_PROTOCOL_HEADER_SCHEMA = new Schema( + new Field(VERSION_KEY_NAME, Type.INT16)); + private static final Struct COPYCAT_PROTOCOL_HEADER_V0 = new Struct(COPYCAT_PROTOCOL_HEADER_SCHEMA) + .set(VERSION_KEY_NAME, COPYCAT_PROTOCOL_V0); + + public static final Schema CONFIG_STATE_V0 = new Schema( + new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64)); + // Assignments for each worker are a set of connectors and tasks. These are categorized by connector ID. A sentinel + // task ID (CONNECTOR_TASK) is used to indicate the connector itself (i.e. that the assignment includes + // responsibility for running the Connector instance in addition to any tasks it generates). 
+ public static final Schema CONNECTOR_ASSIGNMENT_V0 = new Schema( + new Field(CONNECTOR_KEY_NAME, Type.STRING), + new Field(TASKS_KEY_NAME, new ArrayOf(Type.INT32))); + public static final Schema ASSIGNMENT_V0 = new Schema( + new Field(ERROR_KEY_NAME, Type.INT16), + new Field(LEADER_KEY_NAME, Type.STRING), + new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64), + new Field(ASSIGNMENT_KEY_NAME, new ArrayOf(CONNECTOR_ASSIGNMENT_V0))); + + public static ByteBuffer serializeMetadata(ConfigState configState) { + Struct struct = new Struct(CONFIG_STATE_V0); + struct.set(CONFIG_OFFSET_KEY_NAME, configState.offset()); + ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + CONFIG_STATE_V0.sizeOf(struct)); + COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer); + CONFIG_STATE_V0.write(buffer, struct); + buffer.flip(); + return buffer; + } + + public static ConfigState deserializeMetadata(ByteBuffer buffer) { + Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer); + Short version = header.getShort(VERSION_KEY_NAME); + checkVersionCompatibility(version); + Struct struct = (Struct) CONFIG_STATE_V0.read(buffer); + long configOffset = struct.getLong(CONFIG_OFFSET_KEY_NAME); + return new ConfigState(configOffset); + } + + public static ByteBuffer serializeAssignment(Assignment assignment) { + Struct struct = new Struct(ASSIGNMENT_V0); + struct.set(ERROR_KEY_NAME, assignment.error()); + struct.set(LEADER_KEY_NAME, assignment.leader()); + struct.set(CONFIG_OFFSET_KEY_NAME, assignment.offset()); + List<Struct> taskAssignments = new ArrayList<>(); + for (Map.Entry<String, List<Integer>> connectorEntry : assignment.asMap().entrySet()) { + Struct taskAssignment = new Struct(CONNECTOR_ASSIGNMENT_V0); + taskAssignment.set(CONNECTOR_KEY_NAME, connectorEntry.getKey()); + List<Integer> tasks = connectorEntry.getValue(); + taskAssignment.set(TASKS_KEY_NAME, tasks.toArray()); + taskAssignments.add(taskAssignment); + } + struct.set(ASSIGNMENT_KEY_NAME, taskAssignments.toArray()); + + ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + ASSIGNMENT_V0.sizeOf(struct)); + COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer); + ASSIGNMENT_V0.write(buffer, struct); + buffer.flip(); + return buffer; + } + + public static Assignment deserializeAssignment(ByteBuffer buffer) { + Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer); + Short version = header.getShort(VERSION_KEY_NAME); + checkVersionCompatibility(version); + Struct struct = (Struct) ASSIGNMENT_V0.read(buffer); + short error = struct.getShort(ERROR_KEY_NAME); + String leader = struct.getString(LEADER_KEY_NAME); + long offset = struct.getLong(CONFIG_OFFSET_KEY_NAME); + List<String> connectorIds = new ArrayList<>(); + List<ConnectorTaskId> taskIds = new ArrayList<>(); + for (Object structObj : struct.getArray(ASSIGNMENT_KEY_NAME)) { + Struct assignment = (Struct) structObj; + String connector = assignment.getString(CONNECTOR_KEY_NAME); + for (Object taskIdObj : assignment.getArray(TASKS_KEY_NAME)) { + Integer taskId = (Integer) taskIdObj; + if (taskId == CONNECTOR_TASK) + connectorIds.add(connector); + else + taskIds.add(new ConnectorTaskId(connector, taskId)); + } + } + return new Assignment(error, leader, offset, connectorIds, taskIds); + } + + public static class ConfigState { + private final long offset; + + public ConfigState(long offset) { + this.offset = offset; + } + + public long offset() { + return offset; + } + + @Override + public String toString() { + return "ConfigState{" + + "offset=" + offset + + 
'}'; + } + } + + public static class Assignment { + public static final short NO_ERROR = 0; + // Configuration offsets mismatched in a way that the leader could not resolve. Workers should read to the end + // of the config log and try to re-join + public static final short CONFIG_MISMATCH = 1; + + private final short error; + private final String leader; + private final long offset; + private final List<String> connectorIds; + private final List<ConnectorTaskId> taskIds; + + /** + * Create an assignment indicating responsibility for the given connector instances and task Ids. + * @param connectorIds list of connectors that the worker should instantiate and run + * @param taskIds list of task IDs that the worker should instantiate and run + */ + public Assignment(short error, String leader, long configOffset, + List<String> connectorIds, List<ConnectorTaskId> taskIds) { + this.error = error; + this.leader = leader; + this.offset = configOffset; + this.taskIds = taskIds; + this.connectorIds = connectorIds; + } + + public short error() { + return error; + } + + public String leader() { + return leader; + } + + public boolean failed() { + return error != NO_ERROR; + } + + public long offset() { + return offset; + } + + public List<String> connectors() { + return connectorIds; + } + + public List<ConnectorTaskId> tasks() { + return taskIds; + } + + @Override + public String toString() { + return "Assignment{" + + "error=" + error + + ", leader='" + leader + '\'' + + ", offset=" + offset + + ", connectorIds=" + connectorIds + + ", taskIds=" + taskIds + + '}'; + } + + private Map<String, List<Integer>> asMap() { + // Using LinkedHashMap preserves the ordering, which is helpful for tests and debugging + Map<String, List<Integer>> taskMap = new LinkedHashMap<>(); + for (String connectorId : new HashSet<>(connectorIds)) { + List<Integer> connectorTasks = taskMap.get(connectorId); + if (connectorTasks == null) { + connectorTasks = new ArrayList<>(); + taskMap.put(connectorId, connectorTasks); + } + connectorTasks.add(CONNECTOR_TASK); + } + for (ConnectorTaskId taskId : taskIds) { + String connectorId = taskId.connector(); + List<Integer> connectorTasks = taskMap.get(connectorId); + if (connectorTasks == null) { + connectorTasks = new ArrayList<>(); + taskMap.put(connectorId, connectorTasks); + } + connectorTasks.add(taskId.task()); + } + return taskMap; + } + } + + private static void checkVersionCompatibility(short version) { + // check for invalid versions + if (version < COPYCAT_PROTOCOL_V0) + throw new SchemaException("Unsupported subscription version: " + version); + + // otherwise, assume versions can be parsed as V0 + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java index 5273658..bf3229d 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java @@ -17,304 +17,627 @@ package org.apache.kafka.copycat.runtime.distributed; +import org.apache.kafka.clients.consumer.ConsumerWakeupException; +import org.apache.kafka.common.config.ConfigException; import 
org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.connector.Connector; +import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.runtime.ConnectorConfig; import org.apache.kafka.copycat.runtime.Herder; import org.apache.kafka.copycat.runtime.HerderConnectorContext; +import org.apache.kafka.copycat.runtime.TaskConfig; import org.apache.kafka.copycat.runtime.Worker; import org.apache.kafka.copycat.sink.SinkConnector; -import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.storage.KafkaConfigStorage; import org.apache.kafka.copycat.util.Callback; import org.apache.kafka.copycat.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; /** - * Distributed "herder" that coordinates with other workers to spread work across multiple processes. + * <p> + * Distributed "herder" that coordinates with other workers to spread work across multiple processes. + * </p> + * <p> + * Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized + * group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates what its current + * configuration state is (where it is in the configuration log). The group coordinator selects one member to take + * this information and assign each instance a subset of the active connectors & tasks to execute. This assignment + * is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose + * to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once + * an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker. + * </p> + * <p> + * In addition to distributing work, the DistributedHerder uses the leader determined during the work assignment + * to select a leader for this generation of the group who is responsible for other tasks that can only be performed + * by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks + * (and therefore, also for creating, destroying, and scaling up/down connectors).
+ * </p> */ -public class DistributedHerder implements Herder { +public class DistributedHerder implements Herder, Runnable { private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class); - private Worker worker; - private KafkaConfigStorage configStorage; + private final Worker worker; + private final KafkaConfigStorage configStorage; private ClusterConfigState configState; - private HashMap<String, ConnectorState> connectors = new HashMap<>(); - public DistributedHerder(Worker worker) { - this.worker = worker; - this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), - new ConnectorConfigCallback(), new TaskConfigCallback()); + private final int workerSyncTimeoutMs; + private final int workerUnsyncBackoffMs; + + private final WorkerGroupMember member; + private final AtomicBoolean stopping; + private final CountDownLatch stopLatch = new CountDownLatch(1); + + // Track enough information about the current membership state to be able to determine which requests via the API + // and from other nodes are safe to process + private boolean rebalanceResolved; + private CopycatProtocol.Assignment assignment; + + // To handle most external requests, like creating or destroying a connector, we can use a generic request where + // the caller specifies all the code that should be executed. + private final Queue<HerderRequest> requests = new LinkedBlockingDeque<>(); + // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when + // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits). + private final Set<String> connectorConfigUpdates = new HashSet<>(); + private boolean needsReconfigRebalance; + + public DistributedHerder(Worker worker, Map<String, ?> configs) { + this(worker, configs, null, null); } - // Public for testing (mock KafkaConfigStorage) - public DistributedHerder(Worker worker, KafkaConfigStorage configStorage) { + // public for testing + public DistributedHerder(Worker worker, Map<String, ?> configs, KafkaConfigStorage configStorage, WorkerGroupMember member) { this.worker = worker; - this.configStorage = configStorage; + if (configStorage != null) { + // For testing. Assume configuration has already been performed + this.configStorage = configStorage; + } else { + this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback()); + this.configStorage.configure(configs); + } + configState = ClusterConfigState.EMPTY; + + DistributedHerderConfig config = new DistributedHerderConfig(configs); + this.workerSyncTimeoutMs = config.getInt(DistributedHerderConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG); + this.workerUnsyncBackoffMs = config.getInt(DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG); + + this.member = member != null ?
member : new WorkerGroupMember(config, this.configStorage, rebalanceListener()); + stopping = new AtomicBoolean(false); + + rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks + needsReconfigRebalance = false; } - public synchronized void configure(Map<String, ?> configs) { - configStorage.configure(configs); + @Override + public void start() { + Thread thread = new Thread(this); + thread.start(); } - public synchronized void start() { - log.info("Herder starting"); + public void run() { + try { + log.info("Herder starting"); - configStorage.start(); + configStorage.start(); - log.info("Restoring connectors from stored configs"); - restoreConnectors(); + log.info("Herder started"); - log.info("Herder started"); + while (!stopping.get()) { + tick(); + } + + halt(); + + log.info("Herder stopped"); + } finally { + stopLatch.countDown(); + } } - public synchronized void stop() { - log.info("Herder stopping"); + // public for testing + public void tick() { + // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events + // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is + // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly + // blocking up this thread (especially those in callbacks due to rebalance events). - // There's no coordination/hand-off to do here since this is all standalone. Instead, we - // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all - // the tasks. - for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) { - ConnectorState state = entry.getValue(); - stopConnector(state); + try { + member.ensureActive(); + // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin + if (!handleRebalanceCompleted()) return; + } catch (ConsumerWakeupException e) { + // May be due to a request from another thread, or might be stopping. If the latter, we need to check the + // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests + // unless we're in the group. + return; } - connectors.clear(); - if (configStorage != null) { - configStorage.stop(); - configStorage = null; + // Process any external requests + while (!requests.isEmpty()) { + HerderRequest request = requests.poll(); + try { + request.callback().onCompletion(null, request.action().call()); + } catch (Throwable t) { + request.callback().onCompletion(t, null); + } } - log.info("Herder stopped"); - } + // Process any configuration updates + synchronized (this) { + if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) { + // Connector reconfigs only need local updates since there is no coordination between workers required. + // However, if connectors were added or removed, work needs to be rebalanced since we have more work + // items to distribute among workers. + ClusterConfigState newConfigState = configStorage.snapshot(); + if (!newConfigState.connectors().equals(configState.connectors())) + needsReconfigRebalance = true; + configState = newConfigState; + if (needsReconfigRebalance) { + // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart + // this loop, which will then ensure the rebalance occurs without any other requests being + // processed until it completes. 
+ member.requestRejoin(); + // Any connector config updates will be addressed during the rebalance too + connectorConfigUpdates.clear(); + needsReconfigRebalance = false; + return; + } else if (!connectorConfigUpdates.isEmpty()) { + // If we only have connector config updates, we can just bounce the updated connectors that are + // currently assigned to this worker. + Set<String> localConnectors = worker.connectorNames(); + for (String connectorName : connectorConfigUpdates) { + if (!localConnectors.contains(connectorName)) + continue; + worker.stopConnector(connectorName); + // The update may be a deletion, so verify we actually need to restart the connector + if (configState.connectors().contains(connectorName)) + startConnector(connectorName); + } + connectorConfigUpdates.clear(); + } + } + } - @Override - public synchronized void addConnector(Map<String, String> connectorProps, - Callback<String> callback) { + // Let the group take any actions it needs to try { - // Ensure the config is written to storage first - ConnectorConfig connConfig = new ConnectorConfig(connectorProps); - String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); - configStorage.putConnectorConfig(connName, connectorProps); + member.poll(Long.MAX_VALUE); + // Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin + if (!handleRebalanceCompleted()) return; + } catch (ConsumerWakeupException e) { // FIXME should not be ConsumerWakeupException + // Ignore. Just indicates we need to check the exit flag, for requested actions, etc. + } + } - ConnectorState connState = createConnector(connConfig); - if (callback != null) - callback.onCompletion(null, connState.name); - // This should always be a new job, create jobs from scratch - createConnectorTasks(connState); - } catch (CopycatException e) { - if (callback != null) - callback.onCompletion(e, null); + // public for testing + public void halt() { + synchronized (this) { + // Clean up any connectors and tasks that are still running. 
+ log.info("Stopping connectors and tasks that are still assigned to this worker."); + for (String connName : new HashSet<>(worker.connectorNames())) { + try { + worker.stopConnector(connName); + } catch (Throwable t) { + log.error("Failed to shut down connector " + connName, t); + } + } + for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) { + try { + worker.stopTask(taskId); + } catch (Throwable t) { + log.error("Failed to shut down task " + taskId, t); + } + } + + member.stop(); + + // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason + // for their failure + while (!requests.isEmpty()) { + HerderRequest request = requests.poll(); + request.callback().onCompletion(new CopycatException("Worker is shutting down"), null); + } + + if (configStorage != null) + configStorage.stop(); } } @Override - public synchronized void deleteConnector(String name, Callback<Void> callback) { - try { - destroyConnector(name); - if (callback != null) - callback.onCompletion(null, null); - } catch (CopycatException e) { - if (callback != null) - callback.onCompletion(e, null); + public void stop() { + log.info("Herder stopping"); + + stopping.set(true); + member.wakeup(); + while (stopLatch.getCount() > 0) { + try { + stopLatch.await(); + } catch (InterruptedException e) { + // ignore, should not happen + } } } @Override - public synchronized void requestTaskReconfiguration(String connName) { - ConnectorState state = connectors.get(connName); - if (state == null) { - log.error("Task that requested reconfiguration does not exist: {}", connName); + public synchronized void addConnector(final Map<String, String> connectorProps, + final Callback<String> callback) { + final ConnectorConfig connConfig; + final String connName; + try { + connConfig = new ConnectorConfig(connectorProps); + connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + } catch (Throwable t) { + if (callback != null) + callback.onCompletion(t, null); return; } - updateConnectorTasks(state); + + log.debug("Submitting connector config {}", connName); + + requests.add(new HerderRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + if (!isLeader()) + throw new NotLeaderException("Only the leader can add connectors."); + + log.debug("Submitting connector config {}", connName); + configStorage.putConnectorConfig(connName, connectorProps); + + return null; + } + }, + new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + if (callback == null) return; + + if (error != null) + callback.onCompletion(error, null); + else + callback.onCompletion(null, connName); + } + })); + member.wakeup(); } - // Creates and configures the connector. 
Does not setup any tasks - private ConnectorState createConnector(ConnectorConfig connConfig) { - String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); - String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - log.info("Creating connector {} of type {}", connName, className); - int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG); - List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only - Properties configs = connConfig.unusedProperties(); - - if (connectors.containsKey(connName)) { - log.error("Ignoring request to create connector due to conflicting connector name"); - throw new CopycatException("Connector with name " + connName + " already exists"); - } + @Override + public synchronized void deleteConnector(final String connName, final Callback<Void> callback) { + log.debug("Submitting connector config deletion {}", connName); + + requests.add(new HerderRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + if (!isLeader()) + throw new NotLeaderException("Only the leader can delete connectors."); + + log.debug("Submitting null connector config {}", connName); + configStorage.putConnectorConfig(connName, null); + return null; + } + }, + new Callback<Void>() { + @Override + public void onCompletion(Throwable error, Void result) { + if (callback != null) + callback.onCompletion(error, null); + } + } + )); + member.wakeup(); + } - final Connector connector; - try { - connector = instantiateConnector(className); - } catch (Throwable t) { - // Catches normal exceptions due to instantiation errors as well as any runtime errors that - // may be caused by user code - throw new CopycatException("Failed to create connector instance", t); - } - connector.initialize(new HerderConnectorContext(this, connName)); - try { - connector.start(configs); - } catch (CopycatException e) { - throw new CopycatException("Connector threw an exception while starting", e); - } - ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics); - connectors.put(connName, state); + @Override + public synchronized void requestTaskReconfiguration(final String connName) { + requests.add(new HerderRequest( + new Callable<Void>() { + @Override + public Void call() throws Exception { + reconfigureConnector(connName); + return null; + } + } + )); + member.wakeup(); + } - log.info("Finished creating connector {}", connName); - return state; + private boolean isLeader() { + return assignment != null && member.memberId().equals(assignment.leader()); } - private static Connector instantiateConnector(String className) { - try { - return Utils.newInstance(className, Connector.class); - } catch (ClassNotFoundException e) { - throw new CopycatException("Couldn't instantiate connector class", e); + /** + * Handle post-assignment operations, either trying to resolve issues that kept assignment from completing, getting + * this node into sync and its work started. Since + * + * @return false if we couldn't finish + */ + private boolean handleRebalanceCompleted() { + if (this.rebalanceResolved) + return true; + + rebalanceResolved = true; + + // We need to handle a variety of cases after a rebalance: + // 1. Assignment failed + // 1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before + // even attempting to. If we can't we should drop out of the group because we will block everyone from making + // progress. We can backoff and try rejoining later. 
+ // 1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately, + // otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready. + // 2. Assignment succeeded. + // 2a. We are caught up on configs. Awesome! We can proceed to run our assigned work. + // 2b. We need to try to catch up. We can do this potentially indefinitely because if it takes too long, we'll + // be kicked out of the group anyway due to lack of heartbeats. + + boolean needsReadToEnd = false; + long syncConfigsTimeoutMs = Long.MAX_VALUE; + boolean needsRejoin = false; + if (assignment.failed()) { + needsRejoin = true; + if (isLeader()) { + log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying."); + needsReadToEnd = true; + syncConfigsTimeoutMs = workerSyncTimeoutMs; + } else if (configState.offset() < assignment.offset()) { + log.warn("Join group completed, but assignment failed and we're lagging. Reading to end of config and retrying."); + needsReadToEnd = true; + } else { + log.warn("Join group completed, but assignment failed. We were up to date, so just retrying."); + } + } else { + if (configState.offset() < assignment.offset()) { + log.warn("Catching up to assignment's config offset."); + needsReadToEnd = true; + } } - } - private void destroyConnector(String connName) { - log.info("Destroying connector {}", connName); - ConnectorState state = connectors.get(connName); - if (state == null) { - log.error("Failed to destroy connector {} because it does not exist", connName); - throw new CopycatException("Connector does not exist"); + if (needsReadToEnd) { + // Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if + // we timed out. This should only happen if we were the leader and didn't finish quickly enough, in which + // case we've waited a long time and should have already left the group OR the timeout should have been + // very long and not having finished also indicates we've waited longer than the session timeout. + if (!readConfigToEnd(syncConfigsTimeoutMs)) + needsRejoin = true; } - stopConnector(state); - configStorage.putConnectorConfig(state.name, null); - connectors.remove(state.name); + if (needsRejoin) { + member.requestRejoin(); + return false; + } - log.info("Finished destroying connector {}", connName); + // Should still validate that they match since we may have gone *past* the required offset, in which case we + // should *not* start any tasks and rejoin + if (configState.offset() != assignment.offset()) { + log.info("Current config state offset {} does not match group assignment {}. 
Forcing rebalance.", configState.offset(), assignment.offset()); + member.requestRejoin(); + return false; + } + + startWork(); + + return true; } - // Stops a connectors tasks, then the connector - private void stopConnector(ConnectorState state) { - removeConnectorTasks(state); + /** + * Try to read to the end of the config log within the given timeout + * @param timeoutMs maximum time to wait to sync to the end of the log + * @return true if successful, false if timed out + */ + private boolean readConfigToEnd(long timeoutMs) { + log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset()); try { - state.connector.stop(); - } catch (CopycatException e) { - log.error("Error shutting down connector {}: ", state.connector, e); + configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS); + configState = configStorage.snapshot(); + log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset()); + return true; + } catch (TimeoutException e) { + log.warn("Didn't reach end of config log quickly enough", e); + // TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this + // backoff since it'll be longer than the session timeout + if (isLeader()) + backoff(workerUnsyncBackoffMs); + return false; + } catch (InterruptedException | ExecutionException e) { + throw new CopycatException("Error trying to catch up after assignment", e); } } - private void createConnectorTasks(ConnectorState state) { - String taskClassName = state.connector.taskClass().getName(); - - log.info("Creating tasks for connector {} of type {}", state.name, taskClassName); - - List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks); - - // Generate the final configs, including framework provided settings - Map<ConnectorTaskId, Properties> taskProps = new HashMap<>(); - for (int i = 0; i < taskConfigs.size(); i++) { - ConnectorTaskId taskId = new ConnectorTaskId(state.name, i); - Properties config = taskConfigs.get(i); - // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics - // is automatically provided to tasks since it is required by the framework, but this - String subscriptionTopics = Utils.join(state.inputTopics, ","); - if (state.connector instanceof SinkConnector) { - // Make sure we don't modify the original since the connector may reuse it internally - Properties configForSink = new Properties(); - configForSink.putAll(config); - configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics); - config = configForSink; - } - taskProps.put(taskId, config); - } + private void backoff(long ms) { + Utils.sleep(ms); + } - // And initiate the tasks - for (int i = 0; i < taskConfigs.size(); i++) { - ConnectorTaskId taskId = new ConnectorTaskId(state.name, i); - Properties config = taskProps.get(taskId); + private void startWork() { + // Start assigned connectors and tasks + log.info("Starting connectors and tasks using config offset {}", assignment.offset()); + for (String connectorName : assignment.connectors()) { try { - worker.addTask(taskId, taskClassName, config); - // We only need to store the task IDs so we can clean up. - state.tasks.add(taskId); - } catch (Throwable e) { - log.error("Failed to add task {}: ", taskId, e); - // Swallow this so we can continue updating the rest of the tasks - // FIXME what's the proper response? Kill all the tasks? 
Consider this the same as a task - // that died after starting successfully. + startConnector(connectorName); + } catch (ConfigException e) { + log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " + + "configuration. This connector will not execute until reconfigured.", e); } } - } - - private void removeConnectorTasks(ConnectorState state) { - Iterator<ConnectorTaskId> taskIter = state.tasks.iterator(); - while (taskIter.hasNext()) { - ConnectorTaskId taskId = taskIter.next(); + for (ConnectorTaskId taskId : assignment.tasks()) { try { - worker.stopTask(taskId); - taskIter.remove(); - } catch (CopycatException e) { - log.error("Failed to stop task {}: ", taskId, e); - // Swallow this so we can continue stopping the rest of the tasks - // FIXME: Forcibly kill the task? + log.info("Starting task {}", taskId); + Map<String, String> configs = configState.taskConfig(taskId); + TaskConfig taskConfig = new TaskConfig(configs); + worker.addTask(taskId, taskConfig); + } catch (ConfigException e) { + log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " + + "configuration. This task will not execute until reconfigured.", e); } } } - private void updateConnectorTasks(ConnectorState state) { - removeConnectorTasks(state); - createConnectorTasks(state); + // Helper for starting a connector with the given name, which will extract & parse the config, generate connector + // context and add to the worker. This needs to be called from within the main worker thread for this herder. + private void startConnector(String connectorName) { + log.info("Starting connector {}", connectorName); + Map<String, String> configs = configState.connectorConfig(connectorName); + ConnectorConfig connConfig = new ConnectorConfig(configs); + String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName); + worker.addConnector(connConfig, ctx); + + // Immediately request configuration since this could be a brand new connector. However, also only update those + // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is + // just restoring an existing connector. + reconfigureConnector(connName); } - private void restoreConnectors() { - configState = configStorage.snapshot(); - Collection<String> connNames = configState.connectors(); - for (String connName : connNames) { - log.info("Restoring connector {}", connName); - Map<String, String> connProps = configState.connectorConfig(connName); - ConnectorConfig connConfig = new ConnectorConfig(connProps); - ConnectorState connState = createConnector(connConfig); - // Because this coordinator is standalone, connectors are only restored when this process - // starts and we know there can't be any existing tasks. So in this special case we're able - // to just create the tasks rather than having to check for existing tasks and sort out - // whether they need to be reconfigured. 
- createConnectorTasks(connState); + // Updates configurations for a connector by requesting them from the connector, filling in parameters provided + // by the system, then checks whether any configs have actually changed before submitting the new configs to storage + private void reconfigureConnector(String connName) { + Map<String, String> configs = configState.connectorConfig(connName); + ConnectorConfig connConfig = new ConnectorConfig(configs); + + List<String> sinkTopics = null; + if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG))) + sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); + + Map<ConnectorTaskId, Map<String, String>> taskProps + = worker.reconfigureConnectorTasks(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics); + boolean changed = false; + int currentNumTasks = configState.taskCount(connName); + if (taskProps.size() != currentNumTasks) { + log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size()); + changed = true; + } else { + for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfig : taskProps.entrySet()) { + if (!taskConfig.getValue().equals(configState.taskConfig(taskConfig.getKey()))) { + log.debug("Change in task configurations, writing updated task configurations"); + changed = true; + break; + } + } + } + if (changed) { + // FIXME: Configs should only be written by the leader to avoid conflicts due to zombies. However, until the + // REST API is available to forward this request, we need to do this on the worker that generates the config + configStorage.putTaskConfigs(taskProps); } } + private class HerderRequest { + private final Callable<Void> action; + private final Callback<Void> callback; - private static class ConnectorState { - public String name; - public Connector connector; - public int maxTasks; - public List<String> inputTopics; - Set<ConnectorTaskId> tasks; + public HerderRequest(Callable<Void> action, Callback<Void> callback) { + this.action = action; + this.callback = callback; + } - public ConnectorState(String name, Connector connector, int maxTasks, - List<String> inputTopics) { - this.name = name; - this.connector = connector; - this.maxTasks = maxTasks; - this.inputTopics = inputTopics; - this.tasks = new HashSet<>(); + public HerderRequest(Callable<Void> action) { + this.action = action; + this.callback = DEFAULT_CALLBACK; } - } - private class ConnectorConfigCallback implements Callback<String> { - @Override - public void onCompletion(Throwable error, String result) { - configState = configStorage.snapshot(); - // FIXME + public Callable<Void> action() { + return action; + } + + public Callback<Void> callback() { + return callback; } } - private class TaskConfigCallback implements Callback<List<ConnectorTaskId>> { + private static final Callback<Void> DEFAULT_CALLBACK = new Callback<Void>() { @Override - public void onCompletion(Throwable error, List<ConnectorTaskId> result) { - configState = configStorage.snapshot(); - // FIXME + public void onCompletion(Throwable error, Void result) { + if (error != null) + log.error("HerderRequest's action threw an exception: ", error); } + }; + + + // Config callbacks are triggered from the KafkaConfigStorage thread + private Callback<String> connectorConfigCallback() { + return new Callback<String>() { + @Override + public void onCompletion(Throwable error, String connector) { + log.debug("Connector {} config updated", connector); + // 
Stage the update and wake up the work thread. Connector config *changes* only need the one connector + // to be bounced. However, this callback may also indicate a connector *addition*, which does require + // a rebalance, so we need to be careful about what operation we request. + synchronized (DistributedHerder.this) { + connectorConfigUpdates.add(connector); + } + member.wakeup(); + } + }; } + private Callback<List<ConnectorTaskId>> taskConfigCallback() { + return new Callback<List<ConnectorTaskId>>() { + @Override + public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) { + log.debug("Tasks {} configs updated", tasks); + // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs + // always need a rebalance to ensure offsets get committed. + // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task + // connectors clearly don't need any coordination. + synchronized (DistributedHerder.this) { + needsReconfigRebalance = true; + } + member.wakeup(); + } + }; + } + + // Rebalances are triggered internally from the group member, so these are always executed in the work thread. + private WorkerRebalanceListener rebalanceListener() { + return new WorkerRebalanceListener() { + @Override + public void onAssigned(CopycatProtocol.Assignment assignment) { + // This callback just logs the info and saves it. The actual response is handled in the main loop, which + // ensures the group member's rebalancing logic can complete, that potentially long-running steps to + // catch up (or back off if we fail) are not executed in a callback, and that we'll be able to invoke other + // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the + // assigned tasks). + log.info("Joined group and got assignment: {}", assignment); + DistributedHerder.this.assignment = assignment; + rebalanceResolved = false; + } + @Override + public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) { + log.info("Rebalance started"); + + // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance, + // it is still important to have a leader that can write configs, offsets, etc. + + // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of + // them to finish + // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from + // this worker. Instead, we can let them continue to run but buffer any update requests (which should be + // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of + // unnecessary repeated connections to the source/sink system. + for (String connectorName : connectors) + worker.stopConnector(connectorName); + // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of + // stopping them then state could continue to be reused when the task remains on this worker. For example, + // this would avoid having to close a connection and then reopen it when the task is assigned back to this + // worker again. + for (ConnectorTaskId taskId : tasks) + worker.stopTask(taskId); + } + }; + } }
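For context on the asynchronous herder API introduced above: callers never mutate herder state directly; they enqueue a HerderRequest, the main work thread processes it, and they are notified through a Callback. A minimal usage sketch, assuming a started DistributedHerder instance named "herder"; the connector class and the concrete property keys are illustrative placeholders, not part of this patch:

    // Connector properties submitted by the caller (keys shown are assumed values of
    // ConnectorConfig.NAME_CONFIG, CONNECTOR_CLASS_CONFIG, TASKS_MAX_CONFIG and TOPICS_CONFIG).
    Map<String, String> props = new HashMap<>();
    props.put("name", "local-file-sink");
    props.put("connector.class", "org.apache.kafka.copycat.file.FileStreamSinkConnector");
    props.put("tasks.max", "1");
    props.put("topics", "copycat-test");

    // addConnector() only queues the request and wakes the work thread; the callback fires once
    // the config has been written to config storage, or with NotLeaderException if this worker
    // is not the leader for the current generation.
    herder.addConnector(props, new Callback<String>() {
        @Override
        public void onCompletion(Throwable error, String connectorName) {
            if (error != null)
                log.error("Failed to submit connector config", error);
            else
                log.info("Connector {} config accepted", connectorName);
        }
    });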