Repository: kafka
Updated Branches:
  refs/heads/trunk 21443f214 -> 2e6177359


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index 0463b85..1213656 100644
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -17,20 +17,19 @@
 
 package org.apache.kafka.copycat.runtime.distributed;
 
-import org.apache.kafka.copycat.connector.Connector;
-import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.source.SourceConnector;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.storage.KafkaConfigStorage;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.FutureCallback;
+import org.apache.kafka.copycat.util.TestFuture;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,251 +38,354 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
 
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({DistributedHerder.class})
+@PrepareForTest(DistributedHerder.class)
 @PowerMockIgnore("javax.management.*")
 public class DistributedHerderTest {
-    private static final List<String> CONNECTOR_NAMES = 
Arrays.asList("source-test1", "source-test2", "sink-test3");
-    private static final List<String> SOURCE_CONNECTOR_NAMES = 
Arrays.asList("source-test1", "source-test2");
-    private static final List<String> SINK_CONNECTOR_NAMES = 
Arrays.asList("sink-test3");
-    private static final String TOPICS_LIST_STR = "topic1,topic2";
+    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
+    static {
+        HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, 
"config-topic");
+        HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
+        HERDER_CONFIG.put(DistributedHerderConfig.GROUP_ID_CONFIG, 
"test-copycat-group");
+    }
 
-    private static final Map<String, String> CONFIG_STORAGE_CONFIG = 
Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, 
"config-topic");
+    private static final String CONN1 = "sourceA";
+    private static final String CONN2 = "sourceB"; // second connector name; must differ from CONN1 or both constants denote the same connector
+    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0);
+    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
+    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
+    private static final Integer MAX_TASKS = 3;
+    private static final Map<String, String> CONNECTOR_CONFIG = new 
HashMap<>();
+    static {
+        CONNECTOR_CONFIG.put(ConnectorConfig.NAME_CONFIG, "sourceA");
+        CONNECTOR_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, 
MAX_TASKS.toString());
+        CONNECTOR_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        CONNECTOR_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSourceConnector.class.getName());
+    }
+    private static final Map<String, String> TASK_CONFIG = new HashMap<>();
+    static {
+        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, 
BogusSourceTask.class.getName());
+    }
+    private static final HashMap<ConnectorTaskId, Map<String, String>> 
TASK_CONFIGS = new HashMap<>();
+    static {
+        TASK_CONFIGS.put(TASK0, TASK_CONFIG);
+        TASK_CONFIGS.put(TASK1, TASK_CONFIG);
+        TASK_CONFIGS.put(TASK2, TASK_CONFIG);
+    }
+    private static final ClusterConfigState SNAPSHOT = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONNECTOR_CONFIG), TASK_CONFIGS, 
Collections.<String>emptySet());
 
     @Mock private KafkaConfigStorage configStorage;
+    @Mock private WorkerGroupMember member;
     private DistributedHerder herder;
     @Mock private Worker worker;
     @Mock private Callback<String> createCallback;
+    @Mock private Callback<Void> destroyCallback;
 
-    private Map<String, Map<String, String>> connectorProps;
-    private Map<String, Class<? extends Connector>> connectorClasses;
-    private Map<String, Class<? extends Task>> connectorTaskClasses;
-    private Map<String, Connector> connectors;
-    private Properties taskProps;
+    private Callback<String> connectorConfigCallback;
+    private Callback<List<ConnectorTaskId>> taskConfigCallback;
+    private WorkerRebalanceListener rebalanceListener;
 
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         worker = PowerMock.createMock(Worker.class);
-        herder = new DistributedHerder(worker, configStorage);
-
-        connectorProps = new HashMap<>();
-        connectorClasses = new HashMap<>();
-        connectorTaskClasses = new HashMap<>();
-        connectors = new HashMap<>();
-        for (String connectorName : CONNECTOR_NAMES) {
-            Class<? extends Connector> connectorClass = 
connectorName.contains("source") ? BogusSourceConnector.class : 
BogusSinkConnector.class;
-            Class<? extends Task> taskClass = connectorName.contains("source") 
? BogusSourceTask.class : BogusSinkTask.class;
-            Connector connector = connectorName.contains("source") ? 
PowerMock.createMock(BogusSourceConnector.class) : 
PowerMock.createMock(BogusSinkConnector.class);
-
-            Map<String, String> props = new HashMap<>();
-            props.put(ConnectorConfig.NAME_CONFIG, connectorName);
-            props.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
-            props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
connectorClass.getName());
-
-            connectorProps.put(connectorName, props);
-            connectorClasses.put(connectorName, connectorClass);
-            connectorTaskClasses.put(connectorName, taskClass);
-            connectors.put(connectorName, connector);
-        }
-
-        PowerMock.mockStatic(DistributedHerder.class);
-
-        // These can be anything since connectors can pass along whatever they 
want.
-        taskProps = new Properties();
-        taskProps.setProperty("foo", "bar");
+
+        herder = PowerMock.createPartialMock(DistributedHerder.class, new 
String[]{"backoff"},
+                worker, HERDER_CONFIG, configStorage, member);
+        connectorConfigCallback = Whitebox.invokeMethod(herder, 
"connectorConfigCallback");
+        taskConfigCallback = Whitebox.invokeMethod(herder, 
"taskConfigCallback");
+        rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
     }
 
     @Test
