http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java deleted file mode 100644 index 96de1ca..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java +++ /dev/null @@ -1,920 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.distributed; - -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.connector.ConnectorContext; -import org.apache.kafka.copycat.errors.AlreadyExistsException; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.errors.NotFoundException; -import org.apache.kafka.copycat.runtime.ConnectorConfig; -import org.apache.kafka.copycat.runtime.Herder; -import org.apache.kafka.copycat.runtime.HerderConnectorContext; -import org.apache.kafka.copycat.runtime.TaskConfig; -import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.runtime.rest.RestServer; -import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; -import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; -import org.apache.kafka.copycat.sink.SinkConnector; -import org.apache.kafka.copycat.storage.KafkaConfigStorage; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * <p> - * Distributed "herder" that coordinates with other workers to spread work across multiple processes. 
- * </p>
- * <p>
- * Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the generalized
- * group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates its current
- * configuration state (where it is in the configuration log). The group coordinator selects one member to take
- * this information and assign each instance a subset of the active connectors & tasks to execute. This assignment
- * is currently performed in a simple round-robin fashion, but this is not guaranteed -- the herder may also choose
- * to, e.g., use a sticky assignment to avoid the usual start/stop costs associated with connectors and tasks. Once
- * an assignment is received, the DistributedHerder simply runs its assigned connectors and tasks in a Worker.
- * </p>
- * <p>
- * In addition to distributing work, the DistributedHerder uses the leader elected during the work assignment
- * as the leader for this generation of the group, responsible for other tasks that can only be performed
- * by a single node at a time. Most importantly, this includes writing updated configurations for connectors and tasks
- * (and therefore also creating, destroying, and scaling connectors up and down).
- * </p>
- */
-public class DistributedHerder implements Herder, Runnable {
-    private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
-
-    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
-
-    private final Worker worker;
-    private final KafkaConfigStorage configStorage;
-    private ClusterConfigState configState;
-    private final Time time;
-
-    private final int workerSyncTimeoutMs;
-    private final int workerUnsyncBackoffMs;
-
-    private final WorkerGroupMember member;
-    private final AtomicBoolean stopping;
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
-
-    // Track enough information about the current membership state to be able to determine which requests via the API
-    // and from other nodes are safe to process
-    private boolean rebalanceResolved;
-    private CopycatProtocol.Assignment assignment;
-
-    // To handle most external requests, like creating or destroying a connector, we can use a generic request where
-    // the caller specifies all the code that should be executed.
-    private final Queue<HerderRequest> requests = new PriorityQueue<>();
-    // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
-    // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
-    private Set<String> connectorConfigUpdates = new HashSet<>();
-    private boolean needsReconfigRebalance;
-
-    private final ExecutorService forwardRequestExecutor;
-
-    public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
-        this(config, worker, null, null, restUrl, new SystemTime());
-    }
-
-    // public for testing
-    public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) {
-        this.worker = worker;
-        if (configStorage != null) {
-            // For testing.
Assume configuration has already been performed.
-            this.configStorage = configStorage;
-        } else {
-            this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
-            this.configStorage.configure(config.originals());
-        }
-        configState = ClusterConfigState.EMPTY;
-        this.time = time;
-
-        this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
-        this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
-
-        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener());
-        stopping = new AtomicBoolean(false);
-
-        rebalanceResolved = true; // false when we still need to follow up after a rebalance, e.g. by starting up assigned tasks
-        needsReconfigRebalance = false;
-
-        forwardRequestExecutor = Executors.newSingleThreadExecutor();
-    }
-
-    @Override
-    public void start() {
-        Thread thread = new Thread(this, "DistributedHerder");
-        thread.start();
-    }
-
-    public void run() {
-        try {
-            log.info("Herder starting");
-
-            configStorage.start();
-
-            log.info("Herder started");
-
-            while (!stopping.get()) {
-                tick();
-            }
-
-            halt();
-
-            log.info("Herder stopped");
-        } catch (Throwable t) {
-            log.error("Uncaught exception in herder work thread, exiting: ", t);
-            stopLatch.countDown();
-            System.exit(1);
-        } finally {
-            stopLatch.countDown();
-        }
-    }
-
-    // public for testing
-    public void tick() {
-        // The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events
-        // as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is
-        // performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly
-        // blocking this thread (especially those in callbacks due to rebalance events).
-
-        try {
-            member.ensureActive();
-            // Ensure we're in a good state in our group. If not, restart; everything should be set up to rejoin
-            if (!handleRebalanceCompleted()) return;
-        } catch (WakeupException e) {
-            // May be due to a request from another thread, or might be stopping. If the latter, we need to check the
-            // flag immediately. If the former, we need to re-run the ensureActive call since we can't handle requests
-            // unless we're in the group.
-            return;
-        }
-
-        // Process any external requests
-        final long now = time.milliseconds();
-        long nextRequestTimeoutMs = Long.MAX_VALUE;
-        while (true) {
-            final HerderRequest next;
-            synchronized (this) {
-                next = requests.peek();
-                if (next == null) {
-                    break;
-                } else if (now >= next.at) {
-                    requests.poll();
-                } else {
-                    nextRequestTimeoutMs = next.at - now;
-                    break;
-                }
-            }
-
-            try {
-                next.action().call();
-                next.callback().onCompletion(null, null);
-            } catch (Throwable t) {
-                next.callback().onCompletion(t, null);
-            }
-        }
-
-        // Process any configuration updates
-        Set<String> connectorConfigUpdatesCopy = null;
-        synchronized (this) {
-            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) {
-                // Connector reconfigs only need local updates since there is no coordination between workers required.
-                // However, if connectors were added or removed, work needs to be rebalanced since we have more work
-                // items to distribute among workers.
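-                // A fresh snapshot is compared against the previous connector set below: a changed set means
-                // connectors were added or removed, which requires a full rebalance rather than a local restart.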
-                ClusterConfigState newConfigState = configStorage.snapshot();
-                if (!newConfigState.connectors().equals(configState.connectors()))
-                    needsReconfigRebalance = true;
-                configState = newConfigState;
-                if (needsReconfigRebalance) {
-                    // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
-                    // this loop, which will then ensure the rebalance occurs without any other requests being
-                    // processed until it completes.
-                    member.requestRejoin();
-                    // Any connector config updates will be addressed during the rebalance too
-                    connectorConfigUpdates.clear();
-                    needsReconfigRebalance = false;
-                    return;
-                } else if (!connectorConfigUpdates.isEmpty()) {
-                    // We can't start/stop while locked since starting connectors can cause task updates that will
-                    // require writing configs, which in turn make callbacks into this class from another thread that
-                    // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
-                    // the updates after unlocking.
-                    connectorConfigUpdatesCopy = connectorConfigUpdates;
-                    connectorConfigUpdates = new HashSet<>();
-                }
-            }
-        }
-        if (connectorConfigUpdatesCopy != null) {
-            // If we only have connector config updates, we can just bounce the updated connectors that are
-            // currently assigned to this worker.
-            Set<String> localConnectors = assignment == null ? Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
-            for (String connectorName : connectorConfigUpdatesCopy) {
-                if (!localConnectors.contains(connectorName))
-                    continue;
-                boolean remains = configState.connectors().contains(connectorName);
-                log.info("Handling connector-only config update by {} connector {}",
-                        remains ? "restarting" : "stopping", connectorName);
-                worker.stopConnector(connectorName);
-                // The update may be a deletion, so verify we actually need to restart the connector
-                if (remains)
-                    startConnector(connectorName);
-            }
-        }
-
-        // Let the group take any actions it needs to
-        try {
-            member.poll(nextRequestTimeoutMs);
-            // Ensure we're in a good state in our group. If not, restart; everything should be set up to rejoin
-            if (!handleRebalanceCompleted()) return;
-        } catch (WakeupException e) { // FIXME should not be WakeupException
-            // Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
-        }
-    }
-
-    // public for testing
-    public void halt() {
-        synchronized (this) {
-            // Clean up any connectors and tasks that are still running.
- log.info("Stopping connectors and tasks that are still assigned to this worker."); - for (String connName : new HashSet<>(worker.connectorNames())) { - try { - worker.stopConnector(connName); - } catch (Throwable t) { - log.error("Failed to shut down connector " + connName, t); - } - } - for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) { - try { - worker.stopTask(taskId); - } catch (Throwable t) { - log.error("Failed to shut down task " + taskId, t); - } - } - - member.stop(); - - // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason - // for their failure - while (!requests.isEmpty()) { - HerderRequest request = requests.poll(); - request.callback().onCompletion(new CopycatException("Worker is shutting down"), null); - } - - if (configStorage != null) - configStorage.stop(); - } - } - - @Override - public void stop() { - log.info("Herder stopping"); - - stopping.set(true); - member.wakeup(); - while (stopLatch.getCount() > 0) { - try { - stopLatch.await(); - } catch (InterruptedException e) { - // ignore, should not happen - } - } - - - forwardRequestExecutor.shutdown(); - try { - if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS)) - forwardRequestExecutor.shutdownNow(); - } catch (InterruptedException e) { - // ignore - } - - log.info("Herder stopped"); - } - - @Override - public synchronized void connectors(final Callback<Collection<String>> callback) { - log.trace("Submitting connector listing request"); - - addRequest( - new Callable<Void>() { - @Override - public Void call() throws Exception { - if (!checkConfigSynced(callback)) - return null; - - callback.onCompletion(null, configState.connectors()); - return null; - } - }, - forwardErrorCallback(callback) - ); - } - - @Override - public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) { - log.trace("Submitting connector info request {}", connName); - - addRequest( - new Callable<Void>() { - @Override - public Void call() throws Exception { - if (!checkConfigSynced(callback)) - return null; - - if (!configState.connectors().contains(connName)) { - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); - } else { - callback.onCompletion(null, new ConnectorInfo(connName, configState.connectorConfig(connName), configState.tasks(connName))); - } - return null; - } - }, - forwardErrorCallback(callback) - ); - } - - @Override - public void connectorConfig(String connName, final Callback<Map<String, String>> callback) { - // Subset of connectorInfo, so piggy back on that implementation - log.trace("Submitting connector config read request {}", connName); - connectorInfo(connName, new Callback<ConnectorInfo>() { - @Override - public void onCompletion(Throwable error, ConnectorInfo result) { - if (error != null) - callback.onCompletion(error, null); - else - callback.onCompletion(null, result.config()); - } - }); - } - - @Override - public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace, - final Callback<Created<ConnectorInfo>> callback) { - final Map<String, String> connConfig; - if (config == null) { - connConfig = null; - } else if (!config.containsKey(ConnectorConfig.NAME_CONFIG)) { - connConfig = new HashMap<>(config); - connConfig.put(ConnectorConfig.NAME_CONFIG, connName); - } else { - connConfig = config; - } - - log.trace("Submitting connector config write request {}", connName); - - addRequest( - new 
Callable<Void>() { - @Override - public Void call() throws Exception { - log.trace("Handling connector config request {}", connName); - if (!isLeader()) { - callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null); - return null; - } - - boolean exists = configState.connectors().contains(connName); - if (!allowReplace && exists) { - callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null); - return null; - } - - if (connConfig == null && !exists) { - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); - return null; - } - - log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors()); - configStorage.putConnectorConfig(connName, connConfig); - - boolean created = !exists && connConfig != null; - // Note that we use the updated connector config despite the fact that we don't have an updated - // snapshot yet. The existing task info should still be accurate. - ConnectorInfo info = connConfig == null ? null : - new ConnectorInfo(connName, connConfig, configState.tasks(connName)); - callback.onCompletion(null, new Created<>(created, info)); - - return null; - } - }, - forwardErrorCallback(callback) - ); - } - - @Override - public synchronized void requestTaskReconfiguration(final String connName) { - log.trace("Submitting connector task reconfiguration request {}", connName); - - addRequest( - new Callable<Void>() { - @Override - public Void call() throws Exception { - reconfigureConnectorTasksWithRetry(connName); - return null; - } - }, - new Callback<Void>() { - @Override - public void onCompletion(Throwable error, Void result) { - log.error("Unexpected error during task reconfiguration: ", error); - log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName); - } - } - ); - } - - @Override - public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) { - log.trace("Submitting get task configuration request {}", connName); - - addRequest( - new Callable<Void>() { - @Override - public Void call() throws Exception { - if (!checkConfigSynced(callback)) - return null; - - if (!configState.connectors().contains(connName)) { - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); - } else { - List<TaskInfo> result = new ArrayList<>(); - for (int i = 0; i < configState.taskCount(connName); i++) { - ConnectorTaskId id = new ConnectorTaskId(connName, i); - result.add(new TaskInfo(id, configState.taskConfig(id))); - } - callback.onCompletion(null, result); - } - return null; - } - }, - forwardErrorCallback(callback) - ); - } - - @Override - public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) { - log.trace("Submitting put task configuration request {}", connName); - - addRequest( - new Callable<Void>() { - @Override - public Void call() throws Exception { - if (!isLeader()) - callback.onCompletion(new NotLeaderException("Only the leader may write task configurations.", leaderUrl()), null); - else if (!configState.connectors().contains(connName)) - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); - else { - configStorage.putTaskConfigs(taskConfigListAsMap(connName, configs)); - callback.onCompletion(null, null); - } - return null; - } - }, - 
forwardErrorCallback(callback)
-        );
-    }
-
-
-    // Should only be called from the work thread, so synchronization should not be needed
-    private boolean isLeader() {
-        return assignment != null && member.memberId().equals(assignment.leader());
-    }
-
-    /**
-     * Get the URL for the leader's REST interface, or null if we do not have the leader's URL yet.
-     */
-    private String leaderUrl() {
-        if (assignment == null)
-            return null;
-        return assignment.leaderUrl();
-    }
-
-    /**
-     * Handle post-assignment operations: try to resolve any issues that kept the assignment from completing, get
-     * this node in sync with the config log, and start its assigned work.
-     *
-     * @return false if we couldn't finish and a rejoin has been requested
-     */
-    private boolean handleRebalanceCompleted() {
-        if (this.rebalanceResolved)
-            return true;
-
-        // We need to handle a variety of cases after a rebalance:
-        // 1. Assignment failed
-        //  1a. We are the leader for the round. We will be leader again if we rejoin now, so we need to catch up before
-        //      even attempting to. If we can't, we should drop out of the group because we will block everyone from making
-        //      progress. We can back off and try rejoining later.
-        //  1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately;
-        //      otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready.
-        // 2. Assignment succeeded
-        //  2a. We are caught up on configs. Awesome! We can proceed to run our assigned work.
-        //  2b. We need to try to catch up. We can do this potentially indefinitely because if it takes too long, we'll
-        //      be kicked out of the group anyway due to lack of heartbeats.
-
-        boolean needsReadToEnd = false;
-        long syncConfigsTimeoutMs = Long.MAX_VALUE;
-        boolean needsRejoin = false;
-        if (assignment.failed()) {
-            needsRejoin = true;
-            if (isLeader()) {
-                log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
-                needsReadToEnd = true;
-                syncConfigsTimeoutMs = workerSyncTimeoutMs;
-            } else if (configState.offset() < assignment.offset()) {
-                log.warn("Join group completed, but assignment failed and we are lagging. Reading to end of config and retrying.");
-                needsReadToEnd = true;
-            } else {
-                log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
-            }
-        } else {
-            if (configState.offset() < assignment.offset()) {
-                log.warn("Catching up to assignment's config offset.");
-                needsReadToEnd = true;
-            }
-        }
-
-        if (needsReadToEnd) {
-            // Force exiting this method to avoid creating any connectors/tasks, and require immediate rejoining if
-            // we timed out. This should only happen if we were the leader and didn't finish quickly enough, in which
-            // case we've waited a long time and should have already left the group, OR the timeout should have been
-            // very long and not having finished also indicates we've waited longer than the session timeout.
-            if (!readConfigToEnd(syncConfigsTimeoutMs))
-                needsRejoin = true;
-        }
-
-        if (needsRejoin) {
-            member.requestRejoin();
-            return false;
-        }
-
-        // Should still validate that the offsets match since we may have gone *past* the required offset, in which
-        // case we should *not* start any tasks and should rejoin instead
-        if (configState.offset() != assignment.offset()) {
-            log.info("Current config state offset {} does not match group assignment {}.
Forcing rebalance.", configState.offset(), assignment.offset());
-            member.requestRejoin();
-            return false;
-        }
-
-        startWork();
-
-        // We only mark this as resolved once we've actually started work, which allows us to correctly track what
-        // work is currently active and running. If we bail early, the main tick loop + having requested rejoin
-        // guarantees we'll attempt to rejoin before executing this method again.
-        rebalanceResolved = true;
-        return true;
-    }
-
-    /**
-     * Try to read to the end of the config log within the given timeout.
-     * @param timeoutMs maximum time to wait to sync to the end of the log
-     * @return true if successful, false if timed out
-     */
-    private boolean readConfigToEnd(long timeoutMs) {
-        log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
-        try {
-            configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS);
-            configState = configStorage.snapshot();
-            log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
-            return true;
-        } catch (TimeoutException e) {
-            log.warn("Didn't reach end of config log quickly enough", e);
-            // TODO: With explicit leave group support, it would be good to explicitly leave the group *before* this
-            // backoff since it'll be longer than the session timeout
-            if (isLeader())
-                backoff(workerUnsyncBackoffMs);
-            return false;
-        } catch (InterruptedException | ExecutionException e) {
-            throw new CopycatException("Error trying to catch up after assignment", e);
-        }
-    }
-
-    private void backoff(long ms) {
-        Utils.sleep(ms);
-    }
-
-    private void startWork() {
-        // Start assigned connectors and tasks
-        log.info("Starting connectors and tasks using config offset {}", assignment.offset());
-        for (String connectorName : assignment.connectors()) {
-            try {
-                startConnector(connectorName);
-            } catch (ConfigException e) {
-                log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
-                        "configuration. This connector will not execute until reconfigured.", e);
-            }
-        }
-        for (ConnectorTaskId taskId : assignment.tasks()) {
-            try {
-                log.info("Starting task {}", taskId);
-                Map<String, String> configs = configState.taskConfig(taskId);
-                TaskConfig taskConfig = new TaskConfig(configs);
-                worker.addTask(taskId, taskConfig);
-            } catch (ConfigException e) {
-                log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
-                        "configuration. This task will not execute until reconfigured.", e);
-            }
-        }
-        log.info("Finished starting connectors and tasks");
-    }
-
-    // Helper for starting a connector with the given name: extracts & parses the config, generates the connector
-    // context, and adds it to the worker. This needs to be called from within the main worker thread for this herder.
-    private void startConnector(String connectorName) {
-        log.info("Starting connector {}", connectorName);
-        Map<String, String> configs = configState.connectorConfig(connectorName);
-        ConnectorConfig connConfig = new ConnectorConfig(configs);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
-        worker.addConnector(connConfig, ctx);
-
-        // Immediately request configuration since this could be a brand new connector.
However, also only update those
-        // task configs if they are actually different from the existing ones, to avoid unnecessary updates when this is
-        // just restoring an existing connector.
-        reconfigureConnectorTasksWithRetry(connName);
-    }
-
-    private void reconfigureConnectorTasksWithRetry(final String connName) {
-        reconfigureConnector(connName, new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                // If we encountered an error, we don't have much choice but to just retry. If we don't, we could get
-                // stuck with a connector that thinks it has generated tasks, but wasn't actually successful and therefore
-                // never makes progress. The retry has to run through a HerderRequest since this callback could be happening
-                // from the HTTP request forwarding thread.
-                if (error != null) {
-                    log.error("Failed to reconfigure connector's tasks, retrying after backoff:", error);
-                    addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS,
-                            new Callable<Void>() {
-                                @Override
-                                public Void call() throws Exception {
-                                    reconfigureConnectorTasksWithRetry(connName);
-                                    return null;
-                                }
-                            }, new Callback<Void>() {
-                                @Override
-                                public void onCompletion(Throwable error, Void result) {
-                                    log.error("Unexpected error during connector task reconfiguration: ", error);
-                                    log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", connName);
-                                }
-                            }
-                    );
-                }
-            }
-        });
-    }
-
-    // Updates configurations for a connector by requesting them from the connector, filling in parameters provided
-    // by the system, then checking whether any configs have actually changed before submitting the new configs to storage
-    private void reconfigureConnector(final String connName, final Callback<Void> cb) {
-        try {
-            Map<String, String> configs = configState.connectorConfig(connName);
-            ConnectorConfig connConfig = new ConnectorConfig(configs);
-
-            List<String> sinkTopics = null;
-            if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
-                sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
-
-            final List<Map<String, String>> taskProps
-                    = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
-            boolean changed = false;
-            int currentNumTasks = configState.taskCount(connName);
-            if (taskProps.size() != currentNumTasks) {
-                log.debug("Change in connector task count from {} to {}, writing updated task configurations", currentNumTasks, taskProps.size());
-                changed = true;
-            } else {
-                int index = 0;
-                for (Map<String, String> taskConfig : taskProps) {
-                    if (!taskConfig.equals(configState.taskConfig(new ConnectorTaskId(connName, index)))) {
-                        log.debug("Change in task configurations, writing updated task configurations");
-                        changed = true;
-                        break;
-                    }
-                    index++;
-                }
-            }
-            if (changed) {
-                if (isLeader()) {
-                    configStorage.putTaskConfigs(taskConfigListAsMap(connName, taskProps));
-                    cb.onCompletion(null, null);
-                } else {
-                    // We cannot forward the request on the same thread because this reconfiguration can happen as a
-                    // result of a callback into the herder's own work thread. If we blocked here waiting for the
-                    // leader's response, the tick loop could not process rebalances or other requests, so submit the
-                    // forwarded request to a separate executor.
-                    forwardRequestExecutor.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                String reconfigUrl = RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
-                                RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
-                                cb.onCompletion(null, null);
-                            } catch (CopycatException e) {
-                                log.error("Request to leader to reconfigure connector tasks failed", e);
-                                cb.onCompletion(e, null);
-                            }
-                        }
-                    });
-                }
-            }
-        } catch (Throwable t) {
-            cb.onCompletion(t, null);
-        }
-    }
-
-    // Common handling for requests that get config data. Checks if we are in sync with the current config, which allows
-    // us to answer requests directly. If we are not, handles invoking the callback with the appropriate error.
-    private boolean checkConfigSynced(Callback<?> callback) {
-        if (assignment == null || configState.offset() != assignment.offset()) {
-            if (!isLeader())
-                callback.onCompletion(new NotLeaderException("Cannot get config data because config is not in sync and this is not the leader", leaderUrl()), null);
-            else
-                callback.onCompletion(new CopycatException("Cannot get config data because this is the leader node, but it does not have the most up to date configs"), null);
-            return false;
-        }
-        return true;
-    }
-
-    private void addRequest(Callable<Void> action, Callback<Void> callback) {
-        addRequest(0, action, callback);
-    }
-
-    private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
-        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback);
-        requests.add(req);
-        if (requests.peek() == req)
-            member.wakeup();
-    }
-
-    private class HerderRequest implements Comparable<HerderRequest> {
-        private final long at;
-        private final Callable<Void> action;
-        private final Callback<Void> callback;
-
-        public HerderRequest(long at, Callable<Void> action, Callback<Void> callback) {
-            this.at = at;
-            this.action = action;
-            this.callback = callback;
-        }
-
-        public Callable<Void> action() {
-            return action;
-        }
-
-        public Callback<Void> callback() {
-            return callback;
-        }
-
-        @Override
-        public int compareTo(HerderRequest o) {
-            return Long.compare(at, o.at);
-        }
-    }
-
-    private static Callback<Void> forwardErrorCallback(final Callback<?> callback) {
-        return new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                if (error != null)
-                    callback.onCompletion(error, null);
-            }
-        };
-    }
-
-    // Config callbacks are triggered from the KafkaConfigStorage thread
-    private Callback<String> connectorConfigCallback() {
-        return new Callback<String>() {
-            @Override
-            public void onCompletion(Throwable error, String connector) {
-                log.info("Connector {} config updated", connector);
-                // Stage the update and wake up the work thread. Connector config *changes* only need the one connector
-                // to be bounced. However, this callback may also indicate a connector *addition*, which does require
-                // a rebalance, so we need to be careful about which operation we request.
-                synchronized (DistributedHerder.this) {
-                    connectorConfigUpdates.add(connector);
-                }
-                member.wakeup();
-            }
-        };
-    }
-
-    private Callback<List<ConnectorTaskId>> taskConfigCallback() {
-        return new Callback<List<ConnectorTaskId>>() {
-            @Override
-            public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) {
-                log.info("Tasks {} configs updated", tasks);
-                // Stage the update and wake up the work thread.
No need to record the set of tasks here because task reconfigs
-                // always need a rebalance to ensure offsets get committed.
-                // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
-                // connectors clearly don't need any coordination.
-                synchronized (DistributedHerder.this) {
-                    needsReconfigRebalance = true;
-                }
-                member.wakeup();
-            }
-        };
-    }
-
-    // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
-    private WorkerRebalanceListener rebalanceListener() {
-        return new WorkerRebalanceListener() {
-            @Override
-            public void onAssigned(CopycatProtocol.Assignment assignment) {
-                // This callback just logs the assignment and saves it. The actual response is handled in the main loop,
-                // which ensures the group member's rebalancing logic can complete, keeps potentially long-running
-                // catch-up (or backoff, if we fail) steps out of the callback, and leaves us able to invoke other
-                // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
-                // assigned tasks).
-                log.info("Joined group and got assignment: {}", assignment);
-                synchronized (DistributedHerder.this) {
-                    DistributedHerder.this.assignment = assignment;
-                    rebalanceResolved = false;
-                }
-                // We *must* interrupt any poll() call since this callback could fire just as the poll starts, and we
-                // might then sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this
-                // event in the main thread.
-                member.wakeup();
-            }
-
-            @Override
-            public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
-                log.info("Rebalance started");
-
-                // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
-                // it is still important to have a leader that can write configs, offsets, etc.
-
-                if (rebalanceResolved) {
-                    // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
-                    // them to finish.
-                    // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
-                    // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
-                    // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
-                    // unnecessary repeated connections to the source/sink system.
-                    for (String connectorName : connectors)
-                        worker.stopConnector(connectorName);
-                    // TODO: We need to at least commit task offsets, but if we could commit offsets & pause the tasks instead of
-                    // stopping them, then state could continue to be reused when the task remains on this worker. For example,
-                    // this would avoid having to close a connection and then reopen it when the task is assigned back to this
-                    // worker again.
-                    for (ConnectorTaskId taskId : tasks)
-                        worker.stopTask(taskId);
-
-                    log.info("Finished stopping tasks in preparation for rebalance");
-                } else {
-                    log.info("Wasn't able to resume work after last rebalance, can skip stopping connectors and tasks");
-                }
-
-            }
-        };
-    }
-
-
-    private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
-        int index = 0;
-        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
-        for (Map<String, String> taskConfigMap : configs) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-            result.put(taskId, taskConfigMap);
-            index++;
-        }
-        return result;
-    }
-}
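The delayed-request scheduling above (addRequest(), HerderRequest, and the drain at the top of tick()) boils down to a small, reusable pattern. The sketch below is illustrative only and not part of this patch; DelayedRequestQueue, Request, add, and drainDue are invented names. It shows the core idea: requests are ordered by absolute execution time in a PriorityQueue, the caller wakes the worker thread only when a new request becomes the head of the queue, and the drain step reports how long the worker may sleep until the next request is due.

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.Callable;

public class DelayedRequestQueue {
    static final class Request implements Comparable<Request> {
        final long at;                // absolute time (ms) at which the request should run
        final Callable<Void> action;

        Request(long at, Callable<Void> action) {
            this.at = at;
            this.action = action;
        }

        @Override
        public int compareTo(Request other) {
            return Long.compare(at, other.at);
        }
    }

    private final Queue<Request> requests = new PriorityQueue<>();

    // Returns true if the caller should wake the worker thread, i.e. the new request
    // became the earliest one, so the worker's current sleep deadline is stale.
    public synchronized boolean add(long delayMs, Callable<Void> action) {
        Request req = new Request(System.currentTimeMillis() + delayMs, action);
        requests.add(req);
        return requests.peek() == req;
    }

    // Runs every request whose time has come and returns how many milliseconds the
    // worker thread may sleep before the next request is due (Long.MAX_VALUE if none).
    public synchronized long drainDue() throws Exception {
        long now = System.currentTimeMillis();
        while (!requests.isEmpty() && requests.peek().at <= now)
            requests.poll().action.call();
        return requests.isEmpty() ? Long.MAX_VALUE : requests.peek().at - now;
    }
}

Waking the worker only when the head changes mirrors the "if (requests.peek() == req) member.wakeup();" check above: if the new request runs later than the current head, the sleep deadline the worker already computed is still valid and no interrupt is needed.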
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java deleted file mode 100644 index 7e6dc67..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.distributed; - -import org.apache.kafka.copycat.errors.CopycatException; - -/** - * Indicates an operation was not permitted because it can only be performed on the leader and this worker is not currently - * the leader. - */ -public class NotLeaderException extends CopycatException { - private final String leaderUrl; - - public NotLeaderException(String msg, String leaderUrl) { - super(msg); - this.leaderUrl = leaderUrl; - } - - public NotLeaderException(String msg, String leaderUrl, Throwable throwable) { - super(msg, throwable); - this.leaderUrl = leaderUrl; - } - - public NotLeaderException(String leaderUrl, Throwable throwable) { - super(throwable); - this.leaderUrl = leaderUrl; - } - - public String leaderUrl() { - return leaderUrl; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java deleted file mode 100644 index c748971..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java +++ /dev/null @@ -1,293 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.runtime.distributed; - -import org.apache.kafka.clients.consumer.internals.AbstractCoordinator; -import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.utils.CircularIterator; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.copycat.storage.KafkaConfigStorage; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -/** - * This class manages the coordination process with the Kafka group coordinator on the broker for managing Copycat assignments to workers. - */ -public final class WorkerCoordinator extends AbstractCoordinator implements Closeable { - private static final Logger log = LoggerFactory.getLogger(WorkerCoordinator.class); - - // Currently Copycat doesn't support multiple task assignment strategies, so we currently just fill in a default value - public static final String DEFAULT_SUBPROTOCOL = "default"; - - private final String restUrl; - private final KafkaConfigStorage configStorage; - private CopycatProtocol.Assignment assignmentSnapshot; - private final CopycatWorkerCoordinatorMetrics sensors; - private ClusterConfigState configSnapshot; - private final WorkerRebalanceListener listener; - - private boolean rejoinRequested; - - /** - * Initialize the coordination manager. 
- */ - public WorkerCoordinator(ConsumerNetworkClient client, - String groupId, - int sessionTimeoutMs, - int heartbeatIntervalMs, - Metrics metrics, - String metricGrpPrefix, - Map<String, String> metricTags, - Time time, - long requestTimeoutMs, - long retryBackoffMs, - String restUrl, - KafkaConfigStorage configStorage, - WorkerRebalanceListener listener) { - super(client, - groupId, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - metricGrpPrefix, - metricTags, - time, - retryBackoffMs); - this.restUrl = restUrl; - this.configStorage = configStorage; - this.assignmentSnapshot = null; - this.sensors = new CopycatWorkerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags); - this.listener = listener; - this.rejoinRequested = false; - } - - public void requestRejoin() { - rejoinRequested = true; - } - - @Override - public String protocolType() { - return "copycat"; - } - - @Override - public LinkedHashMap<String, ByteBuffer> metadata() { - LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>(); - configSnapshot = configStorage.snapshot(); - CopycatProtocol.WorkerState workerState = new CopycatProtocol.WorkerState(restUrl, configSnapshot.offset()); - metadata.put(DEFAULT_SUBPROTOCOL, CopycatProtocol.serializeMetadata(workerState)); - return metadata; - } - - @Override - protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) { - assignmentSnapshot = CopycatProtocol.deserializeAssignment(memberAssignment); - // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment - // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned - // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get - // up to date, try to rejoin again, leaving the group and backing off, etc.). - rejoinRequested = false; - listener.onAssigned(assignmentSnapshot); - } - - @Override - protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) { - log.debug("Performing task assignment"); - - Map<String, CopycatProtocol.WorkerState> allConfigs = new HashMap<>(); - for (Map.Entry<String, ByteBuffer> entry : allMemberMetadata.entrySet()) - allConfigs.put(entry.getKey(), CopycatProtocol.deserializeMetadata(entry.getValue())); - - long maxOffset = findMaxMemberConfigOffset(allConfigs); - Long leaderOffset = ensureLeaderConfig(maxOffset); - if (leaderOffset == null) - return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.CONFIG_MISMATCH, - leaderId, allConfigs.get(leaderId).url(), maxOffset, - new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>()); - return performTaskAssignment(leaderId, leaderOffset, allConfigs); - } - - private long findMaxMemberConfigOffset(Map<String, CopycatProtocol.WorkerState> allConfigs) { - // The new config offset is the maximum seen by any member. We always perform assignment using this offset, - // even if some members have fallen behind. The config offset used to generate the assignment is included in - // the response so members that have fallen behind will not use the assignment until they have caught up. 
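-        // maxOffset is boxed (Long) so the first member's offset can seed the maximum without a magic sentinel value.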
- Long maxOffset = null; - for (Map.Entry<String, CopycatProtocol.WorkerState> stateEntry : allConfigs.entrySet()) { - long memberRootOffset = stateEntry.getValue().offset(); - if (maxOffset == null) - maxOffset = memberRootOffset; - else - maxOffset = Math.max(maxOffset, memberRootOffset); - } - - log.debug("Max config offset root: {}, local snapshot config offsets root: {}", - maxOffset, configSnapshot.offset()); - return maxOffset; - } - - private Long ensureLeaderConfig(long maxOffset) { - // If this leader is behind some other members, we can't do assignment - if (configSnapshot.offset() < maxOffset) { - // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here. - // Alternatively, if this node has already passed the maximum reported by any other member of the group, it - // is also safe to use this newer state. - ClusterConfigState updatedSnapshot = configStorage.snapshot(); - if (updatedSnapshot.offset() < maxOffset) { - log.info("Was selected to perform assignments, but do not have latest config found in sync request. " + - "Returning an empty configuration to trigger re-sync."); - return null; - } else { - configSnapshot = updatedSnapshot; - return configSnapshot.offset(); - } - } - - return maxOffset; - } - - private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, CopycatProtocol.WorkerState> allConfigs) { - Map<String, List<String>> connectorAssignments = new HashMap<>(); - Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>(); - - // Perform round-robin task assignment - CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet())); - for (String connectorId : sorted(configSnapshot.connectors())) { - String connectorAssignedTo = memberIt.next(); - log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo); - List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo); - if (memberConnectors == null) { - memberConnectors = new ArrayList<>(); - connectorAssignments.put(connectorAssignedTo, memberConnectors); - } - memberConnectors.add(connectorId); - - for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) { - String taskAssignedTo = memberIt.next(); - log.trace("Assigning task {} to {}", taskId, taskAssignedTo); - List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo); - if (memberTasks == null) { - memberTasks = new ArrayList<>(); - taskAssignments.put(taskAssignedTo, memberTasks); - } - memberTasks.add(taskId); - } - } - - return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.NO_ERROR, - leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments); - } - - private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members, - short error, - String leaderId, - String leaderUrl, - long maxOffset, - Map<String, List<String>> connectorAssignments, - Map<String, List<ConnectorTaskId>> taskAssignments) { - - Map<String, ByteBuffer> groupAssignment = new HashMap<>(); - for (String member : members) { - List<String> connectors = connectorAssignments.get(member); - if (connectors == null) - connectors = Collections.emptyList(); - List<ConnectorTaskId> tasks = taskAssignments.get(member); - if (tasks == null) - tasks = Collections.emptyList(); - CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks); - log.debug("Assignment: {} -> {}", 
member, assignment); - groupAssignment.put(member, CopycatProtocol.serializeAssignment(assignment)); - } - log.debug("Finished assignment"); - return groupAssignment; - } - - @Override - protected void onJoinPrepare(int generation, String memberId) { - log.debug("Revoking previous assignment {}", assignmentSnapshot); - if (assignmentSnapshot != null && !assignmentSnapshot.failed()) - listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks()); - } - - @Override - public boolean needRejoin() { - return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested; - } - - public String memberId() { - return this.memberId; - } - - @Override - public void close() { - super.close(); - } - - private class CopycatWorkerCoordinatorMetrics { - public final Metrics metrics; - public final String metricGrpName; - - public CopycatWorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) { - this.metrics = metrics; - this.metricGrpName = metricGrpPrefix + "-coordinator-metrics"; - - Measurable numConnectors = new Measurable() { - public double measure(MetricConfig config, long now) { - return assignmentSnapshot.connectors().size(); - } - }; - - Measurable numTasks = new Measurable() { - public double measure(MetricConfig config, long now) { - return assignmentSnapshot.tasks().size(); - } - }; - - metrics.addMetric(new MetricName("assigned-connectors", - this.metricGrpName, - "The number of connector instances currently assigned to this consumer", - tags), - numConnectors); - metrics.addMetric(new MetricName("assigned-tasks", - this.metricGrpName, - "The number of tasks currently assigned to this consumer", - tags), - numTasks); - } - } - - private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) { - List<T> res = new ArrayList<>(members); - Collections.sort(res); - return res; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java deleted file mode 100644 index 908fe59..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.runtime.distributed; - -import org.apache.kafka.clients.ClientUtils; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.Metadata; -import org.apache.kafka.clients.NetworkClient; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.network.ChannelBuilder; -import org.apache.kafka.common.network.Selector; -import org.apache.kafka.common.utils.AppInfoParser; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.copycat.storage.KafkaConfigStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -/** - * This class manages the coordination process with brokers for the copycat cluster group membership. It ties together - * the Coordinator, which implements the group member protocol, with all the other pieces needed to drive the connection - * to the group coordinator broker. This isolates all the networking to a single thread managed by this class, with - * higher level operations in response to group membership events being handled by the herder. - */ -public class WorkerGroupMember { - private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class); - - private static final AtomicInteger COPYCAT_CLIENT_ID_SEQUENCE = new AtomicInteger(1); - private static final String JMX_PREFIX = "kafka.copycat"; - - private final Time time; - private final String clientId; - private final ConsumerNetworkClient client; - private final Metrics metrics; - private final Metadata metadata; - private final long retryBackoffMs; - private final WorkerCoordinator coordinator; - - private boolean stopped = false; - - public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) { - try { - this.time = new SystemTime(); - - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); - String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG); - clientId = clientIdConfig.length() <= 0 ? 
"copycat-" + COPYCAT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig; - List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); - reporters.add(new JmxReporter(JMX_PREFIX)); - this.metrics = new Metrics(metricConfig, reporters, time); - this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG)); - List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), 0); - String metricGrpPrefix = "copycat"; - Map<String, String> metricsTags = new LinkedHashMap<>(); - metricsTags.put("client-id", clientId); - ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); - NetworkClient netClient = new NetworkClient( - new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder), - this.metadata, - clientId, - 100, // a fixed large enough value will suffice - config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG), - config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG), - config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG), - config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time); - this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); - this.coordinator = new WorkerCoordinator(this.client, - config.getString(DistributedConfig.GROUP_ID_CONFIG), - config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG), - config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG), - metrics, - metricGrpPrefix, - metricsTags, - this.time, - config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), - retryBackoffMs, - restUrl, - configStorage, - listener); - - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); - log.debug("Copycat group member created"); - } catch (Throwable t) { - // call close methods if internal objects are already constructed - // this is to prevent resource leak. see KAFKA-2121 - stop(true); - // now propagate the exception - throw new KafkaException("Failed to construct kafka consumer", t); - } - } - - public void stop() { - if (stopped) return; - stop(false); - } - - public void ensureActive() { - coordinator.ensureCoordinatorKnown(); - coordinator.ensureActiveGroup(); - } - - public void poll(long timeout) { - if (timeout < 0) - throw new IllegalArgumentException("Timeout must not be negative"); - - // poll for new data until the timeout expires - long remaining = timeout; - while (remaining >= 0) { - long start = time.milliseconds(); - coordinator.ensureCoordinatorKnown(); - coordinator.ensureActiveGroup(); - client.poll(remaining); - remaining -= time.milliseconds() - start; - } - } - - /** - * Interrupt any running poll() calls, causing a WakeupException to be thrown in the thread invoking that method. - */ - public void wakeup() { - this.client.wakeup(); - } - - /** - * Get the member ID of this worker in the group of workers. - * - * This ID is the unique member ID automatically generated. 
- * - * @return the member ID - */ - public String memberId() { - return coordinator.memberId(); - } - - public void requestRejoin() { - coordinator.requestRejoin(); - } - - private void stop(boolean swallowException) { - log.trace("Stopping the Copycat group member."); - AtomicReference<Throwable> firstException = new AtomicReference<Throwable>(); - this.stopped = true; - ClientUtils.closeQuietly(coordinator, "coordinator", firstException); - ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); - ClientUtils.closeQuietly(client, "consumer network client", firstException); - AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); - if (firstException.get() != null && !swallowException) - throw new KafkaException("Failed to stop the Copycat group member", firstException.get()); - else - log.debug("The Copycat group member has stopped."); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java deleted file mode 100644 index c9d2ed2..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.distributed; - -import org.apache.kafka.copycat.util.ConnectorTaskId; - -import java.util.Collection; - -/** - * Listener for rebalance events in the worker group. - */ -public interface WorkerRebalanceListener { - /** - * Invoked when a new assignment is created by joining the Copycat worker group. This is invoked for both successful - * and unsuccessful assignments. - */ - void onAssigned(CopycatProtocol.Assignment assignment); - - /** - * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks. 
- */ - void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java deleted file mode 100644 index 5da747d..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.rest; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.runtime.Herder; -import org.apache.kafka.copycat.runtime.WorkerConfig; -import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage; -import org.apache.kafka.copycat.runtime.rest.errors.CopycatExceptionMapper; -import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException; -import org.apache.kafka.copycat.runtime.rest.resources.ConnectorsResource; -import org.apache.kafka.copycat.runtime.rest.resources.RootResource; -import org.eclipse.jetty.server.Connector; -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.Slf4jRequestLog; -import org.eclipse.jetty.server.handler.DefaultHandler; -import org.eclipse.jetty.server.handler.HandlerCollection; -import org.eclipse.jetty.server.handler.RequestLogHandler; -import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.glassfish.jersey.server.ResourceConfig; -import org.glassfish.jersey.servlet.ServletContainer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.List; -import java.util.Map; - -/** - * Embedded server for the REST API that provides the control plane for Copycat workers. 
- */ -public class RestServer { - private static final Logger log = LoggerFactory.getLogger(RestServer.class); - - private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000; - - private static final ObjectMapper JSON_SERDE = new ObjectMapper(); - - private final WorkerConfig config; - private Herder herder; - private Server jettyServer; - - /** - * Create a REST server for this herder using the specified configs. - */ - public RestServer(WorkerConfig config) { - this.config = config; - - // To make the advertised port available immediately, we need to do some configuration here - String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG); - Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG); - - jettyServer = new Server(); - - ServerConnector connector = new ServerConnector(jettyServer); - if (hostname != null && !hostname.isEmpty()) - connector.setHost(hostname); - connector.setPort(port); - jettyServer.setConnectors(new Connector[]{connector}); - } - - public void start(Herder herder) { - log.info("Starting REST server"); - - this.herder = herder; - - ResourceConfig resourceConfig = new ResourceConfig(); - resourceConfig.register(new JacksonJsonProvider()); - - resourceConfig.register(RootResource.class); - resourceConfig.register(new ConnectorsResource(herder)); - - resourceConfig.register(CopycatExceptionMapper.class); - - ServletContainer servletContainer = new ServletContainer(resourceConfig); - ServletHolder servletHolder = new ServletHolder(servletContainer); - - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath("/"); - context.addServlet(servletHolder, "/*"); - - RequestLogHandler requestLogHandler = new RequestLogHandler(); - Slf4jRequestLog requestLog = new Slf4jRequestLog(); - requestLog.setLoggerName(RestServer.class.getCanonicalName()); - requestLog.setLogLatency(true); - requestLogHandler.setRequestLog(requestLog); - - HandlerCollection handlers = new HandlerCollection(); - handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler}); - - /* Needed for graceful shutdown as per `setStopTimeout` documentation */ - StatisticsHandler statsHandler = new StatisticsHandler(); - statsHandler.setHandler(handlers); - jettyServer.setHandler(statsHandler); - jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS); - jettyServer.setStopAtShutdown(true); - - try { - jettyServer.start(); - } catch (Exception e) { - throw new CopycatException("Unable to start REST server", e); - } - - log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl()); - } - - public void stop() { - try { - jettyServer.stop(); - jettyServer.join(); - } catch (Exception e) { - throw new CopycatException("Unable to stop REST server", e); - } finally { - jettyServer.destroy(); - } - } - - /** - * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty - * server, unless overrides for advertised hostname and/or port are provided via configs. 
- */ - public String advertisedUrl() { - UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI()); - String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG); - if (advertisedHostname != null && !advertisedHostname.isEmpty()) - builder.host(advertisedHostname); - Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG); - if (advertisedPort != null) - builder.port(advertisedPort); - else - builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG)); - return builder.build().toString(); - } - - - /** - * Send an HTTP request to the given URL, serializing any request body as JSON and deserializing the JSON response. - * - * @param url HTTP connection will be established with this URL. - * @param method HTTP method ("GET", "POST", "PUT", etc.) - * @param requestBodyData Object to serialize as JSON and send in the request body. - * @param responseFormat Expected format of the response to the HTTP request. - * @param <T> The type of the deserialized response to the HTTP request. - * @return The deserialized response to the HTTP request, or null if no data is expected. - */ - public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData, - TypeReference<T> responseFormat) { - HttpURLConnection connection = null; - try { - String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData); - log.debug("Sending {} with input {} to {}", method, serializedBody, url); - - connection = (HttpURLConnection) new URL(url).openConnection(); - connection.setRequestMethod(method); - - connection.setRequestProperty("User-Agent", "kafka-copycat"); - connection.setRequestProperty("Accept", "application/json"); - - // connection.getResponseCode() implicitly calls getInputStream, so enable input explicitly - // (doInput already defaults to true, so this is documentation as much as configuration). - connection.setDoInput(true); - - connection.setUseCaches(false); - - if (requestBodyData != null) { - connection.setRequestProperty("Content-Type", "application/json"); - connection.setDoOutput(true); - - OutputStream os = connection.getOutputStream(); - os.write(serializedBody.getBytes("UTF-8")); // encode explicitly rather than with the platform default charset - os.flush(); - os.close(); - } - - int responseCode = connection.getResponseCode(); - if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) { - return new HttpResponse<>(responseCode, connection.getHeaderFields(), null); - } else if (responseCode >= 400) { - InputStream es = connection.getErrorStream(); - ErrorMessage errorMessage = JSON_SERDE.readValue(es, ErrorMessage.class); - es.close(); - throw new CopycatRestException(responseCode, errorMessage.errorCode(), errorMessage.message()); - } else if (responseCode >= 200 && responseCode < 300) { - InputStream is = connection.getInputStream(); - T result = JSON_SERDE.readValue(is, responseFormat); - is.close(); - return new HttpResponse<>(responseCode, connection.getHeaderFields(), result); - } else { - throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR, - Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), - "Unexpected status code when handling forwarded request: " + responseCode); - } - } catch (IOException e) { - log.error("IO error forwarding REST request", e); - throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e); - } finally { - if (connection != null) - connection.disconnect(); - } - } - - public static class HttpResponse<T> { - private int status; - private Map<String, List<String>> headers; - private T body; - - public HttpResponse(int status, Map<String, List<String>> headers, T body) { -
this.status = status; - this.headers = headers; - this.body = body; - } - - public int status() { - return status; - } - - public Map<String, List<String>> headers() { - return headers; - } - - public T body() { - return body; - } - } - - public static String urlJoin(String base, String path) { - if (base.endsWith("/") && path.startsWith("/")) - return base + path.substring(1); - else - return base + path; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java deleted file mode 100644 index 2b047d3..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.rest.entities; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.copycat.util.ConnectorTaskId; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -public class ConnectorInfo { - - private final String name; - private final Map<String, String> config; - private final List<ConnectorTaskId> tasks; - - @JsonCreator - public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config, - @JsonProperty("tasks") List<ConnectorTaskId> tasks) { - this.name = name; - this.config = config; - this.tasks = tasks; - } - - @JsonProperty - public String name() { - return name; - } - - @JsonProperty - public Map<String, String> config() { - return config; - } - - @JsonProperty - public List<ConnectorTaskId> tasks() { - return tasks; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ConnectorInfo that = (ConnectorInfo) o; - return Objects.equals(name, that.name) && - Objects.equals(config, that.config) && - Objects.equals(tasks, that.tasks); - } - - @Override - public int hashCode() { - return Objects.hash(name, config, tasks); - } - - - private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.copycat.util.ConnectorTaskId> tasks) { - List<ConnectorTaskId> jsonTasks = new ArrayList<>(); - for (ConnectorTaskId task : tasks) - jsonTasks.add(task); - return jsonTasks; - } -}
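The WorkerGroupMember deleted above exposes a small surface (ensureActive(), poll(), wakeup(), stop()) that the herder drives from a single thread. A minimal sketch of such a driver thread follows; the class name, poll budget, and the wiring of `member` are illustrative assumptions, not part of the patch:

```java
import org.apache.kafka.common.errors.WakeupException;

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical driver for the deleted WorkerGroupMember (same package assumed).
class GroupMemberDriver implements Runnable {
    private final WorkerGroupMember member; // constructed elsewhere from a DistributedConfig
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    GroupMemberDriver(WorkerGroupMember member) {
        this.member = member;
    }

    @Override
    public void run() {
        try {
            while (!stopping.get()) {
                // (Re)join the group if membership was lost, then block on network I/O.
                member.ensureActive();
                member.poll(1000L); // illustrative poll budget in milliseconds
            }
        } catch (WakeupException e) {
            // Expected: shutdown() below interrupted a blocking poll().
        } finally {
            member.stop();
        }
    }

    // Safe to call from any thread: wakeup() forces a blocked poll() to throw.
    void shutdown() {
        stopping.set(true);
        member.wakeup();
    }
}
```

Keeping all networking on the one thread that calls poll(), with wakeup() as the only cross-thread entry point, is exactly the isolation the class javadoc describes.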
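WorkerRebalanceListener, also deleted here, is the hook through which membership changes reach the herder. A log-only implementation makes the contract concrete; the class name and output are illustrative, and a real herder would start and stop connectors and tasks in these callbacks:

```java
import org.apache.kafka.copycat.util.ConnectorTaskId;

import java.util.Collection;

// Illustrative listener; assumes the same package as CopycatProtocol.
class LoggingRebalanceListener implements WorkerRebalanceListener {
    @Override
    public void onAssigned(CopycatProtocol.Assignment assignment) {
        // Invoked on every (re)join, even when the assignment carries an error.
        System.out.println("Assigned: " + assignment);
    }

    @Override
    public void onRevoked(String leader, Collection<String> connectors,
                          Collection<ConnectorTaskId> tasks) {
        // A rebalance is starting: stop the revoked work and commit offsets
        // before rejoining, so another worker can take it over cleanly.
        System.out.println("Revoked by " + leader + ": connectors=" + connectors
                + ", tasks=" + tasks);
    }
}
```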
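For the deleted RestServer, the lifecycle is construct-with-config, start(herder), stop(); advertisedUrl() resolves the address other workers should dial, and the static httpRequest() helper is what a follower uses to forward requests to the leader. A hedged usage sketch, assuming an already-populated WorkerConfig and Herder; the connector name "my-connector" is invented for illustration:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.kafka.copycat.runtime.Herder;
import org.apache.kafka.copycat.runtime.WorkerConfig;
import org.apache.kafka.copycat.runtime.rest.RestServer;
import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;

class RestServerUsageSketch {
    static void demo(WorkerConfig config, Herder herder) {
        RestServer rest = new RestServer(config); // binds rest.host.name / rest.port
        rest.start(herder);

        // Uses rest.advertised.host.name / rest.advertised.port when set,
        // otherwise falls back to the bound host and rest.port.
        String base = rest.advertisedUrl();

        // Forward a read to another worker and deserialize the JSON response.
        RestServer.HttpResponse<ConnectorInfo> resp = RestServer.httpRequest(
                RestServer.urlJoin(base, "/connectors/my-connector"),
                "GET",
                null, // no request body
                new TypeReference<ConnectorInfo>() { });
        if (resp.status() == 200)
            System.out.println("connector config: " + resp.body().config());

        rest.stop();
    }
}
```

Note that urlJoin() exists precisely because advertisedUrl() may or may not end with a slash, so naive concatenation could produce "//connectors/...".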
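Finally, ConnectorInfo is a plain Jackson value type: the @JsonCreator constructor drives deserialization and the @JsonProperty accessors drive serialization. A round-trip sketch, assuming ConnectorTaskId is itself Jackson-serializable (it carries matching annotations elsewhere in this codebase) and using an invented connector name:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.copycat.util.ConnectorTaskId;

import java.util.Collections;

class ConnectorInfoJsonSketch {
    public static void main(String[] args) throws Exception {
        ConnectorInfo info = new ConnectorInfo(
                "local-file-source", // illustrative connector name
                Collections.singletonMap("connector.class", "FileStreamSource"),
                Collections.singletonList(new ConnectorTaskId("local-file-source", 0)));

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(info);
        // e.g. {"name":"local-file-source","config":{...},"tasks":[...]}

        // equals() compares name, config, and tasks field by field.
        ConnectorInfo back = mapper.readValue(json, ConnectorInfo.class);
        System.out.println(info.equals(back)); // true
    }
}
```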