http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
deleted file mode 100644
index 96de1ca..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ /dev/null
@@ -1,920 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.errors.AlreadyExistsException;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.errors.NotFoundException;
-import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.HerderConnectorContext;
-import org.apache.kafka.copycat.runtime.TaskConfig;
-import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.rest.RestServer;
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.storage.KafkaConfigStorage;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * <p>
- *     Distributed "herder" that coordinates with other workers to spread work across multiple processes.
- * </p>
- * <p>
- *     Under the hood, this is implemented as a group managed by Kafka's group membership facilities (i.e. the
- *     generalized group/consumer coordinator). Each instance of DistributedHerder joins the group and indicates
- *     what its current configuration state is (where it is in the configuration log). The group coordinator selects
- *     one member to take this information and assign each instance a subset of the active connectors & tasks to
- *     execute. This assignment is currently performed in a simple round-robin fashion, but this is not guaranteed --
- *     the herder may also choose to, e.g., use a sticky assignment to avoid the usual start/stop costs associated
- *     with connectors and tasks. Once an assignment is received, the DistributedHerder simply runs its assigned
- *     connectors and tasks in a Worker.
- * </p>
- * <p>
- *     In addition to distributing work, the DistributedHerder uses the leader determined during work assignment to
- *     perform the operations that can only be handled by a single node at a time for this generation of the group.
- *     Most importantly, this includes writing updated configurations for connectors and tasks (and therefore also
- *     creating, destroying, and scaling connectors up/down).
- * </p>
- */
-public class DistributedHerder implements Herder, Runnable {
-    private static final Logger log = 
LoggerFactory.getLogger(DistributedHerder.class);
-
-    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
-
-    private final Worker worker;
-    private final KafkaConfigStorage configStorage;
-    private ClusterConfigState configState;
-    private final Time time;
-
-    private final int workerSyncTimeoutMs;
-    private final int workerUnsyncBackoffMs;
-
-    private final WorkerGroupMember member;
-    private final AtomicBoolean stopping;
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
-
-    // Track enough information about the current membership state to be able to determine which requests, whether
-    // via the API or from other nodes, are safe to process
-    private boolean rebalanceResolved;
-    private CopycatProtocol.Assignment assignment;
-
-    // To handle most external requests, like creating or destroying a 
connector, we can use a generic request where
-    // the caller specifies all the code that should be executed.
-    private final Queue<HerderRequest> requests = new PriorityQueue<>();
-    // Config updates can be collected and applied together when possible. 
Also, we need to take care to rebalance when
-    // needed (e.g. task reconfiguration, which requires everyone to 
coordinate offset commits).
-    private Set<String> connectorConfigUpdates = new HashSet<>();
-    private boolean needsReconfigRebalance;
-
-    private final ExecutorService forwardRequestExecutor;
-
-    public DistributedHerder(DistributedConfig config, Worker worker, String 
restUrl) {
-        this(config, worker, null, null, restUrl, new SystemTime());
-    }
-
-    // public for testing
-    public DistributedHerder(DistributedConfig config, Worker worker, 
KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, 
Time time) {
-        this.worker = worker;
-        if (configStorage != null) {
-            // For testing. Assume configuration has already been performed
-            this.configStorage = configStorage;
-        } else {
-            this.configStorage = new 
KafkaConfigStorage(worker.getInternalValueConverter(), 
connectorConfigCallback(), taskConfigCallback());
-            this.configStorage.configure(config.originals());
-        }
-        configState = ClusterConfigState.EMPTY;
-        this.time = time;
-
-        this.workerSyncTimeoutMs = 
config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
-        this.workerUnsyncBackoffMs = 
config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
-
-        this.member = member != null ? member : new WorkerGroupMember(config, 
restUrl, this.configStorage, rebalanceListener());
-        stopping = new AtomicBoolean(false);
-
-        rebalanceResolved = true; // Set to false when a rebalance occurs and we still need to follow up by starting up tasks
-        needsReconfigRebalance = false;
-
-        forwardRequestExecutor = Executors.newSingleThreadExecutor();
-    }
-
-    @Override
-    public void start() {
-        Thread thread = new Thread(this, "DistributedHerder");
-        thread.start();
-    }
-
-    public void run() {
-        try {
-            log.info("Herder starting");
-
-            configStorage.start();
-
-            log.info("Herder started");
-
-            while (!stopping.get()) {
-                tick();
-            }
-
-            halt();
-
-            log.info("Herder stopped");
-        } catch (Throwable t) {
-            log.error("Uncaught exception in herder work thread, exiting: ", 
t);
-            stopLatch.countDown();
-            System.exit(1);
-        } finally {
-            stopLatch.countDown();
-        }
-    }
-
-    // public for testing
-    public void tick() {
-        // The main loop does two primary things: 1) drive the group 
membership protocol, responding to rebalance events
-        // as they occur, and 2) handle external requests targeted at the 
leader. All the "real" work of the herder is
-        // performed in this thread, which keeps synchronization 
straightforward at the cost of some operations possibly
-        // blocking up this thread (especially those in callbacks due to 
rebalance events).
-
-        try {
-            member.ensureActive();
-            // Ensure we're in a good state in our group. If not, return so we restart the tick; everything should be set up to rejoin
-            if (!handleRebalanceCompleted()) return;
-        } catch (WakeupException e) {
-            // May be due to a request from another thread, or might be 
stopping. If the latter, we need to check the
-            // flag immediately. If the former, we need to re-run the 
ensureActive call since we can't handle requests
-            // unless we're in the group.
-            return;
-        }
-
-        // Process any external requests
-        final long now = time.milliseconds();
-        long nextRequestTimeoutMs = Long.MAX_VALUE;
-        while (true) {
-            final HerderRequest next;
-            synchronized (this) {
-                next = requests.peek();
-                if (next == null) {
-                    break;
-                } else if (now >= next.at) {
-                    requests.poll();
-                } else {
-                    nextRequestTimeoutMs = next.at - now;
-                    break;
-                }
-            }
-
-            try {
-                next.action().call();
-                next.callback().onCompletion(null, null);
-            } catch (Throwable t) {
-                next.callback().onCompletion(t, null);
-            }
-        }
-
-        // Process any configuration updates
-        Set<String> connectorConfigUpdatesCopy = null;
-        synchronized (this) {
-            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty()) {
-                // Connector reconfigs only need local updates since there is 
no coordination between workers required.
-                // However, if connectors were added or removed, work needs to 
be rebalanced since we have more work
-                // items to distribute among workers.
-                ClusterConfigState newConfigState = configStorage.snapshot();
-                if 
(!newConfigState.connectors().equals(configState.connectors()))
-                    needsReconfigRebalance = true;
-                configState = newConfigState;
-                if (needsReconfigRebalance) {
-                    // Task reconfigs require a rebalance. Request the 
rebalance, clean out state, and then restart
-                    // this loop, which will then ensure the rebalance occurs 
without any other requests being
-                    // processed until it completes.
-                    member.requestRejoin();
-                    // Any connector config updates will be addressed during 
the rebalance too
-                    connectorConfigUpdates.clear();
-                    needsReconfigRebalance = false;
-                    return;
-                } else if (!connectorConfigUpdates.isEmpty()) {
-                    // We can't start/stop while locked since starting 
connectors can cause task updates that will
-                    // require writing configs, which in turn make callbacks 
into this class from another thread that
-                    // require acquiring a lock. This leads to deadlock. 
Instead, just copy the info we need and process
-                    // the updates after unlocking.
-                    connectorConfigUpdatesCopy = connectorConfigUpdates;
-                    connectorConfigUpdates = new HashSet<>();
-                }
-            }
-        }
-        if (connectorConfigUpdatesCopy != null) {
-            // If we only have connector config updates, we can just bounce 
the updated connectors that are
-            // currently assigned to this worker.
-            Set<String> localConnectors = assignment == null ? 
Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
-            for (String connectorName : connectorConfigUpdatesCopy) {
-                if (!localConnectors.contains(connectorName))
-                    continue;
-                boolean remains = 
configState.connectors().contains(connectorName);
-                log.info("Handling connector-only config update by {} 
connector {}",
-                        remains ? "restarting" : "stopping", connectorName);
-                worker.stopConnector(connectorName);
-                // The update may be a deletion, so verify we actually need to 
restart the connector
-                if (remains)
-                    startConnector(connectorName);
-            }
-        }
-
-        // Let the group take any actions it needs to
-        try {
-            member.poll(nextRequestTimeoutMs);
-            // Ensure we're in a good state in our group. If not, return so we restart the tick; everything should be set up to rejoin
-            if (!handleRebalanceCompleted()) return;
-        } catch (WakeupException e) { // FIXME should not be WakeupException
-            // Ignore. Just indicates we need to check the exit flag, for 
requested actions, etc.
-        }
-    }
-
-    // public for testing
-    public void halt() {
-        synchronized (this) {
-            // Clean up any connectors and tasks that are still running.
-            log.info("Stopping connectors and tasks that are still assigned to 
this worker.");
-            for (String connName : new HashSet<>(worker.connectorNames())) {
-                try {
-                    worker.stopConnector(connName);
-                } catch (Throwable t) {
-                    log.error("Failed to shut down connector " + connName, t);
-                }
-            }
-            for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
-                try {
-                    worker.stopTask(taskId);
-                } catch (Throwable t) {
-                    log.error("Failed to shut down task " + taskId, t);
-                }
-            }
-
-            member.stop();
-
-            // Explicitly fail any outstanding requests so they actually get a 
response and get an understandable reason
-            // for their failure
-            while (!requests.isEmpty()) {
-                HerderRequest request = requests.poll();
-                request.callback().onCompletion(new CopycatException("Worker 
is shutting down"), null);
-            }
-
-            if (configStorage != null)
-                configStorage.stop();
-        }
-    }
-
-    @Override
-    public void stop() {
-        log.info("Herder stopping");
-
-        stopping.set(true);
-        member.wakeup();
-        while (stopLatch.getCount() > 0) {
-            try {
-                stopLatch.await();
-            } catch (InterruptedException e) {
-                // ignore, should not happen
-            }
-        }
-
-
-        forwardRequestExecutor.shutdown();
-        try {
-            if (!forwardRequestExecutor.awaitTermination(10000, 
TimeUnit.MILLISECONDS))
-                forwardRequestExecutor.shutdownNow();
-        } catch (InterruptedException e) {
-            // ignore
-        }
-
-        log.info("Herder stopped");
-    }
-
-    @Override
-    public synchronized void connectors(final Callback<Collection<String>> 
callback) {
-        log.trace("Submitting connector listing request");
-
-        addRequest(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        if (!checkConfigSynced(callback))
-                            return null;
-
-                        callback.onCompletion(null, configState.connectors());
-                        return null;
-                    }
-                },
-                forwardErrorCallback(callback)
-        );
-    }
-
-    @Override
-    public synchronized void connectorInfo(final String connName, final 
Callback<ConnectorInfo> callback) {
-        log.trace("Submitting connector info request {}", connName);
-
-        addRequest(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        if (!checkConfigSynced(callback))
-                            return null;
-
-                        if (!configState.connectors().contains(connName)) {
-                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
-                        } else {
-                            callback.onCompletion(null, new 
ConnectorInfo(connName, configState.connectorConfig(connName), 
configState.tasks(connName)));
-                        }
-                        return null;
-                    }
-                },
-                forwardErrorCallback(callback)
-        );
-    }
-
-    @Override
-    public void connectorConfig(String connName, final Callback<Map<String, 
String>> callback) {
-        // Subset of connectorInfo, so piggy back on that implementation
-        log.trace("Submitting connector config read request {}", connName);
-        connectorInfo(connName, new Callback<ConnectorInfo>() {
-            @Override
-            public void onCompletion(Throwable error, ConnectorInfo result) {
-                if (error != null)
-                    callback.onCompletion(error, null);
-                else
-                    callback.onCompletion(null, result.config());
-            }
-        });
-    }
-
-    @Override
-    public void putConnectorConfig(final String connName, Map<String, String> 
config, final boolean allowReplace,
-                                   final Callback<Created<ConnectorInfo>> 
callback) {
-        final Map<String, String> connConfig;
-        if (config == null) {
-            connConfig = null;
-        } else if (!config.containsKey(ConnectorConfig.NAME_CONFIG)) {
-            connConfig = new HashMap<>(config);
-            connConfig.put(ConnectorConfig.NAME_CONFIG, connName);
-        } else {
-            connConfig = config;
-        }
-
-        log.trace("Submitting connector config write request {}", connName);
-
-        addRequest(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        log.trace("Handling connector config request {}", 
connName);
-                        if (!isLeader()) {
-                            callback.onCompletion(new NotLeaderException("Only 
the leader can set connector configs.", leaderUrl()), null);
-                            return null;
-                        }
-
-                        boolean exists = 
configState.connectors().contains(connName);
-                        if (!allowReplace && exists) {
-                            callback.onCompletion(new 
AlreadyExistsException("Connector " + connName + " already exists"), null);
-                            return null;
-                        }
-
-                        if (connConfig == null && !exists) {
-                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
-                            return null;
-                        }
-
-                        log.trace("Submitting connector config {} {} {}", 
connName, allowReplace, configState.connectors());
-                        configStorage.putConnectorConfig(connName, connConfig);
-
-                        boolean created = !exists && connConfig != null;
-                        // Note that we use the updated connector config 
despite the fact that we don't have an updated
-                        // snapshot yet. The existing task info should still 
be accurate.
-                        ConnectorInfo info = connConfig == null ? null :
-                                new ConnectorInfo(connName, connConfig, 
configState.tasks(connName));
-                        callback.onCompletion(null, new Created<>(created, 
info));
-
-                        return null;
-                    }
-                },
-                forwardErrorCallback(callback)
-        );
-    }
-
-    @Override
-    public synchronized void requestTaskReconfiguration(final String connName) 
{
-        log.trace("Submitting connector task reconfiguration request {}", 
connName);
-
-        addRequest(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        reconfigureConnectorTasksWithRetry(connName);
-                        return null;
-                    }
-                },
-                new Callback<Void>() {
-                    @Override
-                    public void onCompletion(Throwable error, Void result) {
-                        log.error("Unexpected error during task 
reconfiguration: ", error);
-                        log.error("Task reconfiguration for {} failed 
unexpectedly, this connector will not be properly reconfigured unless manually 
triggered.", connName);
-                    }
-                }
-        );
-    }
-
-    @Override
-    public synchronized void taskConfigs(final String connName, final 
Callback<List<TaskInfo>> callback) {
-        log.trace("Submitting get task configuration request {}", connName);
-
-        addRequest(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        if (!checkConfigSynced(callback))
-                            return null;
-
-                        if (!configState.connectors().contains(connName)) {
-                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
-                        } else {
-                            List<TaskInfo> result = new ArrayList<>();
-                            for (int i = 0; i < 
configState.taskCount(connName); i++) {
-                                ConnectorTaskId id = new 
ConnectorTaskId(connName, i);
-                                result.add(new TaskInfo(id, 
configState.taskConfig(id)));
-                            }
-                            callback.onCompletion(null, result);
-                        }
-                        return null;
-                    }
-                },
-                forwardErrorCallback(callback)
-        );
-    }
-
-    @Override
-    public synchronized void putTaskConfigs(final String connName, final 
List<Map<String, String>> configs, final Callback<Void> callback) {
-        log.trace("Submitting put task configuration request {}", connName);
-
-        addRequest(
-                new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        if (!isLeader())
-                            callback.onCompletion(new NotLeaderException("Only 
the leader may write task configurations.", leaderUrl()), null);
-                        else if (!configState.connectors().contains(connName))
-                            callback.onCompletion(new 
NotFoundException("Connector " + connName + " not found"), null);
-                        else {
-                            
configStorage.putTaskConfigs(taskConfigListAsMap(connName, configs));
-                            callback.onCompletion(null, null);
-                        }
-                        return null;
-                    }
-                },
-                forwardErrorCallback(callback)
-        );
-    }
-
-
-    // Should only be called from work thread, so synchronization should not 
be needed
-    private boolean isLeader() {
-        return assignment != null && 
member.memberId().equals(assignment.leader());
-    }
-
-    /**
-     * Get the URL for the leader's REST interface, or null if we do not have 
the leader's URL yet.
-     */
-    private String leaderUrl() {
-        if (assignment == null)
-            return null;
-        return assignment.leaderUrl();
-    }
-
-    /**
-     * Handle post-assignment operations: try to resolve any issues that kept the assignment from completing, get
-     * this node in sync with the config log, and start its assigned work.
-     *
-     * @return false if we couldn't finish
-     */
-    private boolean handleRebalanceCompleted() {
-        if (this.rebalanceResolved)
-            return true;
-
-        // We need to handle a variety of cases after a rebalance:
-        // 1. Assignment failed
-        //  1a. We are the leader for the round. We will be leader again if we 
rejoin now, so we need to catch up before
-        //      even attempting to. If we can't we should drop out of the 
group because we will block everyone from making
-        //      progress. We can backoff and try rejoining later.
-        //  1b. We are not the leader. We might need to catch up. If we're already caught up we can rejoin immediately;
-        //      otherwise, we just want to wait indefinitely to catch up and rejoin whenever we're finally ready.
-        // 2. Assignment succeeded.
-        //  2a. We are caught up on configs. Awesome! We can proceed to run 
our assigned work.
-        //  2b. We need to try to catch up. We can do this potentially indefinitely because if it takes too long, we'll
-        //      be kicked out of the group anyway due to lack of heartbeats.
-
-        boolean needsReadToEnd = false;
-        long syncConfigsTimeoutMs = Long.MAX_VALUE;
-        boolean needsRejoin = false;
-        if (assignment.failed()) {
-            needsRejoin = true;
-            if (isLeader()) {
-                log.warn("Join group completed, but assignment failed and we 
are the leader. Reading to end of config and retrying.");
-                needsReadToEnd = true;
-                syncConfigsTimeoutMs = workerSyncTimeoutMs;
-            } else if (configState.offset() < assignment.offset()) {
-                log.warn("Join group completed, but assignment failed and we 
lagging. Reading to end of config and retrying.");
-                needsReadToEnd = true;
-            } else {
-                log.warn("Join group completed, but assignment failed. We were 
up to date, so just retrying.");
-            }
-        } else {
-            if (configState.offset() < assignment.offset()) {
-                log.warn("Catching up to assignment's config offset.");
-                needsReadToEnd = true;
-            }
-        }
-
-        if (needsReadToEnd) {
-            // Force exiting this method to avoid creating any 
connectors/tasks and require immediate rejoining if
-            // we timed out. This should only happen if we were the leader and 
didn't finish quickly enough, in which
-            // case we've waited a long time and should have already left the 
group OR the timeout should have been
-            // very long and not having finished also indicates we've waited 
longer than the session timeout.
-            if (!readConfigToEnd(syncConfigsTimeoutMs))
-                needsRejoin = true;
-        }
-
-        if (needsRejoin) {
-            member.requestRejoin();
-            return false;
-        }
-
-        // Should still validate that they match since we may have gone *past* 
the required offset, in which case we
-        // should *not* start any tasks and rejoin
-        if (configState.offset() != assignment.offset()) {
-            log.info("Current config state offset {} does not match group 
assignment {}. Forcing rebalance.", configState.offset(), assignment.offset());
-            member.requestRejoin();
-            return false;
-        }
-
-        startWork();
-
-        // We only mark this as resolved once we've actually started work, which allows us to correctly track what
-        // work is currently active and running. If we bail early, the main tick loop plus having requested a rejoin
-        // guarantees we'll attempt to rejoin before executing this method again.
-        rebalanceResolved = true;
-        return true;
-    }
-
-    /**
-     * Try to read to the end of the config log within the given timeout
-     * @param timeoutMs maximum time to wait to sync to the end of the log
-     * @return true if successful, false if timed out
-     */
-    private boolean readConfigToEnd(long timeoutMs) {
-        log.info("Current config state offset {} is behind group assignment 
{}, reading to end of config log", configState.offset(), assignment.offset());
-        try {
-            configStorage.readToEnd().get(timeoutMs, TimeUnit.MILLISECONDS);
-            configState = configStorage.snapshot();
-            log.info("Finished reading to end of log and updated config 
snapshot, new config log offset: {}", configState.offset());
-            return true;
-        } catch (TimeoutException e) {
-            log.warn("Didn't reach end of config log quickly enough", e);
-            // TODO: With explicit leave group support, it would be good to 
explicitly leave the group *before* this
-            // backoff since it'll be longer than the session timeout
-            if (isLeader())
-                backoff(workerUnsyncBackoffMs);
-            return false;
-        } catch (InterruptedException | ExecutionException e) {
-            throw new CopycatException("Error trying to catch up after 
assignment", e);
-        }
-    }
-
-    private void backoff(long ms) {
-        Utils.sleep(ms);
-    }
-
-    private void startWork() {
-        // Start assigned connectors and tasks
-        log.info("Starting connectors and tasks using config offset {}", 
assignment.offset());
-        for (String connectorName : assignment.connectors()) {
-            try {
-                startConnector(connectorName);
-            } catch (ConfigException e) {
-                log.error("Couldn't instantiate connector " + connectorName + 
" because it has an invalid connector " +
-                        "configuration. This connector will not execute until 
reconfigured.", e);
-            }
-        }
-        for (ConnectorTaskId taskId : assignment.tasks()) {
-            try {
-                log.info("Starting task {}", taskId);
-                Map<String, String> configs = configState.taskConfig(taskId);
-                TaskConfig taskConfig = new TaskConfig(configs);
-                worker.addTask(taskId, taskConfig);
-            } catch (ConfigException e) {
-                log.error("Couldn't instantiate task " + taskId + " because it 
has an invalid task " +
-                        "configuration. This task will not execute until 
reconfigured.", e);
-            }
-        }
-        log.info("Finished starting connectors and tasks");
-    }
-
-    // Helper for starting a connector with the given name, which will extract & parse the config, generate the
-    // connector context, and add it to the worker. This needs to be called from within the main worker thread for
-    // this herder.
-    private void startConnector(String connectorName) {
-        log.info("Starting connector {}", connectorName);
-        Map<String, String> configs = 
configState.connectorConfig(connectorName);
-        ConnectorConfig connConfig = new ConnectorConfig(configs);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        ConnectorContext ctx = new 
HerderConnectorContext(DistributedHerder.this, connName);
-        worker.addConnector(connConfig, ctx);
-
-        // Immediately request configuration since this could be a brand new 
connector. However, also only update those
-        // task configs if they are actually different from the existing ones 
to avoid unnecessary updates when this is
-        // just restoring an existing connector.
-        reconfigureConnectorTasksWithRetry(connName);
-    }
-
-    private void reconfigureConnectorTasksWithRetry(final String connName) {
-        reconfigureConnector(connName, new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                // If we encountered an error, we don't have much choice but 
to just retry. If we don't, we could get
-                // stuck with a connector that thinks it has generated tasks, 
but wasn't actually successful and therefore
-                // never makes progress. The retry has to run through a 
HerderRequest since this callback could be happening
-                // from the HTTP request forwarding thread.
-                if (error != null) {
-                    log.error("Failed to reconfigure connector's tasks, 
retrying after backoff:", error);
-                    addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS,
-                            new Callable<Void>() {
-                                @Override
-                                public Void call() throws Exception {
-                                    
reconfigureConnectorTasksWithRetry(connName);
-                                    return null;
-                                }
-                            }, new Callback<Void>() {
-                                @Override
-                                public void onCompletion(Throwable error, Void 
result) {
-                                    log.error("Unexpected error during 
connector task reconfiguration: ", error);
-                                    log.error("Task reconfiguration for {} 
failed unexpectedly, this connector will not be properly reconfigured unless 
manually triggered.", connName);
-                                }
-                            }
-                    );
-                }
-            }
-        });
-    }
-
-    // Updates configurations for a connector by requesting them from the connector, filling in parameters provided
-    // by the system, and then checking whether any configs have actually changed before submitting the new configs
-    // to storage
-    private void reconfigureConnector(final String connName, final 
Callback<Void> cb) {
-        try {
-            Map<String, String> configs = 
configState.connectorConfig(connName);
-            ConnectorConfig connConfig = new ConnectorConfig(configs);
-
-            List<String> sinkTopics = null;
-            if 
(SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
-                sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
-
-            final List<Map<String, String>> taskProps
-                    = worker.connectorTaskConfigs(connName, 
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
-            boolean changed = false;
-            int currentNumTasks = configState.taskCount(connName);
-            if (taskProps.size() != currentNumTasks) {
-                log.debug("Change in connector task count from {} to {}, 
writing updated task configurations", currentNumTasks, taskProps.size());
-                changed = true;
-            } else {
-                int index = 0;
-                for (Map<String, String> taskConfig : taskProps) {
-                    if (!taskConfig.equals(configState.taskConfig(new 
ConnectorTaskId(connName, index)))) {
-                        log.debug("Change in task configurations, writing 
updated task configurations");
-                        changed = true;
-                        break;
-                    }
-                    index++;
-                }
-            }
-            if (changed) {
-                if (isLeader()) {
-                    configStorage.putTaskConfigs(taskConfigListAsMap(connName, 
taskProps));
-                    cb.onCompletion(null, null);
-                } else {
-                    // We cannot forward the request on the same thread because this reconfiguration can happen on
-                    // the herder's own work thread (e.g., when starting a connector); if we blocked here waiting
-                    // for the leader's response, we would stall the herder's main loop.
-                    forwardRequestExecutor.submit(new Runnable() {
-                        @Override
-                        public void run() {
-                            try {
-                                String reconfigUrl = 
RestServer.urlJoin(leaderUrl(), "/connectors/" + connName + "/tasks");
-                                RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
-                                cb.onCompletion(null, null);
-                            } catch (CopycatException e) {
-                                log.error("Request to leader to reconfigure 
connector tasks failed", e);
-                                cb.onCompletion(e, null);
-                            }
-                        }
-                    });
-                }
-            }
-        } catch (Throwable t) {
-            cb.onCompletion(t, null);
-        }
-    }
-
-    // Common handling for requests that get config data. Checks if we are in 
sync with the current config, which allows
-    // us to answer requests directly. If we are not, handles invoking the 
callback with the appropriate error.
-    private boolean checkConfigSynced(Callback<?> callback) {
-        if (assignment == null || configState.offset() != assignment.offset()) 
{
-            if (!isLeader())
-                callback.onCompletion(new NotLeaderException("Cannot get 
config data because config is not in sync and this is not the leader", 
leaderUrl()), null);
-            else
-                callback.onCompletion(new CopycatException("Cannot get config 
data because this is the leader node, but it does not have the most up to date 
configs"), null);
-            return false;
-        }
-        return true;
-    }
-
-    private void addRequest(Callable<Void> action, Callback<Void> callback) {
-        addRequest(0, action, callback);
-    }
-
-    private void addRequest(long delayMs, Callable<Void> action, 
Callback<Void> callback) {
-        HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, 
action, callback);
-        requests.add(req);
-        if (requests.peek() == req)
-            member.wakeup();
-    }
-
-    private class HerderRequest implements Comparable<HerderRequest> {
-        private final long at;
-        private final Callable<Void> action;
-        private final Callback<Void> callback;
-
-        public HerderRequest(long at, Callable<Void> action, Callback<Void> 
callback) {
-            this.at = at;
-            this.action = action;
-            this.callback = callback;
-        }
-
-        public Callable<Void> action() {
-            return action;
-        }
-
-        public Callback<Void> callback() {
-            return callback;
-        }
-
-        @Override
-        public int compareTo(HerderRequest o) {
-            return Long.compare(at, o.at);
-        }
-    }
-
-    private static final Callback<Void> forwardErrorCallback(final Callback<?> 
callback) {
-        return new Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
-                if (error != null)
-                    callback.onCompletion(error, null);
-            }
-        };
-    };
-
-
-    // Config callbacks are triggered from the KafkaConfigStorage thread
-    private Callback<String> connectorConfigCallback() {
-        return new Callback<String>() {
-            @Override
-            public void onCompletion(Throwable error, String connector) {
-                log.info("Connector {} config updated", connector);
-                // Stage the update and wake up the work thread. Connector 
config *changes* only need the one connector
-                // to be bounced. However, this callback may also indicate a 
connector *addition*, which does require
-                // a rebalance, so we need to be careful about what operation 
we request.
-                synchronized (DistributedHerder.this) {
-                    connectorConfigUpdates.add(connector);
-                }
-                member.wakeup();
-            }
-        };
-    }
-
-    private Callback<List<ConnectorTaskId>> taskConfigCallback() {
-        return new Callback<List<ConnectorTaskId>>() {
-            @Override
-            public void onCompletion(Throwable error, List<ConnectorTaskId> 
tasks) {
-                log.info("Tasks {} configs updated", tasks);
-                // Stage the update and wake up the work thread. No need to 
record the set of tasks here because task reconfigs
-                // always need a rebalance to ensure offsets get committed.
-                // TODO: As an optimization, some task config updates could 
avoid a rebalance. In particular, single-task
-                // connectors clearly don't need any coordination.
-                synchronized (DistributedHerder.this) {
-                    needsReconfigRebalance = true;
-                }
-                member.wakeup();
-            }
-        };
-    }
-
-    // Rebalances are triggered internally from the group member, so these are 
always executed in the work thread.
-    private WorkerRebalanceListener rebalanceListener() {
-        return new WorkerRebalanceListener() {
-            @Override
-            public void onAssigned(CopycatProtocol.Assignment assignment) {
-                // This callback just logs the info and saves it. The actual response is handled in the main loop,
-                // which ensures the group member's rebalancing logic can complete, keeps potentially long-running
-                // catch-up steps (or backoff if we fail) out of the callback, and lets us invoke other group
-                // membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
-                // assigned tasks).
-                log.info("Joined group and got assignment: {}", assignment);
-                synchronized (DistributedHerder.this) {
-                    DistributedHerder.this.assignment = assignment;
-                    rebalanceResolved = false;
-                }
-                // We *must* interrupt any poll() call since this could occur 
when the poll starts, and we might then
-                // sleep in the poll() for a long time. Forcing a wakeup 
ensures we'll get to process this event in the
-                // main thread.
-                member.wakeup();
-            }
-
-            @Override
-            public void onRevoked(String leader, Collection<String> 
connectors, Collection<ConnectorTaskId> tasks) {
-                log.info("Rebalance started");
-
-                // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
-                // it is still important to have a leader that can write configs, offsets, etc.
-
-                if (rebalanceResolved) {
-                    // TODO: Parallelize this. We should be able to request 
all connectors and tasks to stop, then wait on all of
-                    // them to finish
-                    // TODO: Technically we don't have to stop connectors at 
all until we know they've really been removed from
-                    // this worker. Instead, we can let them continue to run 
but buffer any update requests (which should be
-                    // rare anyway). This would avoid a steady stream of 
start/stop, which probably also includes lots of
-                    // unnecessary repeated connections to the source/sink 
system.
-                    for (String connectorName : connectors)
-                        worker.stopConnector(connectorName);
-                    // TODO: We need to at least commit task offsets, but if 
we could commit offsets & pause them instead of
-                    // stopping them then state could continue to be reused 
when the task remains on this worker. For example,
-                    // this would avoid having to close a connection and then 
reopen it when the task is assigned back to this
-                    // worker again.
-                    for (ConnectorTaskId taskId : tasks)
-                        worker.stopTask(taskId);
-
-                    log.info("Finished stopping tasks in preparation for 
rebalance");
-                } else {
-                    log.info("Wasn't unable to resume work after last 
rebalance, can skip stopping connectors and tasks");
-                }
-
-            }
-        };
-    }
-
-
-    private static Map<ConnectorTaskId, Map<String, String>> 
taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
-        int index = 0;
-        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
-        for (Map<String, String> taskConfigMap : configs) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-            result.put(taskId, taskConfigMap);
-            index++;
-        }
-        return result;
-    }
-}
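
The deleted herder above funnels all external operations through a PriorityQueue of timed requests and processes them on the single tick() thread, using the head request's remaining delay as the timeout for the next blocking poll. Below is a minimal, self-contained sketch of that pattern; the TimedRequestQueue class and its method names are illustrative, not part of the Copycat codebase.

    import java.util.PriorityQueue;
    import java.util.Queue;
    import java.util.concurrent.Callable;

    // Sketch of the herder's timed request queue: requests are ordered by their
    // earliest allowed execution time; the work thread runs those that are due
    // and uses the head's remaining delay as its next blocking-poll timeout.
    public class TimedRequestQueue {
        private static final class TimedRequest implements Comparable<TimedRequest> {
            final long at; // earliest time (ms) at which this request may run
            final Callable<Void> action;

            TimedRequest(long at, Callable<Void> action) {
                this.at = at;
                this.action = action;
            }

            @Override
            public int compareTo(TimedRequest o) {
                return Long.compare(at, o.at);
            }
        }

        private final Queue<TimedRequest> requests = new PriorityQueue<>();

        public synchronized void add(long delayMs, Callable<Void> action) {
            requests.add(new TimedRequest(System.currentTimeMillis() + delayMs, action));
        }

        // Runs every request whose time has come and returns how long the caller
        // may block (e.g., in a poll) before the next queued request becomes due.
        public long drainReady() throws Exception {
            while (true) {
                final TimedRequest next;
                synchronized (this) {
                    next = requests.peek();
                    if (next == null)
                        return Long.MAX_VALUE;
                    long now = System.currentTimeMillis();
                    if (now < next.at)
                        return next.at - now;
                    requests.poll();
                }
                next.action.call(); // execute outside the lock, as tick() does
            }
        }
    }

A caller schedules deferred work the way addRequest(RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS, ...) does above, and the work thread blocks for at most the returned timeout, mirroring member.poll(nextRequestTimeoutMs) in tick().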

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
deleted file mode 100644
index 7e6dc67..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/NotLeaderException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.copycat.errors.CopycatException;
-
-/**
- * Indicates an operation was not permitted because it can only be performed 
on the leader and this worker is not currently
- * the leader.
- */
-public class NotLeaderException extends CopycatException {
-    private final String leaderUrl;
-
-    public NotLeaderException(String msg, String leaderUrl) {
-        super(msg);
-        this.leaderUrl = leaderUrl;
-    }
-
-    public NotLeaderException(String msg, String leaderUrl, Throwable 
throwable) {
-        super(msg, throwable);
-        this.leaderUrl = leaderUrl;
-    }
-
-    public NotLeaderException(String leaderUrl, Throwable throwable) {
-        super(throwable);
-        this.leaderUrl = leaderUrl;
-    }
-
-    public String leaderUrl() {
-        return leaderUrl;
-    }
-}
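
NotLeaderException carries the leader's REST URL so that a follower can turn a rejected write into a forwarded request, just as reconfigureConnector() above posts task configs to the leader. The sketch below shows that caller-side handling; the TaskConfigWriter interface and writeOrForward method are hypothetical, while RestServer.urlJoin and RestServer.httpRequest are the same helpers the herder uses.

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;
    import org.apache.kafka.copycat.runtime.rest.RestServer;

    public class LeaderForwardingSketch {
        // Hypothetical local write that a follower rejects with NotLeaderException.
        interface TaskConfigWriter {
            void write(String connName, List<Map<String, String>> configs) throws NotLeaderException;
        }

        // Try the write locally; if this worker is not the leader, forward the
        // request to the leader's REST endpoint advertised in the exception.
        static void writeOrForward(TaskConfigWriter writer, String connName,
                                   List<Map<String, String>> configs) {
            try {
                writer.write(connName, configs);
            } catch (NotLeaderException e) {
                String url = RestServer.urlJoin(e.leaderUrl(), "/connectors/" + connName + "/tasks");
                RestServer.httpRequest(url, "POST", configs, null);
            }
        }
    }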

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
deleted file mode 100644
index c748971..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.utils.CircularIterator;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.storage.KafkaConfigStorage;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class handles the coordination process with the Kafka group coordinator on the broker, managing the
- * assignment of Copycat connectors and tasks to workers.
- */
-public final class WorkerCoordinator extends AbstractCoordinator implements 
Closeable {
-    private static final Logger log = 
LoggerFactory.getLogger(WorkerCoordinator.class);
-
-    // Copycat doesn't yet support multiple task assignment strategies, so we just fill in a default value
-    public static final String DEFAULT_SUBPROTOCOL = "default";
-
-    private final String restUrl;
-    private final KafkaConfigStorage configStorage;
-    private CopycatProtocol.Assignment assignmentSnapshot;
-    private final CopycatWorkerCoordinatorMetrics sensors;
-    private ClusterConfigState configSnapshot;
-    private final WorkerRebalanceListener listener;
-
-    private boolean rejoinRequested;
-
-    /**
-     * Initialize the coordination manager.
-     */
-    public WorkerCoordinator(ConsumerNetworkClient client,
-                             String groupId,
-                             int sessionTimeoutMs,
-                             int heartbeatIntervalMs,
-                             Metrics metrics,
-                             String metricGrpPrefix,
-                             Map<String, String> metricTags,
-                             Time time,
-                             long requestTimeoutMs,
-                             long retryBackoffMs,
-                             String restUrl,
-                             KafkaConfigStorage configStorage,
-                             WorkerRebalanceListener listener) {
-        super(client,
-                groupId,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                metrics,
-                metricGrpPrefix,
-                metricTags,
-                time,
-                retryBackoffMs);
-        this.restUrl = restUrl;
-        this.configStorage = configStorage;
-        this.assignmentSnapshot = null;
-        this.sensors = new CopycatWorkerCoordinatorMetrics(metrics, 
metricGrpPrefix, metricTags);
-        this.listener = listener;
-        this.rejoinRequested = false;
-    }
-
-    public void requestRejoin() {
-        rejoinRequested = true;
-    }
-
-    @Override
-    public String protocolType() {
-        return "copycat";
-    }
-
-    @Override
-    public LinkedHashMap<String, ByteBuffer> metadata() {
-        LinkedHashMap<String, ByteBuffer> metadata = new LinkedHashMap<>();
-        configSnapshot = configStorage.snapshot();
-        CopycatProtocol.WorkerState workerState = new 
CopycatProtocol.WorkerState(restUrl, configSnapshot.offset());
-        metadata.put(DEFAULT_SUBPROTOCOL, 
CopycatProtocol.serializeMetadata(workerState));
-        return metadata;
-    }
-
-    @Override
-    protected void onJoinComplete(int generation, String memberId, String 
protocol, ByteBuffer memberAssignment) {
-        assignmentSnapshot = 
CopycatProtocol.deserializeAssignment(memberAssignment);
-        // At this point we always consider ourselves to be a member of the 
cluster, even if there was an assignment
-        // error (the leader couldn't make the assignment) or we are behind 
the config and cannot yet work on our assigned
-        // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
-        // up to date, rejoining again, leaving the group and backing off, etc.).
-        rejoinRequested = false;
-        listener.onAssigned(assignmentSnapshot);
-    }
-
-    @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, 
String protocol, Map<String, ByteBuffer> allMemberMetadata) {
-        log.debug("Performing task assignment");
-
-        Map<String, CopycatProtocol.WorkerState> allConfigs = new HashMap<>();
-        for (Map.Entry<String, ByteBuffer> entry : 
allMemberMetadata.entrySet())
-            allConfigs.put(entry.getKey(), 
CopycatProtocol.deserializeMetadata(entry.getValue()));
-
-        long maxOffset = findMaxMemberConfigOffset(allConfigs);
-        Long leaderOffset = ensureLeaderConfig(maxOffset);
-        if (leaderOffset == null)
-            return fillAssignmentsAndSerialize(allConfigs.keySet(), 
CopycatProtocol.Assignment.CONFIG_MISMATCH,
-                    leaderId, allConfigs.get(leaderId).url(), maxOffset,
-                    new HashMap<String, List<String>>(), new HashMap<String, 
List<ConnectorTaskId>>());
-        return performTaskAssignment(leaderId, leaderOffset, allConfigs);
-    }
-
-    private long findMaxMemberConfigOffset(Map<String, 
CopycatProtocol.WorkerState> allConfigs) {
-        // The new config offset is the maximum seen by any member. We always 
perform assignment using this offset,
-        // even if some members have fallen behind. The config offset used to 
generate the assignment is included in
-        // the response so members that have fallen behind will not use the 
assignment until they have caught up.
-        Long maxOffset = null;
-        for (Map.Entry<String, CopycatProtocol.WorkerState> stateEntry : 
allConfigs.entrySet()) {
-            long memberRootOffset = stateEntry.getValue().offset();
-            if (maxOffset == null)
-                maxOffset = memberRootOffset;
-            else
-                maxOffset = Math.max(maxOffset, memberRootOffset);
-        }
-
-        log.debug("Max config offset root: {}, local snapshot config offsets 
root: {}",
-                maxOffset, configSnapshot.offset());
-        return maxOffset;
-    }
-
-    private Long ensureLeaderConfig(long maxOffset) {
-        // If this leader is behind some other members, we can't do assignment
-        if (configSnapshot.offset() < maxOffset) {
-            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
-            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
-            // is also safe to use this newer state.
-            ClusterConfigState updatedSnapshot = configStorage.snapshot();
-            if (updatedSnapshot.offset() < maxOffset) {
-                log.info("Was selected to perform assignments, but do not have latest config found in sync request. " +
-                        "Returning an empty configuration to trigger re-sync.");
-                return null;
-            } else {
-                configSnapshot = updatedSnapshot;
-                return configSnapshot.offset();
-            }
-        }
-
-        return maxOffset;
-    }
-
-    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, CopycatProtocol.WorkerState> allConfigs) {
-        Map<String, List<String>> connectorAssignments = new HashMap<>();
-        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
-
-        // Perform round-robin task assignment
-        CircularIterator<String> memberIt = new CircularIterator<>(sorted(allConfigs.keySet()));
-        for (String connectorId : sorted(configSnapshot.connectors())) {
-            String connectorAssignedTo = memberIt.next();
-            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
-            List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);
-            if (memberConnectors == null) {
-                memberConnectors = new ArrayList<>();
-                connectorAssignments.put(connectorAssignedTo, memberConnectors);
-            }
-            memberConnectors.add(connectorId);
-
-            for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) {
-                String taskAssignedTo = memberIt.next();
-                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
-                List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);
-                if (memberTasks == null) {
-                    memberTasks = new ArrayList<>();
-                    taskAssignments.put(taskAssignedTo, memberTasks);
-                }
-                memberTasks.add(taskId);
-            }
-        }
-
-        return fillAssignmentsAndSerialize(allConfigs.keySet(), CopycatProtocol.Assignment.NO_ERROR,
-                leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
-    }
-
-    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members,
-                                                                short error,
-                                                                String leaderId,
-                                                                String leaderUrl,
-                                                                long maxOffset,
-                                                                Map<String, List<String>> connectorAssignments,
-                                                                Map<String, List<ConnectorTaskId>> taskAssignments) {
-
-        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
-        for (String member : members) {
-            List<String> connectors = connectorAssignments.get(member);
-            if (connectors == null)
-                connectors = Collections.emptyList();
-            List<ConnectorTaskId> tasks = taskAssignments.get(member);
-            if (tasks == null)
-                tasks = Collections.emptyList();
-            CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
-            log.debug("Assignment: {} -> {}", member, assignment);
-            groupAssignment.put(member, CopycatProtocol.serializeAssignment(assignment));
-        }
-        log.debug("Finished assignment");
-        return groupAssignment;
-    }
-
-    @Override
-    protected void onJoinPrepare(int generation, String memberId) {
-        log.debug("Revoking previous assignment {}", assignmentSnapshot);
-        if (assignmentSnapshot != null && !assignmentSnapshot.failed())
-            listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
-    }
-
-    @Override
-    public boolean needRejoin() {
-        return super.needRejoin() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
-    }
-
-    public String memberId() {
-        return this.memberId;
-    }
-
-    @Override
-    public void close() {
-        super.close();
-    }
-
-    private class CopycatWorkerCoordinatorMetrics {
-        public final Metrics metrics;
-        public final String metricGrpName;
-
-        public CopycatWorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
-            this.metrics = metrics;
-            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
-
-            Measurable numConnectors = new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return assignmentSnapshot.connectors().size();
-                }
-            };
-
-            Measurable numTasks = new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return assignmentSnapshot.tasks().size();
-                }
-            };
-
-            metrics.addMetric(new MetricName("assigned-connectors",
-                            this.metricGrpName,
-                            "The number of connector instances currently assigned to this worker",
-                            tags),
-                    numConnectors);
-            metrics.addMetric(new MetricName("assigned-tasks",
-                            this.metricGrpName,
-                            "The number of tasks currently assigned to this worker",
-                            tags),
-                    numTasks);
-        }
-    }
-
-    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
-        List<T> res = new ArrayList<>(members);
-        Collections.sort(res);
-        return res;
-    }
-
-}
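
The performTaskAssignment method above walks a CircularIterator over the sorted member IDs, handing out connectors and their tasks in one shared rotation. A minimal standalone sketch of that round-robin scheme (the names here are hypothetical, not the Copycat API itself):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RoundRobinAssignmentSketch {
        // Deterministic round-robin: sorting both members and items means any
        // leader computing the assignment from the same inputs gets the same result.
        static Map<String, List<String>> assign(Collection<String> members, Collection<String> items) {
            List<String> order = new ArrayList<>(members);
            Collections.sort(order);
            List<String> work = new ArrayList<>(items);
            Collections.sort(work);
            Map<String, List<String>> result = new HashMap<>();
            int next = 0;
            for (String item : work) {
                String member = order.get(next++ % order.size());
                List<String> assigned = result.get(member);
                if (assigned == null) {
                    assigned = new ArrayList<>();
                    result.put(member, assigned);
                }
                assigned.add(item);
            }
            return result;
        }

        public static void main(String[] args) {
            // conn-a and conn-c go to worker-1, conn-b to worker-2
            System.out.println(assign(Arrays.asList("worker-2", "worker-1"),
                    Arrays.asList("conn-a", "conn-b", "conn-c")));
        }
    }

The deleted code differs in one respect: each connector's tasks are fed through the same iterator immediately after the connector itself, so connectors and tasks share a single rotation rather than being balanced independently.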

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
deleted file mode 100644
index 908fe59..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerGroupMember.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.network.ChannelBuilder;
-import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.storage.KafkaConfigStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * This class manages the coordination process with brokers for the copycat cluster group membership. It ties together
- * the Coordinator, which implements the group member protocol, with all the other pieces needed to drive the connection
- * to the group coordinator broker. This isolates all the networking to a single thread managed by this class, with
- * higher level operations in response to group membership events being handled by the herder.
- */
-public class WorkerGroupMember {
-    private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class);
-
-    private static final AtomicInteger COPYCAT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
-    private static final String JMX_PREFIX = "kafka.copycat";
-
-    private final Time time;
-    private final String clientId;
-    private final ConsumerNetworkClient client;
-    private final Metrics metrics;
-    private final Metadata metadata;
-    private final long retryBackoffMs;
-    private final WorkerCoordinator coordinator;
-
-    private boolean stopped = false;
-
-    public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
-        try {
-            this.time = new SystemTime();
-
-            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
-                    .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
-            String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-            clientId = clientIdConfig.length() <= 0 ? "copycat-" + COPYCAT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
-            List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-            reporters.add(new JmxReporter(JMX_PREFIX));
-            this.metrics = new Metrics(metricConfig, reporters, time);
-            this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG));
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), 0);
-            String metricGrpPrefix = "copycat";
-            Map<String, String> metricsTags = new LinkedHashMap<>();
-            metricsTags.put("client-id", clientId);
-            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
-            NetworkClient netClient = new NetworkClient(
-                    new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, metricsTags, channelBuilder),
-                    this.metadata,
-                    clientId,
-                    100, // a fixed large enough value will suffice
-                    config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
-                    config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
-                    config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
-                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
-            this.coordinator = new WorkerCoordinator(this.client,
-                    config.getString(DistributedConfig.GROUP_ID_CONFIG),
-                    config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
-                    config.getInt(DistributedConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
-                    metrics,
-                    metricGrpPrefix,
-                    metricsTags,
-                    this.time,
-                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                    retryBackoffMs,
-                    restUrl,
-                    configStorage,
-                    listener);
-
-            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
-            log.debug("Copycat group member created");
-        } catch (Throwable t) {
-            // call close methods if internal objects are already constructed
-            // to prevent resource leaks; see KAFKA-2121
-            stop(true);
-            // now propagate the exception
-            throw new KafkaException("Failed to construct Copycat worker group member", t);
-        }
-    }
-
-    public void stop() {
-        if (stopped) return;
-        stop(false);
-    }
-
-    public void ensureActive() {
-        coordinator.ensureCoordinatorKnown();
-        coordinator.ensureActiveGroup();
-    }
-
-    public void poll(long timeout) {
-        if (timeout < 0)
-            throw new IllegalArgumentException("Timeout must not be negative");
-
-        // poll for new data until the timeout expires
-        long remaining = timeout;
-        while (remaining >= 0) {
-            long start = time.milliseconds();
-            coordinator.ensureCoordinatorKnown();
-            coordinator.ensureActiveGroup();
-            client.poll(remaining);
-            remaining -= time.milliseconds() - start;
-        }
-    }
-
-    /**
-     * Interrupt any running poll() calls, causing a WakeupException to be thrown in the thread invoking that method.
-     */
-    public void wakeup() {
-        this.client.wakeup();
-    }
-
-    /**
-     * Get the member ID of this worker in the group of workers.
-     *
-     * This ID is the unique member ID automatically generated by the group coordinator.
-     *
-     * @return the member ID
-     */
-    public String memberId() {
-        return coordinator.memberId();
-    }
-
-    public void requestRejoin() {
-        coordinator.requestRejoin();
-    }
-
-    private void stop(boolean swallowException) {
-        log.trace("Stopping the Copycat group member.");
-        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
-        this.stopped = true;
-        ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
-        ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
-        ClientUtils.closeQuietly(client, "consumer network client", firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
-        if (firstException.get() != null && !swallowException)
-            throw new KafkaException("Failed to stop the Copycat group member", firstException.get());
-        else
-            log.debug("The Copycat group member has stopped.");
-    }
-}
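
The poll(long) method above keeps all group networking on the calling thread, with rebalance callbacks delivered through the listener during the call. A hedged sketch of a driver loop built on this API (the running flag and shutdown sequence are assumptions for illustration, not the actual DistributedHerder logic):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.common.errors.WakeupException;

    // member is a constructed WorkerGroupMember; running is flipped by a shutdown hook
    AtomicBoolean running = new AtomicBoolean(true);
    try {
        while (running.get()) {
            member.ensureActive();  // join or rejoin the group if needed
            member.poll(1000L);     // heartbeats, rebalances, listener callbacks
        }
    } catch (WakeupException e) {
        // thrown inside poll() after another thread calls member.wakeup();
        // expected during shutdown, so it is swallowed here
    } finally {
        member.stop();
    }

Since poll() already invokes ensureCoordinatorKnown() and ensureActiveGroup() on every iteration, the explicit ensureActive() call is not strictly required; it is shown only to illustrate the public API.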

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
deleted file mode 100644
index c9d2ed2..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerRebalanceListener.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-
-import java.util.Collection;
-
-/**
- * Listener for rebalance events in the worker group.
- */
-public interface WorkerRebalanceListener {
-    /**
-     * Invoked when a new assignment is created by joining the Copycat worker group. This is invoked for both successful
-     * and unsuccessful assignments.
-     */
-    void onAssigned(CopycatProtocol.Assignment assignment);
-
-    /**
-     * Invoked when a rebalance operation starts, revoking ownership of the set of connectors and tasks.
-     */
-    void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks);
-}
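
For illustration, a minimal implementation of this interface could look like the sketch below (the println bodies are placeholders; the real herder stops and starts connectors and tasks in these callbacks):

    import java.util.Collection;
    import org.apache.kafka.copycat.runtime.distributed.CopycatProtocol;
    import org.apache.kafka.copycat.runtime.distributed.WorkerRebalanceListener;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    public class LoggingRebalanceListener implements WorkerRebalanceListener {
        @Override
        public void onAssigned(CopycatProtocol.Assignment assignment) {
            if (assignment.failed()) {
                // e.g. CONFIG_MISMATCH: the caller should re-sync configs and rejoin
                return;
            }
            System.out.println("Assigned connectors " + assignment.connectors()
                    + " and tasks " + assignment.tasks()
                    + "; leader is " + assignment.leader());
        }

        @Override
        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            // Stop the revoked connectors and tasks before the next assignment arrives
            System.out.println("Revoked connectors " + connectors + " and tasks " + tasks);
        }
    }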

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java
deleted file mode 100644
index 5da747d..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/RestServer.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.WorkerConfig;
-import org.apache.kafka.copycat.runtime.rest.entities.ErrorMessage;
-import org.apache.kafka.copycat.runtime.rest.errors.CopycatExceptionMapper;
-import org.apache.kafka.copycat.runtime.rest.errors.CopycatRestException;
-import org.apache.kafka.copycat.runtime.rest.resources.ConnectorsResource;
-import org.apache.kafka.copycat.runtime.rest.resources.RootResource;
-import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.Slf4jRequestLog;
-import org.eclipse.jetty.server.handler.DefaultHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.server.handler.RequestLogHandler;
-import org.eclipse.jetty.server.handler.StatisticsHandler;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.servlet.ServletContainer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Embedded server for the REST API that provides the control plane for Copycat workers.
- */
-public class RestServer {
-    private static final Logger log = LoggerFactory.getLogger(RestServer.class);
-
-    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000;
-
-    private static final ObjectMapper JSON_SERDE = new ObjectMapper();
-
-    private final WorkerConfig config;
-    private Herder herder;
-    private Server jettyServer;
-
-    /**
-     * Create a REST server for this herder using the specified configs.
-     */
-    public RestServer(WorkerConfig config) {
-        this.config = config;
-
-        // To make the advertised port available immediately, we need to do some configuration here
-        String hostname = config.getString(WorkerConfig.REST_HOST_NAME_CONFIG);
-        Integer port = config.getInt(WorkerConfig.REST_PORT_CONFIG);
-
-        jettyServer = new Server();
-
-        ServerConnector connector = new ServerConnector(jettyServer);
-        if (hostname != null && !hostname.isEmpty())
-            connector.setHost(hostname);
-        connector.setPort(port);
-        jettyServer.setConnectors(new Connector[]{connector});
-    }
-
-    public void start(Herder herder) {
-        log.info("Starting REST server");
-
-        this.herder = herder;
-
-        ResourceConfig resourceConfig = new ResourceConfig();
-        resourceConfig.register(new JacksonJsonProvider());
-
-        resourceConfig.register(RootResource.class);
-        resourceConfig.register(new ConnectorsResource(herder));
-
-        resourceConfig.register(CopycatExceptionMapper.class);
-
-        ServletContainer servletContainer = new ServletContainer(resourceConfig);
-        ServletHolder servletHolder = new ServletHolder(servletContainer);
-
-        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
-        context.setContextPath("/");
-        context.addServlet(servletHolder, "/*");
-
-        RequestLogHandler requestLogHandler = new RequestLogHandler();
-        Slf4jRequestLog requestLog = new Slf4jRequestLog();
-        requestLog.setLoggerName(RestServer.class.getCanonicalName());
-        requestLog.setLogLatency(true);
-        requestLogHandler.setRequestLog(requestLog);
-
-        HandlerCollection handlers = new HandlerCollection();
-        handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
-
-        /* Needed for graceful shutdown as per `setStopTimeout` documentation */
-        StatisticsHandler statsHandler = new StatisticsHandler();
-        statsHandler.setHandler(handlers);
-        jettyServer.setHandler(statsHandler);
-        jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
-        jettyServer.setStopAtShutdown(true);
-
-        try {
-            jettyServer.start();
-        } catch (Exception e) {
-            throw new CopycatException("Unable to start REST server", e);
-        }
-
-        log.info("REST server listening at " + jettyServer.getURI() + ", 
advertising URL " + advertisedUrl());
-    }
-
-    public void stop() {
-        try {
-            jettyServer.stop();
-            jettyServer.join();
-        } catch (Exception e) {
-            throw new CopycatException("Unable to stop REST server", e);
-        } finally {
-            jettyServer.destroy();
-        }
-    }
-
-    /**
-     * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
-     * server, unless overrides for advertised hostname and/or port are provided via configs.
-     */
-    public String advertisedUrl() {
-        UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
-        String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
-        if (advertisedHostname != null && !advertisedHostname.isEmpty())
-            builder.host(advertisedHostname);
-        Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
-        if (advertisedPort != null)
-            builder.port(advertisedPort);
-        else
-            builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
-        return builder.build().toString();
-    }
-
-
-    /**
-     * @param url               HTTP connection will be established with this url.
-     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
-     * @param requestBodyData   Object to serialize as JSON and send in the request body.
-     * @param responseFormat    Expected format of the response to the HTTP request.
-     * @param <T>               The type of the deserialized response to the HTTP request.
-     * @return The deserialized response to the HTTP request, or null if no data is expected.
-     */
-    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
-                                    TypeReference<T> responseFormat) {
-        HttpURLConnection connection = null;
-        try {
-            String serializedBody = requestBodyData == null ? null : JSON_SERDE.writeValueAsString(requestBodyData);
-            log.debug("Sending {} with input {} to {}", method, serializedBody, url);
-
-            connection = (HttpURLConnection) new URL(url).openConnection();
-            connection.setRequestMethod(method);
-
-            connection.setRequestProperty("User-Agent", "kafka-copycat");
-            connection.setRequestProperty("Accept", "application/json");
-
-            // connection.getResponseCode() implicitly calls getInputStream, so always set doInput to true.
-            // On the other hand, leaving this out breaks nothing.
-            connection.setDoInput(true);
-
-            connection.setUseCaches(false);
-
-            if (requestBodyData != null) {
-                connection.setRequestProperty("Content-Type", 
"application/json");
-                connection.setDoOutput(true);
-
-                OutputStream os = connection.getOutputStream();
-                os.write(serializedBody.getBytes());
-                os.flush();
-                os.close();
-            }
-
-            int responseCode = connection.getResponseCode();
-            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
-                return new HttpResponse<>(responseCode, connection.getHeaderFields(), null);
-            } else if (responseCode >= 400) {
-                InputStream es = connection.getErrorStream();
-                ErrorMessage errorMessage = JSON_SERDE.readValue(es, ErrorMessage.class);
-                es.close();
-                throw new CopycatRestException(responseCode, errorMessage.errorCode(), errorMessage.message());
-            } else if (responseCode >= 200 && responseCode < 300) {
-                InputStream is = connection.getInputStream();
-                T result = JSON_SERDE.readValue(is, responseFormat);
-                is.close();
-                return new HttpResponse<>(responseCode, connection.getHeaderFields(), result);
-            } else {
-                throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR,
-                        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
-                        "Unexpected status code when handling forwarded request: " + responseCode);
-            }
-        } catch (IOException e) {
-            log.error("IO error forwarding REST request: ", e);
-            throw new CopycatRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);
-        } finally {
-            if (connection != null)
-                connection.disconnect();
-        }
-    }
-
-    public static class HttpResponse<T> {
-        private int status;
-        private Map<String, List<String>> headers;
-        private T body;
-
-        public HttpResponse(int status, Map<String, List<String>> headers, T body) {
-            this.status = status;
-            this.headers = headers;
-            this.body = body;
-        }
-
-        public int status() {
-            return status;
-        }
-
-        public Map<String, List<String>> headers() {
-            return headers;
-        }
-
-        public T body() {
-            return body;
-        }
-    }
-
-    public static String urlJoin(String base, String path) {
-        if (base.endsWith("/") && path.startsWith("/"))
-            return base + path.substring(1);
-        else
-            return base + path;
-    }
-}
\ No newline at end of file
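
As a usage note, a follower worker forwarding a request to the leader with the helpers above might look like the following sketch (the leaderUrl variable and the /connectors path are assumptions for illustration):

    import java.util.List;
    import com.fasterxml.jackson.core.type.TypeReference;

    // Build the target URL without doubling or dropping the slash
    String url = RestServer.urlJoin(leaderUrl, "/connectors");
    RestServer.HttpResponse<List<String>> response =
            RestServer.httpRequest(url, "GET", null, new TypeReference<List<String>>() { });
    // Per httpRequest above: 2xx responses are deserialized into body(), a 204
    // yields a null body, and codes >= 400 raise CopycatRestException instead
    List<String> connectorNames = response.body();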

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java
deleted file mode 100644
index 2b047d3..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/rest/entities/ConnectorInfo.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.entities;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-public class ConnectorInfo {
-
-    private final String name;
-    private final Map<String, String> config;
-    private final List<ConnectorTaskId> tasks;
-
-    @JsonCreator
-    public ConnectorInfo(@JsonProperty("name") String name, 
@JsonProperty("config") Map<String, String> config,
-                         @JsonProperty("tasks") List<ConnectorTaskId> tasks) {
-        this.name = name;
-        this.config = config;
-        this.tasks = tasks;
-    }
-
-    @JsonProperty
-    public String name() {
-        return name;
-    }
-
-    @JsonProperty
-    public Map<String, String> config() {
-        return config;
-    }
-
-    @JsonProperty
-    public List<ConnectorTaskId> tasks() {
-        return tasks;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        ConnectorInfo that = (ConnectorInfo) o;
-        return Objects.equals(name, that.name) &&
-                Objects.equals(config, that.config) &&
-                Objects.equals(tasks, that.tasks);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(name, config, tasks);
-    }
-
-
-    private static List<ConnectorTaskId> jsonTasks(Collection<org.apache.kafka.copycat.util.ConnectorTaskId> tasks) {
-        List<ConnectorTaskId> jsonTasks = new ArrayList<>();
-        for (ConnectorTaskId task : tasks)
-            jsonTasks.add(task);
-        return jsonTasks;
-    }
-}
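
Because ConnectorInfo is a plain Jackson entity, its wire format can be seen with a quick round trip. A sketch (the field values are invented, it assumes ConnectorTaskId's (connector, task) constructor, and the exact JSON for tasks depends on ConnectorTaskId's own annotations):

    import java.util.Arrays;
    import java.util.Collections;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    ObjectMapper mapper = new ObjectMapper();
    ConnectorInfo info = new ConnectorInfo(
            "local-file-source",
            Collections.singletonMap("topic", "test-topic"),
            Arrays.asList(new ConnectorTaskId("local-file-source", 0)));
    String json = mapper.writeValueAsString(info);
    // roughly: {"name":"local-file-source","config":{"topic":"test-topic"},"tasks":[...]}
    ConnectorInfo copy = mapper.readValue(json, ConnectorInfo.class);
    // equals() and hashCode() above make the round trip easy to verify
    assert info.equals(copy);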