-    public void testCreateSourceConnector() throws Exception {
-        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+    public void testJoinAssignment() {
+        // Join group and get assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
 
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        herder.tick();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testCreateSinkConnector() throws Exception {
-        String connectorName = SINK_CONNECTOR_NAMES.get(0);
+    public void testHaltCleansUpWorker() {
+        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
+        worker.stopConnector(CONN1);
+        PowerMock.expectLastCall();
+        
EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
+        worker.stopTask(TASK1);
+        PowerMock.expectLastCall();
+        member.stop();
+        PowerMock.expectLastCall();
+        configStorage.stop();
+        PowerMock.expectLastCall();
 
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        herder.halt();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testDestroyConnector() throws Exception {
-        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
-
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
-        expectDestroy(connectorName);
-        PowerMock.replayAll();
+    public void testCreateConnector() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        member.wakeup();
+        PowerMock.expectLastCall();
+        configStorage.putConnectorConfig(CONN1, CONNECTOR_CONFIG);
+        PowerMock.expectLastCall();
+        createCallback.onCompletion(null, CONN1);
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // No immediate action besides this -- change will be picked up via 
the config log
 
-        FutureCallback<Void> futureCb = new FutureCallback<>(new 
Callback<Void>() {
-            @Override
-            public void onCompletion(Throwable error, Void result) {
+        PowerMock.replayAll();
 
-            }
-        });
-        herder.deleteConnector(CONNECTOR_NAMES.get(0), futureCb);
-        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+        herder.addConnector(CONNECTOR_CONFIG, createCallback);
+        herder.tick();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testCreateAndStop() throws Exception {
-        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+    public void testDestroyConnector() throws Exception {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        // Start with one connector
+        expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+
+        // And delete the connector
+        member.wakeup();
+        PowerMock.expectLastCall();
+        configStorage.putConnectorConfig(CONN1, null);
+        PowerMock.expectLastCall();
+        destroyCallback.onCompletion(null, null);
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+        // No immediate action besides this -- change will be picked up via 
the config log
 
-        expectConfigStorageConfigureStart();
-        expectEmptyRestore();
-        expectAdd(connectorName);
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(connectorName), createCallback);
+        herder.deleteConnector(CONN1, destroyCallback);
+        herder.tick();
 
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testRestoreAndStop() throws Exception {
-        String restoreConnectorName1 = SOURCE_CONNECTOR_NAMES.get(0);
-        String restoreConnectorName2 = SINK_CONNECTOR_NAMES.get(0);
-        String additionalConnectorName = SOURCE_CONNECTOR_NAMES.get(1);
-
-        expectConfigStorageConfigureStart();
-        expectRestore(Arrays.asList(restoreConnectorName1, 
restoreConnectorName2));
-        expectAdd(additionalConnectorName);
-        // Stopping the herder should correctly stop all restored and new 
connectors
-        expectStop(restoreConnectorName1);
-        expectStop(restoreConnectorName2);
-        expectStop(additionalConnectorName);
-        configStorage.stop();
+    public void testConnectorConfigAdded() {
+        // If a connector was added, we need to rebalance
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+
+        // join, no configs so no need to catch up on config topic
+        expectRebalance(-1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // apply config
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        // Checks for config updates and starts rebalance
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        member.requestRejoin();
+        PowerMock.expectLastCall();
+        // Performs rebalance and gets new assignment
+        expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
+                CopycatProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        herder.configure(CONFIG_STORAGE_CONFIG);
-        herder.start();
-        herder.addConnector(connectorProps.get(additionalConnectorName), 
createCallback);
-        herder.stop();
+        herder.tick(); // join
+        connectorConfigCallback.onCompletion(null, CONN1); // read updated 
config
+        herder.tick(); // apply config
+        herder.tick(); // do rebalance
 
         PowerMock.verifyAll();
     }
 
-    private void expectConfigStorageConfigureStart() {
-        configStorage.configure(CONFIG_STORAGE_CONFIG);
+    @Test
+    public void testConnectorConfigUpdate() {
+        // Connector config can be applied without any rebalance
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
+
+        // join
+        expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
-        configStorage.start();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-    }
 
-    private void expectAdd(String connectorName) throws Exception {
-        configStorage.putConnectorConfig(connectorName, 
connectorProps.get(connectorName));
+        // apply config
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for 
this test, it doesn't matter if we use the same config snapshot
+        worker.stopConnector(CONN1);
+        PowerMock.expectLastCall();
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        expectInstantiateConnector(connectorName, true);
-    }
 
-    private void expectEmptyRestore() throws Exception {
-        expectRestore(Collections.<String>emptyList());
-    }
+        PowerMock.replayAll();
+
+        herder.tick(); // join
+        connectorConfigCallback.onCompletion(null, CONN1); // read updated 
config
+        herder.tick(); // apply config
 
-    private void expectRestore(List<String> connectorNames) throws Exception {
-        Map<String, Integer> rootConfig = new HashMap<>();
-        Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
-        for (String connName : connectorNames) {
-            rootConfig.put(connName, 0);
-            connectorConfigs.put(connName, connectorProps.get(connName));
-        }
-        EasyMock.expect(configStorage.snapshot())
-                .andReturn(new ClusterConfigState(1, rootConfig, 
connectorConfigs, Collections.EMPTY_MAP, Collections.EMPTY_SET));
-
-        // Restore never uses a callback
-        for (String connectorName : connectorNames)
-            expectInstantiateConnector(connectorName, false);
+        PowerMock.verifyAll();
     }
 
-    private void expectInstantiateConnector(String connectorName, boolean 
expectCallback) throws Exception {
-        PowerMock.expectPrivate(DistributedHerder.class, 
"instantiateConnector", connectorClasses.get(connectorName).getName())
-                .andReturn(connectors.get(connectorName));
-        if (expectCallback) {
-            createCallback.onCompletion(null, connectorName);
-            PowerMock.expectLastCall();
-        }
+    @Test
+    public void testTaskConfigAdded() {
+        // Task config always requires rebalance
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+
+        // join
+        expectRebalance(-1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
 
-        Connector connector = connectors.get(connectorName);
-        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
+        // apply config
+        member.wakeup();
+        member.ensureActive();
+        PowerMock.expectLastCall();
+        // Checks for config updates and starts rebalance
+        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+        member.requestRejoin();
         PowerMock.expectLastCall();
-        connector.start(new Properties());
+        // Performs rebalance and gets new assignment
+        expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
+                CopycatProtocol.Assignment.NO_ERROR, 1, 
Collections.<String>emptyList(), Arrays.asList(TASK0));
+        worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject());
         PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
 
-        // Just return the connector properties for the individual task we 
generate by default
-        EasyMock.<Class<? extends 
Task>>expect(connector.taskClass()).andReturn(connectorTaskClasses.get(connectorName));
+        herder.tick(); // join
+        taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, 
TASK2)); // read updated config
+        herder.tick(); // apply config
+        herder.tick(); // do rebalance
 
-        
EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
-                .andReturn(Arrays.asList(taskProps));
-        // And we should instantiate the tasks. For a sink task, we should see 
added properties for
-        // the input topic partitions
-        Properties generatedTaskProps = new Properties();
-        generatedTaskProps.putAll(taskProps);
-        if (connectorName.contains("sink"))
-            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, 
TOPICS_LIST_STR);
-        ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
-        worker.addTask(taskId, 
connectorTaskClasses.get(connectorName).getName(), generatedTaskProps);
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpFails() throws Exception {
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
+                CopycatProtocol.Assignment.CONFIG_MISMATCH, 1, 
Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+        // Reading to end of log times out
+        TestFuture<Void> readToEndFuture = new TestFuture<>();
+        readToEndFuture.resolveOnGet(new TimeoutException());
+        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+        PowerMock.expectPrivate(herder, "backoff", 
DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
+        member.requestRejoin();
+
+        // After backoff, restart the process and this time succeed
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
+        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        herder.tick();
+
+        PowerMock.verifyAll();
     }
 
-    private void expectStop(String connectorName) {
-        worker.stopTask(new ConnectorTaskId(connectorName, 0));
-        EasyMock.expectLastCall();
-        Connector connector = connectors.get(connectorName);
-        connector.stop();
-        EasyMock.expectLastCall();
+    @Test
+    public void testInconsistentConfigs() throws Exception {
+        // FIXME: if we have inconsistent configs, we need to request forced 
reconfig + write of the connector's task configs
+        // This requires inter-worker communication, so needs the REST API
     }
 
-    private void expectDestroy(String connectorName) {
-        expectStop(connectorName);
-        configStorage.putConnectorConfig(connectorName, null);
-        PowerMock.expectLastCall();
+
+    private void expectRebalance(final long offset, final List<String> 
assignedConnectors, final List<ConnectorTaskId> assignedTasks) { // shorthand overload: no revocation, NO_ERROR assignment
+        expectRebalance(null, null, CopycatProtocol.Assignment.NO_ERROR, 
offset, assignedConnectors, assignedTasks); // null revoked lists make the full overload skip the onRevoked callback
     }
 
-    // We need to use a real class here due to some issue with mocking 
java.lang.Class
-    private abstract class BogusSourceConnector extends SourceConnector {
+    // Handles common initial part of rebalance callback. Does not handle 
instantiation of connectors and tasks.
+    private void expectRebalance(final Collection<String> revokedConnectors, 
final List<ConnectorTaskId> revokedTasks,
+                                 final short error, final long offset, final 
List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+        member.ensureActive(); // records one expected ensureActive() call; the answer attached below plays out the rebalance callbacks
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                if (revokedConnectors != null) // null means the caller wants no revocation step before the assignment
+                    rebalanceListener.onRevoked("leader", revokedConnectors, 
revokedTasks);
+                CopycatProtocol.Assignment assignment = new 
CopycatProtocol.Assignment(
+                        error, "leader", offset, assignedConnectors, 
assignedTasks);
+                rebalanceListener.onAssigned(assignment); // hand the assignment to the listener obtained from the herder in setUp()
+                return null;
+            }
+        });
     }
 
-    private abstract class BogusSourceTask extends SourceTask {
+    private void expectPostRebalanceCatchup(final ClusterConfigState 
readToEndSnapshot) { // records one readToEnd() and one snapshot() expectation on the configStorage mock
+        TestFuture<Void> readToEndFuture = new TestFuture<>();
+        readToEndFuture.resolveOnGet((Void) null); // presumably completes with a null result once get() is called -- confirm against TestFuture
+        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+        EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot); // snapshot() returns the state supplied by the caller
     }
 
-    private abstract class BogusSinkConnector extends SinkConnector {
+
+    // We need to use a real class here due to some issue with mocking 
java.lang.Class
+    private abstract class BogusSourceConnector extends SourceConnector {
     }
 
-    private abstract class BogusSinkTask extends SourceTask {
+    private abstract class BogusSourceTask extends SourceTask {
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
new file mode 100644
index 0000000..30c76a2
--- /dev/null
+++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class WorkerCoordinatorTest {
+
+    // Connector and task ids referenced by the config snapshots built in setup()
+    private final String connectorId = "connector";
+    private final String connectorId2 = "connector2";
+    private final ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0);
+    private final ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1);
+    private final ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0);
+
+    // Fixed values handed to the collaborators created in setup(); never reassigned, hence final
+    private final String groupId = "test-group";
+    private final int sessionTimeoutMs = 10;
+    private final int heartbeatIntervalMs = 2;
+    private final long retryBackoffMs = 100;
+    private final long requestTimeoutMs = 5000;
+    private MockTime time;
+    private MockClient client;
+    private final Cluster cluster = TestUtils.singletonCluster("topic", 1);
+    private final Node node = cluster.nodes().get(0);
+    private Metadata metadata;
+    private Metrics metrics;
+    private final Map<String, String> metricTags = new LinkedHashMap<>();
+    private ConsumerNetworkClient consumerClient;
+    private MockRebalanceListener rebalanceListener;
+    // Assigned explicitly via PowerMock.createMock() in setup()
+    @Mock private KafkaConfigStorage configStorage;
+    private WorkerCoordinator coordinator;
+
+    // Config snapshots built in setup(): configState1 has offset 1, configState2 has offset 2
+    private ClusterConfigState configState1;
+    private ClusterConfigState configState2;
+
+    // Creates a fresh coordinator on top of a MockClient and a mocked KafkaConfigStorage, then builds the two
+    // config snapshots (configState1 and configState2) that the tests hand back from snapshot().
+    @Before
+    public void setup() {
+        time = new MockTime();
+        client = new MockClient(time);
+        metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        metrics = new Metrics(time);
+        rebalanceListener = new MockRebalanceListener();
+        configStorage = PowerMock.createMock(KafkaConfigStorage.class);
+
+        client.setNode(node);
+
+        coordinator = new WorkerCoordinator(
+                consumerClient, groupId, sessionTimeoutMs, heartbeatIntervalMs,
+                metrics, "consumer" + groupId, metricTags, time,
+                requestTimeoutMs, retryBackoffMs, configStorage, rebalanceListener);
+
+        // configState1 (offset 1): one connector with a single task
+        final Map<String, Integer> taskCounts1 = Collections.singletonMap(connectorId, 1);
+        final Map<String, Map<String, String>> connectorConfigs1 =
+                Collections.<String, Map<String, String>>singletonMap(connectorId, new HashMap<String, String>());
+        final Map<ConnectorTaskId, Map<String, String>> taskConfigs1 =
+                Collections.<ConnectorTaskId, Map<String, String>>singletonMap(taskId0, new HashMap<String, String>());
+        configState1 = new ClusterConfigState(
+                1L, taskCounts1, connectorConfigs1, taskConfigs1, Collections.<String>emptySet());
+
+        // configState2 (offset 2): two connectors and three tasks in total
+        final Map<String, Integer> taskCounts2 = new HashMap<>();
+        taskCounts2.put(connectorId, 2);
+        taskCounts2.put(connectorId2, 1);
+        final Map<String, Map<String, String>> connectorConfigs2 = new HashMap<>();
+        for (String connector : Arrays.asList(connectorId, connectorId2))
+            connectorConfigs2.put(connector, new HashMap<String, String>());
+        final Map<ConnectorTaskId, Map<String, String>> taskConfigs2 = new HashMap<>();
+        for (ConnectorTaskId task : Arrays.asList(taskId0, taskId1, taskId2))
+            taskConfigs2.put(task, new HashMap<String, String>());
+        configState2 = new ClusterConfigState(
+                2L, taskCounts2, connectorConfigs2, taskConfigs2, Collections.<String>emptySet());
+    }
+
+    // Closes the Metrics instance created in setup().
+    @After
+    public void teardown() {
+        metrics.close();
+    }
+
+    // We only test functionality unique to WorkerCoordinator. Most functionality is already well tested via the tests
+    // that cover AbstractCoordinator & ConsumerCoordinator.
+
+    // metadata() should expose exactly one subprotocol entry whose payload deserializes to the offset of the
+    // snapshot returned by the config storage.
+    @Test
+    public void testMetadata() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        final LinkedHashMap<String, ByteBuffer> protocols = coordinator.metadata();
+        assertEquals(1, protocols.size());
+
+        final ByteBuffer payload = protocols.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL);
+        final CopycatProtocol.ConfigState advertised = CopycatProtocol.deserializeMetadata(payload);
+        assertEquals(1, advertised.offset());
+
+        PowerMock.verifyAll();
+    }
+
+    // Joins the group as leader. The prepared sync response is only matched by a sync request from this member
+    // for generation 1 that contains an assignment for the member itself; the listener must then see the
+    // connector-only assignment carried by that response.
+    @Test
+    public void testNormalJoinGroupLeader() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        final String consumerId = "leader";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        final Map<String, Long> offsetsByMember = new HashMap<>();
+        offsetsByMember.put("leader", 1L);
+        offsetsByMember.put("member", 1L);
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, offsetsByMember, Errors.NONE.code()));
+
+        final MockClient.RequestMatcher leaderSyncMatcher = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                if (!sync.memberId().equals(consumerId))
+                    return false;
+                if (sync.generationId() != 1)
+                    return false;
+                return sync.groupAssignment().containsKey(consumerId);
+            }
+        };
+        final Struct syncResponse = syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L,
+                Collections.singletonList(connectorId), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code());
+        client.prepareResponse(leaderSyncMatcher, syncResponse);
+        coordinator.ensureActiveGroup();
+
+        assertFalse(coordinator.needRejoin());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+
+        final CopycatProtocol.Assignment received = rebalanceListener.assignment;
+        assertFalse(received.failed());
+        assertEquals(1L, received.offset());
+        assertEquals("leader", received.leader());
+        assertEquals(Collections.singletonList(connectorId), received.connectors());
+        assertEquals(Collections.emptyList(), received.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    // Joins the group as a follower. The prepared sync response is only matched by a sync request from this
+    // member for generation 1 with an empty group assignment; the listener must then see the task-only
+    // assignment carried by that response.
+    @Test
+    public void testNormalJoinGroupFollower() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        final String memberId = "member";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // normal join group
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+
+        final MockClient.RequestMatcher followerSyncMatcher = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                if (!sync.memberId().equals(memberId))
+                    return false;
+                if (sync.generationId() != 1)
+                    return false;
+                return sync.groupAssignment().isEmpty();
+            }
+        };
+        final Struct syncResponse = syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L,
+                Collections.<String>emptyList(), Collections.singletonList(taskId0), Errors.NONE.code());
+        client.prepareResponse(followerSyncMatcher, syncResponse);
+        coordinator.ensureActiveGroup();
+
+        assertFalse(coordinator.needRejoin());
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+
+        final CopycatProtocol.Assignment received = rebalanceListener.assignment;
+        assertFalse(received.failed());
+        assertEquals(1L, received.offset());
+        assertEquals(Collections.emptyList(), received.connectors());
+        assertEquals(Collections.singletonList(taskId0), received.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    // Queues a join/sync round whose assignment reports CONFIG_MISMATCH followed by a second, successful round,
+    // and verifies that the config storage was snapshotted twice.
+    @Test
+    public void testJoinLeaderCannotAssign() {
+        // If the selected leader can't get up to the maximum offset, it will fail to assign and we should immediately
+        // need to retry the join.
+
+        // When the first round fails, we'll take an updated config snapshot
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
+
+        PowerMock.replayAll();
+
+        final String memberId = "member";
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // The same matcher guards both sync responses
+        final MockClient.RequestMatcher followerSyncMatcher = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+                if (!sync.memberId().equals(memberId))
+                    return false;
+                if (sync.generationId() != 1)
+                    return false;
+                return sync.groupAssignment().isEmpty();
+            }
+        };
+
+        // config mismatch results in assignment error
+        final Struct mismatchSync = syncGroupResponse(CopycatProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
+                Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code());
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(followerSyncMatcher, mismatchSync);
+
+        // second round succeeds
+        final Struct successfulSync = syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L,
+                Collections.<String>emptyList(), Collections.singletonList(taskId0), Errors.NONE.code());
+        client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
+        client.prepareResponse(followerSyncMatcher, successfulSync);
+
+        coordinator.ensureActiveGroup();
+
+        PowerMock.verifyAll();
+    }
+
+    // Joins the group, requests a rejoin and joins again. The second round must report the first assignment's
+    // task as revoked and deliver the new, connector-only assignment.
+    @Test
+    public void testRejoinGroup() {
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // join the group once
+        final Struct firstSync = syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L,
+                Collections.<String>emptyList(), Collections.singletonList(taskId0), Errors.NONE.code());
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(firstSync);
+        coordinator.ensureActiveGroup();
+
+        assertEquals(0, rebalanceListener.revokedCount);
+        assertEquals(1, rebalanceListener.assignedCount);
+        final CopycatProtocol.Assignment firstAssignment = rebalanceListener.assignment;
+        assertFalse(firstAssignment.failed());
+        assertEquals(1L, firstAssignment.offset());
+        assertEquals(Collections.emptyList(), firstAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), firstAssignment.tasks());
+
+        // and join the group again
+        coordinator.requestRejoin();
+        final Struct secondSync = syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L,
+                Collections.singletonList(connectorId), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code());
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+        client.prepareResponse(secondSync);
+        coordinator.ensureActiveGroup();
+
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors);
+        assertEquals(Collections.singletonList(taskId0), rebalanceListener.revokedTasks);
+        assertEquals(2, rebalanceListener.assignedCount);
+        final CopycatProtocol.Assignment secondAssignment = rebalanceListener.assignment;
+        assertFalse(secondAssignment.failed());
+        assertEquals(1L, secondAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId), secondAssignment.connectors());
+        assertEquals(Collections.emptyList(), secondAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    // Invokes the private doSync() directly, as leader, with the coordinator primed from configState1 and both
+    // members reporting config offset 1, then checks the assignment serialized for each member.
+    @Test
+    public void testLeaderDoSync1() throws Exception {
+        // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
+        // output. So we test it directly here.
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
+
+        PowerMock.replayAll();
+
+        // Prime the current configuration state
+        coordinator.metadata();
+
+        Map<String, ByteBuffer> configs = new HashMap<>();
+        // Mark everyone as in sync with configState1
+        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+
+        // configState1 has 1 connector, 1 task
+        CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));
+        assertFalse(leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors());
+        assertEquals(Collections.emptyList(), leaderAssignment.tasks());
+
+        CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member"));
+        assertFalse(memberAssignment.failed());
+        assertEquals("leader", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.emptyList(), memberAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+    // Invokes the private doSync() directly, as leader, with the coordinator primed from configState2 and both
+    // members reporting config offset 1, then checks how connectors and tasks are spread over the two members.
+    @Test
+    public void testLeaderDoSync2() throws Exception {
+        // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
+        // output. So we test it directly here.
+
+        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
+
+        PowerMock.replayAll();
+
+        // Prime the current configuration state
+        coordinator.metadata();
+
+        Map<String, ByteBuffer> configs = new HashMap<>();
+        // Every member reports config offset 1
+        configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+
+        // configState2 has 2 connectors, 3 tasks and should trigger round robin assignment
+        CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));
+        assertFalse(leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1, taskId2), leaderAssignment.tasks());
+
+        CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member"));
+        assertFalse(memberAssignment.failed());
+        assertEquals("leader", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks());
+
+        PowerMock.verifyAll();
+    }
+
+
+    private Struct groupMetadataResponse(Node node, short error) {
+        GroupMetadataResponse response = new GroupMetadataResponse(error, 
node);
+        return response.toStruct();
+    }
+
+    private Struct joinGroupLeaderResponse(int generationId, String memberId,
+                                           Map<String, Long> configOffsets, 
short error) {
+        Map<String, ByteBuffer> metadata = new HashMap<>();
+        for (Map.Entry<String, Long> configStateEntry : 
configOffsets.entrySet()) {
+            ByteBuffer buf = CopycatProtocol.serializeMetadata(new 
CopycatProtocol.ConfigState(configStateEntry.getValue()));
+            metadata.put(configStateEntry.getKey(), buf);
+        }
+        return new JoinGroupResponse(error, generationId, 
WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
+    }
+
+    private Struct joinGroupFollowerResponse(int generationId, String 
memberId, String leaderId, short error) {
+        return new JoinGroupResponse(error, generationId, 
WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
+                Collections.<String, ByteBuffer>emptyMap()).toStruct();
+    }
+
+    private Struct syncGroupResponse(short assignmentError, String leader, 
long configOffset, List<String> connectorIds,
+                                     List<ConnectorTaskId> taskIds, short 
error) {
+        CopycatProtocol.Assignment assignment = new 
CopycatProtocol.Assignment(assignmentError, leader, configOffset, connectorIds, 
taskIds);
+        ByteBuffer buf = CopycatProtocol.serializeAssignment(assignment);
+        return new SyncGroupResponse(error, buf).toStruct();
+    }
+
+
+    // WorkerRebalanceListener that simply records the arguments of the latest callbacks and counts how often
+    // each callback was invoked, so tests can assert on them.
+    private static class MockRebalanceListener implements WorkerRebalanceListener {
+        // argument of the most recent onAssigned() call
+        public CopycatProtocol.Assignment assignment = null;
+
+        // arguments of the most recent onRevoked() call
+        public String revokedLeader;
+        public Collection<String> revokedConnectors;
+        public Collection<ConnectorTaskId> revokedTasks;
+
+        // number of invocations of each callback
+        public int revokedCount = 0;
+        public int assignedCount = 0;
+
+        @Override
+        public void onAssigned(CopycatProtocol.Assignment assignment) {
+            assignedCount += 1;
+            this.assignment = assignment;
+        }
+
+        @Override
+        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+            revokedCount += 1;
+            revokedLeader = leader;
+            revokedConnectors = connectors;
+            revokedTasks = tasks;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
index 606b94d..b395fc7 100644
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
+++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.TaskConfig;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -35,22 +36,21 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({StandaloneHerder.class})
-@PowerMockIgnore("javax.management.*")
 public class StandaloneHerderTest {
     private static final String CONNECTOR_NAME = "test";
+    private static final List<String> TOPICS_LIST = Arrays.asList("topic1", 
"topic2");
     private static final String TOPICS_LIST_STR = "topic1,topic2";
+    private static final int DEFAULT_MAX_TASKS = 1;
 
     private StandaloneHerder herder;
     @Mock protected Worker worker;
@@ -58,7 +58,7 @@ public class StandaloneHerderTest {
     @Mock protected Callback<String> createCallback;
 
     private Map<String, String> connectorProps;
-    private Properties taskProps;
+    private Map<String, String> taskProps;
 
     @Before
     public void setup() {
@@ -68,11 +68,10 @@ public class StandaloneHerderTest {
         connectorProps = new HashMap<>();
         connectorProps.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
-        PowerMock.mockStatic(StandaloneHerder.class);
 
         // These can be anything since connectors can pass along whatever they want.
-        taskProps = new Properties();
-        taskProps.setProperty("foo", "bar");
+        taskProps = new HashMap<>();
+        taskProps.put("foo", "bar");
     }
 
     @Test
@@ -121,7 +120,9 @@ public class StandaloneHerderTest {
     public void testCreateAndStop() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
+        // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
+
         PowerMock.replayAll();
 
         herder.addConnector(connectorProps, createCallback);
@@ -135,36 +136,30 @@ public class StandaloneHerderTest {
                            boolean sink) throws Exception {
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
connClass.getName());
 
-        PowerMock.expectPrivate(StandaloneHerder.class, 
"instantiateConnector", connClass.getName())
-                .andReturn(connector);
-
-        createCallback.onCompletion(null, CONNECTOR_NAME);
+        worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), 
EasyMock.anyObject(HerderConnectorContext.class));
         PowerMock.expectLastCall();
 
-        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
-        PowerMock.expectLastCall();
-        connector.start(new Properties());
+        createCallback.onCompletion(null, CONNECTOR_NAME);
         PowerMock.expectLastCall();
 
-        // Just return the connector properties for the individual task we 
generate by default
-        EasyMock.<Class<? extends 
Task>>expect(connector.taskClass()).andReturn(taskClass);
-
-        
EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
-                .andReturn(Arrays.asList(taskProps));
         // And we should instantiate the tasks. For a sink task, we should see 
added properties for
         // the input topic partitions
-        Properties generatedTaskProps = new Properties();
+        Map<String, String> generatedTaskProps = new HashMap<>();
         generatedTaskProps.putAll(taskProps);
+        generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, 
taskClass.getName());
         if (sink)
-            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, 
TOPICS_LIST_STR);
-        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), 
taskClass.getName(), generatedTaskProps);
+            generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        EasyMock.expect(worker.reconfigureConnectorTasks(CONNECTOR_NAME, 
DEFAULT_MAX_TASKS, TOPICS_LIST))
+                .andReturn(Collections.singletonMap(new 
ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps));
+
+        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new 
TaskConfig(generatedTaskProps));
         PowerMock.expectLastCall();
     }
 
     private void expectStop() {
         worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
         EasyMock.expectLastCall();
-        connector.stop();
+        worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
index b02b752..cf9f8aa 100644
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
+++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
@@ -169,6 +169,12 @@ public class KafkaConfigStorageTest {
         connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
         EasyMock.expectLastCall();
 
+        // Config deletion
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), 
KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null);
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
         expectStop();
 
         PowerMock.replayAll();
@@ -185,17 +191,24 @@ public class KafkaConfigStorageTest {
         // Writing should block until it is written and read back from Kafka
         configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
         configState = configStorage.snapshot();
-        assertEquals(0, configState.offset());
+        assertEquals(1, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
 
         // Second should also block and all configs should still be available
         configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(1));
         configState = configStorage.snapshot();
-        assertEquals(1, configState.offset());
+        assertEquals(2, configState.offset());
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(1), 
configState.connectorConfig(CONNECTOR_IDS.get(1)));
 
+        // Deletion should remove the second one we added
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null);
+        configState = configStorage.snapshot();
+        assertEquals(3, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
         configStorage.stop();
 
         PowerMock.verifyAll();
@@ -255,13 +268,13 @@ public class KafkaConfigStorageTest {
 
         // Validate root config by listing all connectors and tasks
         configState = configStorage.snapshot();
-        assertEquals(2, configState.offset());
+        assertEquals(3, configState.offset());
         String connectorName = CONNECTOR_IDS.get(0);
         assertEquals(Arrays.asList(connectorName), new 
ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), 
configState.tasks(connectorName));
+        assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0), 
TASK_IDS.get(1))), configState.tasks(connectorName));
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(1), 
configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(new HashSet<>(Collections.EMPTY_LIST), 
configState.inconsistentConnectors());
+        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
 
         configStorage.stop();
 
@@ -306,16 +319,16 @@ public class KafkaConfigStorageTest {
 
         // Should see a single connector and its config should be the last one 
seen anywhere in the log
         ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(6, configState.offset()); // Should always be last read, 
even if uncommitted
+        assertEquals(7, configState.offset()); // Should always be next to be 
read, even if uncommitted
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
         // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
         assertEquals(SAMPLE_CONFIGS.get(2), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
         // Should see 2 tasks for that connector. Only config updates before 
the root key update should be reflected
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), 
configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0), 
TASK_IDS.get(1))), configState.tasks(CONNECTOR_IDS.get(0)));
         // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(new HashSet<>(Collections.EMPTY_LIST), 
configState.inconsistentConnectors());
+        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
 
         configStorage.stop();
 
@@ -374,10 +387,10 @@ public class KafkaConfigStorageTest {
         configStorage.start();
         // After reading the log, it should have been in an inconsistent state
         ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(5, configState.offset()); // Should always be last read, 
not last committed
+        assertEquals(6, configState.offset()); // Should always be next to be 
read, not last committed
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
         // Inconsistent data should leave us with no tasks listed for the 
connector and an entry in the inconsistent list
-        assertEquals(Collections.EMPTY_LIST, 
configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(Collections.EMPTY_SET, 
configState.tasks(CONNECTOR_IDS.get(0)));
         // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
         assertNull(configState.taskConfig(TASK_IDS.get(0)));
         assertNull(configState.taskConfig(TASK_IDS.get(1)));
@@ -398,11 +411,11 @@ public class KafkaConfigStorageTest {
         configState = configStorage.snapshot();
         // This is only two more ahead of the last one because multiple calls 
fail, and so their configs are not written
         // to the topic. Only the last call with 1 task config + 1 commit 
actually gets written.
-        assertEquals(7, configState.offset());
+        assertEquals(8, configState.offset());
         assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0)), 
configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0))), 
configState.tasks(CONNECTOR_IDS.get(0)));
         assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(new HashSet<>(Collections.EMPTY_LIST), 
configState.inconsistentConnectors());
+        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
 
         configStorage.stop();
 
@@ -446,17 +459,19 @@ public class KafkaConfigStorageTest {
     private void expectConvertWriteRead(final String configKey, final Schema 
valueSchema, final byte[] serialized,
                                         final String dataFieldName, final 
Object dataFieldValue) {
         final Capture<Struct> capturedRecord = EasyMock.newCapture();
-        EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), 
EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
-                .andReturn(serialized);
+        if (serialized != null)
+            EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), 
EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+                    .andReturn(serialized);
         storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
         PowerMock.expectLastCall();
         EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), 
EasyMock.aryEq(serialized)))
                 .andAnswer(new IAnswer<SchemaAndValue>() {
                     @Override
                     public SchemaAndValue answer() throws Throwable {
-                        assertEquals(dataFieldValue, 
capturedRecord.getValue().get(dataFieldName));
+                        if (dataFieldName != null)
+                            assertEquals(dataFieldValue, 
capturedRecord.getValue().get(dataFieldName));
                         // Note null schema because default settings for 
internal serialization are schema-less
-                        return new SchemaAndValue(null, 
structToMap(capturedRecord.getValue()));
+                        return new SchemaAndValue(null, serialized == null ? 
null : structToMap(capturedRecord.getValue()));
                     }
                 });
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
index c5978ec..55e24c8 100644
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
+++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
@@ -102,8 +102,14 @@ public class TestFuture<T> implements Future<T> {
             }
         }
 
-        if (exception != null)
-            throw new ExecutionException(exception);
+        if (exception != null) {
+            if (exception instanceof TimeoutException)
+                throw (TimeoutException) exception;
+            else if (exception instanceof InterruptedException)
+                throw (InterruptedException) exception;
+            else
+                throw new ExecutionException(exception);
+        }
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py 
b/tests/kafkatest/services/copycat.py
index 4e2ab40..45ef330 100644
--- a/tests/kafkatest/services/copycat.py
+++ b/tests/kafkatest/services/copycat.py
@@ -39,6 +39,16 @@ class CopycatServiceBase(Service):
         except:
             return []
 
+    def set_configs(self, config_template, connector_config_templates):
+        """
+        Set configurations for the worker and the connector to run on
+        it. These are not provided in the constructor because the worker
+        config generally needs access to ZK/Kafka services to
+        create the configuration.
+        """
+        self.config_template = config_template
+        self.connector_config_templates = connector_config_templates
+
     def stop_node(self, node, clean_shutdown=True):
         pids = self.pids(node)
         sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
@@ -51,7 +61,7 @@ class CopycatServiceBase(Service):
         node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False)
 
     def restart(self):
-        # We don't want to do any clean up here, just restart the process
+        # We don't want to do any clean up here, just restart the process.
         for node in self.nodes:
             self.stop_node(node)
             self.start_node(node)
@@ -62,8 +72,11 @@ class CopycatServiceBase(Service):
                              (self.__class__.__name__, node.account))
         for pid in self.pids(node):
             node.account.signal(pid, signal.SIGKILL, allow_fail=False)
-        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log 
/mnt/copycat.properties /mnt/copycat-connector.properties " + " 
".join(self.files), allow_fail=False)
 
+        node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log 
/mnt/copycat.properties  " + " ".join(self.config_filenames() + self.files), 
allow_fail=False)
+
+    def config_filenames(self):
+        return ["/mnt/copycat-connector-" + str(idx) + ".properties" for idx, 
template in enumerate(self.connector_config_templates)]
 
 class CopycatStandaloneService(CopycatServiceBase):
     """Runs Copycat in standalone mode."""
@@ -71,16 +84,6 @@ class CopycatStandaloneService(CopycatServiceBase):
     def __init__(self, context, kafka, files):
         super(CopycatStandaloneService, self).__init__(context, 1, kafka, 
files)
 
-    def set_configs(self, config_template, connector_config_template):
-        """
-        Set configurations for the worker and the connector to run on
-        it. These are not provided in the constructor because the worker
-        config generally needs access to ZK/Kafka services to
-        create the configuration.
-        """
-        self.config_template = config_template
-        self.connector_config_template = connector_config_template
-
     # For convenience since this service only makes sense with a single node
     @property
     def node(self):
@@ -88,12 +91,17 @@ class CopycatStandaloneService(CopycatServiceBase):
 
     def start_node(self, node):
         node.account.create_file("/mnt/copycat.properties", 
self.config_template)
-        node.account.create_file("/mnt/copycat-connector.properties", 
self.connector_config_template)
+        remote_connector_configs = []
+        for idx, template in enumerate(self.connector_config_templates):
+            target_file = "/mnt/copycat-connector-" + str(idx) + ".properties"
+            node.account.create_file(target_file, template)
+            remote_connector_configs.append(target_file)
 
         self.logger.info("Starting Copycat standalone process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            node.account.ssh("/opt/kafka/bin/copycat-standalone.sh 
/mnt/copycat.properties /mnt/copycat-connector.properties " +
-                             "1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo 
$! > /mnt/copycat.pid")
+            node.account.ssh("/opt/kafka/bin/copycat-standalone.sh 
/mnt/copycat.properties " +
+                             " ".join(remote_connector_configs) +
+                             " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & 
echo $! > /mnt/copycat.pid")
             monitor.wait_until('Copycat started', timeout_sec=10, 
err_msg="Never saw message indicating Copycat finished startup")
 
         if len(self.pids(node)) == 0:
@@ -108,27 +116,28 @@ class CopycatDistributedService(CopycatServiceBase):
         super(CopycatDistributedService, self).__init__(context, num_nodes, 
kafka, files)
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
-
-    def set_configs(self, config_template, connector_config_template):
-        """
-        Set configurations for the worker and the connector to run on
-        it. These are not provided in the constructor because the worker
-        config generally needs access to ZK/Kafka services to
-        create the configuration.
-        """
-        self.config_template = config_template
-        self.connector_config_template = connector_config_template
+        self.first_start = True
 
     def start_node(self, node):
         node.account.create_file("/mnt/copycat.properties", 
self.config_template)
-        node.account.create_file("/mnt/copycat-connector.properties", 
self.connector_config_template)
+        remote_connector_configs = []
+        for idx, template in enumerate(self.connector_config_templates):
+            target_file = "/mnt/copycat-connector-" + str(idx) + ".properties"
+            node.account.create_file(target_file, template)
+            remote_connector_configs.append(target_file)
 
-        self.logger.info("Starting Copycat standalone process")
+        self.logger.info("Starting Copycat distributed process")
         with node.account.monitor_log("/mnt/copycat.log") as monitor:
-            node.account.ssh("/opt/kafka/bin/copycat-distributed.sh 
/mnt/copycat.properties /mnt/copycat-connector.properties " +
-                             "1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo 
$! > /mnt/copycat.pid")
+            cmd = "/opt/kafka/bin/copycat-distributed.sh 
/mnt/copycat.properties "
+            # Only submit connectors on the first node so they don't get 
submitted multiple times. Also only submit them
+            # the first time the node is started so that restarting a node does not resubmit them.
+            if self.first_start and node == self.nodes[0]:
+                cmd += " ".join(remote_connector_configs)
+            cmd += " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > 
/mnt/copycat.pid"
+            node.account.ssh(cmd)
             monitor.wait_until('Copycat started', timeout_sec=10, 
err_msg="Never saw message indicating Copycat finished startup")
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
 
+        self.first_start = False

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/copycat_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_distributed_test.py 
b/tests/kafkatest/tests/copycat_distributed_test.py
index 9d00334..57965e5 100644
--- a/tests/kafkatest/tests/copycat_distributed_test.py
+++ b/tests/kafkatest/tests/copycat_distributed_test.py
@@ -31,10 +31,12 @@ class CopycatDistributedFileTest(KafkaTest):
     OFFSETS_TOPIC = "copycat-offsets"
     CONFIG_TOPIC = "copycat-configs"
 
-    FIRST_INPUT_LISTS = [["foo", "bar", "baz"], ["foo2", "bar2", "baz2"]]
-    FIRST_INPUTS = ["\n".join(input_list) + "\n" for input_list in 
FIRST_INPUT_LISTS]
-    SECOND_INPUT_LISTS = [["razz", "ma", "tazz"], ["razz2", "ma2", "tazz2"]]
-    SECOND_INPUTS = ["\n".join(input_list) + "\n" for input_list in 
SECOND_INPUT_LISTS]
+    # Since tasks can be assigned to any node and we're testing with files, we 
need to make sure the content is the same
+    # across all nodes.
+    FIRST_INPUT_LIST = ["foo", "bar", "baz"]
+    FIRST_INPUTS = "\n".join(FIRST_INPUT_LIST) + "\n"
+    SECOND_INPUT_LIST = ["razz", "ma", "tazz"]
+    SECOND_INPUTS = "\n".join(SECOND_INPUT_LIST) + "\n"
 
     SCHEMA = { "type": "string", "optional": False }
 
@@ -43,13 +45,7 @@ class CopycatDistributedFileTest(KafkaTest):
             'test' : { 'partitions': 1, 'replication-factor': 1 }
         })
 
-        # FIXME these should have multiple nodes. However, currently the 
connectors are submitted via command line,
-        # which means we would get duplicates. Both would run, but they would 
have conflicting keys for offsets and
-        # configs. Until we have real distributed coordination of workers with 
unified connector submission, we need
-        # to restrict each of these to a single node.
-        self.num_nodes = 1
-        self.source = CopycatDistributedService(test_context, self.num_nodes, 
self.kafka, [self.INPUT_FILE])
-        self.sink = CopycatDistributedService(test_context, self.num_nodes, 
self.kafka, [self.OUTPUT_FILE])
+        self.cc = CopycatDistributedService(test_context, 2, self.kafka, 
[self.INPUT_FILE, self.OUTPUT_FILE])
 
     def test_file_source_and_sink(self, 
converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
         assert converter != None, "converter type must be set"
@@ -58,33 +54,40 @@ class CopycatDistributedFileTest(KafkaTest):
         self.value_converter = converter
         self.schemas = schemas
 
-        # These need to be set
-        self.source.set_configs(self.render("copycat-distributed.properties"), 
self.render("copycat-file-source.properties"))
-        self.sink.set_configs(self.render("copycat-distributed.properties"), 
self.render("copycat-file-sink.properties"))
+        self.cc.set_configs(self.render("copycat-distributed.properties"), 
[self.render("copycat-file-source.properties"), 
self.render("copycat-file-sink.properties")])
 
-        self.source.start()
-        self.sink.start()
+        self.cc.start()
 
-        # Generating data on the source node should generate new records and 
create new output on the sink node
-        for node, input in zip(self.source.nodes, self.FIRST_INPUTS):
-            node.account.ssh("echo -e -n " + repr(input) + " >> " + 
self.INPUT_FILE)
-        wait_until(lambda: 
self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, 
err_msg="Data added to input file was not seen in the output file in a 
reasonable amount of time.")
+        # Generating data on the source node should generate new records and 
create new output on the sink node. Timeouts
+        # here need to be more generous than they are for standalone mode 
because a) it takes longer to write configs,
+        # do rebalancing of the group, etc, and b) without explicit leave 
group support, rebalancing takes a while
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " 
+ self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), 
timeout_sec=120, err_msg="Data added to input file was not seen in the output 
file in a reasonable amount of time.")
 
         # Restarting both should result in them picking up where they left off,
         # only processing new data.
-        self.source.restart()
-        self.sink.restart()
+        self.cc.restart()
 
-        for node, input in zip(self.source.nodes, self.SECOND_INPUTS):
-            node.account.ssh("echo -e -n " + repr(input) + " >> " + 
self.INPUT_FILE)
-        wait_until(lambda: 
self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes] + 
self.SECOND_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Sink output 
file never converged to the same state as the input file")
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " 
+ self.INPUT_FILE)
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + 
self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never 
converged to the same state as the input file")
 
-    def validate_output(self, inputs):
+    def validate_output(self, input):
+        input_set = set(input)
+        # Output needs to be collected from all nodes because we can't be sure 
where the tasks will be scheduled.
+        # Between the first and second rounds, we might even end up with half 
the data on each node.
+        output_set = set(itertools.chain(*[
+            [line.strip() for line in self.file_contents(node, 
self.OUTPUT_FILE)] for node in self.cc.nodes
+        ]))
+        #print input_set, output_set
+        return input_set == output_set
+
+
+    def file_contents(self, node, file):
         try:
-            input_set = set(itertools.chain(*inputs))
-            output_set = set(itertools.chain(*[
-                [line.strip() for line in node.account.ssh_capture("cat " + 
self.OUTPUT_FILE)] for node in self.sink.nodes
-            ]))
-            return input_set == output_set
+            # Convert to a list here or the CalledProcessError may be returned 
during a call to the generator instead of
+            # immediately
+            return list(node.account.ssh_capture("cat " + file))
         except subprocess.CalledProcessError:
-            return False
+            return []

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/copycat_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_test.py 
b/tests/kafkatest/tests/copycat_test.py
index 1bd8ccb..bad5330 100644
--- a/tests/kafkatest/tests/copycat_test.py
+++ b/tests/kafkatest/tests/copycat_test.py
@@ -60,9 +60,8 @@ class CopycatStandaloneFileTest(KafkaTest):
         self.value_converter = converter
         self.schemas = schemas
 
-        # These need to be set
-        self.source.set_configs(self.render("copycat-standalone.properties"), 
self.render("copycat-file-source.properties"))
-        self.sink.set_configs(self.render("copycat-standalone.properties"), 
self.render("copycat-file-sink.properties"))
+        self.source.set_configs(self.render("copycat-standalone.properties"), 
[self.render("copycat-file-source.properties")])
+        self.sink.set_configs(self.render("copycat-standalone.properties"), 
[self.render("copycat-file-sink.properties")])
 
         self.source.start()
         self.sink.start()

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/templates/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-distributed.properties 
b/tests/kafkatest/tests/templates/copycat-distributed.properties
index 31f9901..325dc85 100644
--- a/tests/kafkatest/tests/templates/copycat-distributed.properties
+++ b/tests/kafkatest/tests/templates/copycat-distributed.properties
@@ -15,6 +15,8 @@
 
 bootstrap.servers={{ kafka.bootstrap_servers() }}
 
+group.id={{ group|default("copycat-cluster") }}
+
 key.converter={{ 
key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
 value.converter={{ 
value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }}
 {% if key_converter is not defined or key_converter.endswith("JsonConverter") 
%}
@@ -30,4 +32,7 @@ internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 
 offset.storage.topic={{ OFFSETS_TOPIC }}
-config.storage.topic={{ CONFIG_TOPIC }}
\ No newline at end of file
+config.storage.topic={{ CONFIG_TOPIC }}
+
+# Make sure data gets flushed frequently so tests don't have to wait to ensure 
they see data in output systems
+offset.flush.interval.ms=5000

Reply via email to