http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
deleted file mode 100644
index ac9df44..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.clients.ClientRequest;
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
-import org.apache.kafka.common.requests.JoinGroupResponse;
-import org.apache.kafka.common.requests.SyncGroupRequest;
-import org.apache.kafka.common.requests.SyncGroupResponse;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.copycat.storage.KafkaConfigStorage;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.test.TestUtils;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.reflect.Whitebox;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-public class WorkerCoordinatorTest {
-
-    private static final String LEADER_URL = "leaderUrl:8083";
-    private static final String MEMBER_URL = "memberUrl:8083";
-
-    private String connectorId = "connector";
-    private String connectorId2 = "connector2";
-    private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0);
-    private ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1);
-    private ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0);
-
-    private String groupId = "test-group";
-    private int sessionTimeoutMs = 10;
-    private int heartbeatIntervalMs = 2;
-    private long retryBackoffMs = 100;
-    private long requestTimeoutMs = 5000;
-    private MockTime time;
-    private MockClient client;
-    private Cluster cluster = TestUtils.singletonCluster("topic", 1);
-    private Node node = cluster.nodes().get(0);
-    private Metadata metadata;
-    private Metrics metrics;
-    private Map<String, String> metricTags = new LinkedHashMap<>();
-    private ConsumerNetworkClient consumerClient;
-    private MockRebalanceListener rebalanceListener;
-    @Mock private KafkaConfigStorage configStorage;
-    private WorkerCoordinator coordinator;
-
-    private ClusterConfigState configState1;
-    private ClusterConfigState configState2;
-
-    @Before
-    public void setup() {
-        this.time = new MockTime();
-        this.client = new MockClient(time);
-        this.metadata = new Metadata(0, Long.MAX_VALUE);
-        this.metadata.update(cluster, time.milliseconds());
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, 
time, 100);
-        this.metrics = new Metrics(time);
-        this.rebalanceListener = new MockRebalanceListener();
-        this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
-
-        client.setNode(node);
-
-        this.coordinator = new WorkerCoordinator(consumerClient,
-                groupId,
-                sessionTimeoutMs,
-                heartbeatIntervalMs,
-                metrics,
-                "consumer" + groupId,
-                metricTags,
-                time,
-                requestTimeoutMs,
-                retryBackoffMs,
-                LEADER_URL,
-                configStorage,
-                rebalanceListener);
-
-        configState1 = new ClusterConfigState(
-                1L, Collections.singletonMap(connectorId, 1),
-                Collections.singletonMap(connectorId, (Map<String, String>) 
new HashMap<String, String>()),
-                Collections.singletonMap(taskId0, (Map<String, String>) new 
HashMap<String, String>()),
-                Collections.<String>emptySet()
-        );
-        Map<String, Integer> configState2ConnectorTaskCounts = new HashMap<>();
-        configState2ConnectorTaskCounts.put(connectorId, 2);
-        configState2ConnectorTaskCounts.put(connectorId2, 1);
-        Map<String, Map<String, String>> configState2ConnectorConfigs = new 
HashMap<>();
-        configState2ConnectorConfigs.put(connectorId, new HashMap<String, 
String>());
-        configState2ConnectorConfigs.put(connectorId2, new HashMap<String, 
String>());
-        Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = 
new HashMap<>();
-        configState2TaskConfigs.put(taskId0, new HashMap<String, String>());
-        configState2TaskConfigs.put(taskId1, new HashMap<String, String>());
-        configState2TaskConfigs.put(taskId2, new HashMap<String, String>());
-        configState2 = new ClusterConfigState(
-                2L, configState2ConnectorTaskCounts,
-                configState2ConnectorConfigs,
-                configState2TaskConfigs,
-                Collections.<String>emptySet()
-        );
-    }
-
-    @After
-    public void teardown() {
-        this.metrics.close();
-    }
-
-    // We only test functionality unique to WorkerCoordinator. Most 
functionality is already well tested via the tests
-    // that cover AbstractCoordinator & ConsumerCoordinator.
-
-    @Test
-    public void testMetadata() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
-
-        LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata();
-        assertEquals(1, serialized.size());
-        CopycatProtocol.WorkerState state = 
CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL));
-        assertEquals(1, state.offset());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testNormalJoinGroupLeader() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
-
-        final String consumerId = "leader";
-
-        client.prepareResponse(groupMetadataResponse(node, 
Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // normal join group
-        Map<String, Long> memberConfigOffsets = new HashMap<>();
-        memberConfigOffsets.put("leader", 1L);
-        memberConfigOffsets.put("member", 1L);
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, 
memberConfigOffsets, Errors.NONE.code()));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new 
SyncGroupRequest(request.request().body());
-                return sync.memberId().equals(consumerId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().containsKey(consumerId);
-            }
-        }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 
1L, Collections.singletonList(connectorId),
-                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
-        coordinator.ensureActiveGroup();
-
-        assertFalse(coordinator.needRejoin());
-        assertEquals(0, rebalanceListener.revokedCount);
-        assertEquals(1, rebalanceListener.assignedCount);
-        assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
-        assertEquals("leader", rebalanceListener.assignment.leader());
-        assertEquals(Collections.singletonList(connectorId), 
rebalanceListener.assignment.connectors());
-        assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.tasks());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testNormalJoinGroupFollower() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
-
-        final String memberId = "member";
-
-        client.prepareResponse(groupMetadataResponse(node, 
Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // normal join group
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, 
"leader", Errors.NONE.code()));
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new 
SyncGroupRequest(request.request().body());
-                return sync.memberId().equals(memberId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().isEmpty();
-            }
-        }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 
1L, Collections.<String>emptyList(),
-                Collections.singletonList(taskId0), Errors.NONE.code()));
-        coordinator.ensureActiveGroup();
-
-        assertFalse(coordinator.needRejoin());
-        assertEquals(0, rebalanceListener.revokedCount);
-        assertEquals(1, rebalanceListener.assignedCount);
-        assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
-        assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
rebalanceListener.assignment.tasks());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testJoinLeaderCannotAssign() {
-        // If the selected leader can't get up to the maximum offset, it will 
fail to assign and we should immediately
-        // need to retry the join.
-
-        // When the first round fails, we'll take an updated config snapshot
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
-
-        PowerMock.replayAll();
-
-        final String memberId = "member";
-
-        client.prepareResponse(groupMetadataResponse(node, 
Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // config mismatch results in assignment error
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, 
"leader", Errors.NONE.code()));
-        MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(ClientRequest request) {
-                SyncGroupRequest sync = new 
SyncGroupRequest(request.request().body());
-                return sync.memberId().equals(memberId) &&
-                        sync.generationId() == 1 &&
-                        sync.groupAssignment().isEmpty();
-            }
-        };
-        client.prepareResponse(matcher, 
syncGroupResponse(CopycatProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
-                Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
-        client.prepareResponse(joinGroupFollowerResponse(1, memberId, 
"leader", Errors.NONE.code()));
-        client.prepareResponse(matcher, 
syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L,
-                Collections.<String>emptyList(), 
Collections.singletonList(taskId0), Errors.NONE.code()));
-        coordinator.ensureActiveGroup();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testRejoinGroup() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
-
-        client.prepareResponse(groupMetadataResponse(node, 
Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
-
-        // join the group once
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()));
-        
client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, 
"leader", 1L, Collections.<String>emptyList(),
-                Collections.singletonList(taskId0), Errors.NONE.code()));
-        coordinator.ensureActiveGroup();
-
-        assertEquals(0, rebalanceListener.revokedCount);
-        assertEquals(1, rebalanceListener.assignedCount);
-        assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
-        assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
rebalanceListener.assignment.tasks());
-
-        // and join the group again
-        coordinator.requestRejoin();
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", 
"leader", Errors.NONE.code()));
-        
client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, 
"leader", 1L, Collections.singletonList(connectorId),
-                Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code()));
-        coordinator.ensureActiveGroup();
-
-        assertEquals(1, rebalanceListener.revokedCount);
-        assertEquals(Collections.emptyList(), 
rebalanceListener.revokedConnectors);
-        assertEquals(Collections.singletonList(taskId0), 
rebalanceListener.revokedTasks);
-        assertEquals(2, rebalanceListener.assignedCount);
-        assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
-        assertEquals(Collections.singletonList(connectorId), 
rebalanceListener.assignment.connectors());
-        assertEquals(Collections.emptyList(), 
rebalanceListener.assignment.tasks());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testLeaderPerformAssignment1() throws Exception {
-        // Since all the protocol responses are mocked, the other tests 
validate doSync runs, but don't validate its
-        // output. So we test it directly here.
-
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
-
-        // Prime the current configuration state
-        coordinator.metadata();
-
-        Map<String, ByteBuffer> configs = new HashMap<>();
-        // Mark everyone as in sync with configState1
-        configs.put("leader", CopycatProtocol.serializeMetadata(new 
CopycatProtocol.WorkerState(LEADER_URL, 1L)));
-        configs.put("member", CopycatProtocol.serializeMetadata(new 
CopycatProtocol.WorkerState(MEMBER_URL, 1L)));
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, 
"performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
-
-        // configState1 has 1 connector, 1 task
-        CopycatProtocol.Assignment leaderAssignment = 
CopycatProtocol.deserializeAssignment(result.get("leader"));
-        assertEquals(false, leaderAssignment.failed());
-        assertEquals("leader", leaderAssignment.leader());
-        assertEquals(1, leaderAssignment.offset());
-        assertEquals(Collections.singletonList(connectorId), 
leaderAssignment.connectors());
-        assertEquals(Collections.emptyList(), leaderAssignment.tasks());
-
-        CopycatProtocol.Assignment memberAssignment = 
CopycatProtocol.deserializeAssignment(result.get("member"));
-        assertEquals(false, memberAssignment.failed());
-        assertEquals("leader", memberAssignment.leader());
-        assertEquals(1, memberAssignment.offset());
-        assertEquals(Collections.emptyList(), memberAssignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
memberAssignment.tasks());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testLeaderPerformAssignment2() throws Exception {
-        // Since all the protocol responses are mocked, the other tests 
validate doSync runs, but don't validate its
-        // output. So we test it directly here.
-
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
-
-        PowerMock.replayAll();
-
-        // Prime the current configuration state
-        coordinator.metadata();
-
-        Map<String, ByteBuffer> configs = new HashMap<>();
-        // Mark everyone as in sync with configState1
-        configs.put("leader", CopycatProtocol.serializeMetadata(new 
CopycatProtocol.WorkerState(LEADER_URL, 1L)));
-        configs.put("member", CopycatProtocol.serializeMetadata(new 
CopycatProtocol.WorkerState(MEMBER_URL, 1L)));
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, 
"performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
-
-        // configState2 has 2 connector, 3 tasks and should trigger round 
robin assignment
-        CopycatProtocol.Assignment leaderAssignment = 
CopycatProtocol.deserializeAssignment(result.get("leader"));
-        assertEquals(false, leaderAssignment.failed());
-        assertEquals("leader", leaderAssignment.leader());
-        assertEquals(1, leaderAssignment.offset());
-        assertEquals(Collections.singletonList(connectorId), 
leaderAssignment.connectors());
-        assertEquals(Arrays.asList(taskId1, taskId2), 
leaderAssignment.tasks());
-
-        CopycatProtocol.Assignment memberAssignment = 
CopycatProtocol.deserializeAssignment(result.get("member"));
-        assertEquals(false, memberAssignment.failed());
-        assertEquals("leader", memberAssignment.leader());
-        assertEquals(1, memberAssignment.offset());
-        assertEquals(Collections.singletonList(connectorId2), 
memberAssignment.connectors());
-        assertEquals(Collections.singletonList(taskId0), 
memberAssignment.tasks());
-
-        PowerMock.verifyAll();
-    }
-
-
-    private Struct groupMetadataResponse(Node node, short error) {
-        GroupCoordinatorResponse response = new 
GroupCoordinatorResponse(error, node);
-        return response.toStruct();
-    }
-
-    private Struct joinGroupLeaderResponse(int generationId, String memberId,
-                                           Map<String, Long> configOffsets, 
short error) {
-        Map<String, ByteBuffer> metadata = new HashMap<>();
-        for (Map.Entry<String, Long> configStateEntry : 
configOffsets.entrySet()) {
-            // We need a member URL, but it doesn't matter for the purposes of 
this test. Just set it to the member ID
-            String memberUrl = configStateEntry.getKey();
-            long configOffset = configStateEntry.getValue();
-            ByteBuffer buf = CopycatProtocol.serializeMetadata(new 
CopycatProtocol.WorkerState(memberUrl, configOffset));
-            metadata.put(configStateEntry.getKey(), buf);
-        }
-        return new JoinGroupResponse(error, generationId, 
WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
-    }
-
-    private Struct joinGroupFollowerResponse(int generationId, String 
memberId, String leaderId, short error) {
-        return new JoinGroupResponse(error, generationId, 
WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
-                Collections.<String, ByteBuffer>emptyMap()).toStruct();
-    }
-
-    private Struct syncGroupResponse(short assignmentError, String leader, 
long configOffset, List<String> connectorIds,
-                                     List<ConnectorTaskId> taskIds, short 
error) {
-        CopycatProtocol.Assignment assignment = new 
CopycatProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, 
connectorIds, taskIds);
-        ByteBuffer buf = CopycatProtocol.serializeAssignment(assignment);
-        return new SyncGroupResponse(error, buf).toStruct();
-    }
-
-
-    private static class MockRebalanceListener implements 
WorkerRebalanceListener {
-        public CopycatProtocol.Assignment assignment = null;
-
-        public String revokedLeader;
-        public Collection<String> revokedConnectors;
-        public Collection<ConnectorTaskId> revokedTasks;
-
-        public int revokedCount = 0;
-        public int assignedCount = 0;
-
-        @Override
-        public void onAssigned(CopycatProtocol.Assignment assignment) {
-            this.assignment = assignment;
-            assignedCount++;
-        }
-
-        @Override
-        public void onRevoked(String leader, Collection<String> connectors, 
Collection<ConnectorTaskId> tasks) {
-            this.revokedLeader = leader;
-            this.revokedConnectors = connectors;
-            this.revokedTasks = tasks;
-            revokedCount++;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
deleted file mode 100644
index c987092..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/rest/resources/ConnectorsResourceTest.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.rest.resources;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.kafka.copycat.errors.AlreadyExistsException;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.errors.NotFoundException;
-import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.distributed.NotLeaderException;
-import org.apache.kafka.copycat.runtime.rest.RestServer;
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.rest.entities.CreateConnectorRequest;
-import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(RestServer.class)
-@PowerMockIgnore("javax.management.*")
-public class ConnectorsResourceTest {
-    // Note trailing / and that we do *not* use LEADER_URL to construct our 
reference values. This checks that we handle
-    // URL construction properly, avoiding //, which will mess up routing in 
the REST server
-    private static final String LEADER_URL = "http://leader:8083/";;
-    private static final String CONNECTOR_NAME = "test";
-    private static final String CONNECTOR2_NAME = "test2";
-    private static final Map<String, String> CONNECTOR_CONFIG = new 
HashMap<>();
-    static {
-        CONNECTOR_CONFIG.put("name", CONNECTOR_NAME);
-        CONNECTOR_CONFIG.put("sample_config", "test_config");
-    }
-    private static final List<ConnectorTaskId> CONNECTOR_TASK_NAMES = 
Arrays.asList(
-            new ConnectorTaskId(CONNECTOR_NAME, 0),
-            new ConnectorTaskId(CONNECTOR_NAME, 1)
-    );
-    private static final List<Map<String, String>> TASK_CONFIGS = new 
ArrayList<>();
-    static {
-        TASK_CONFIGS.add(Collections.singletonMap("config", "value"));
-        TASK_CONFIGS.add(Collections.singletonMap("config", "other_value"));
-    }
-    private static final List<TaskInfo> TASK_INFOS = new ArrayList<>();
-    static {
-        TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), 
TASK_CONFIGS.get(0)));
-        TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), 
TASK_CONFIGS.get(1)));
-    }
-
-
-    @Mock
-    private Herder herder;
-    private ConnectorsResource connectorsResource;
-
-    @Before
-    public void setUp() throws NoSuchMethodException {
-        PowerMock.mockStatic(RestServer.class,
-                RestServer.class.getMethod("httpRequest", String.class, 
String.class, Object.class, TypeReference.class));
-        connectorsResource = new ConnectorsResource(herder);
-    }
-
-    @Test
-    public void testListConnectors() throws Throwable {
-        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
-        herder.connectors(EasyMock.capture(cb));
-        expectAndCallbackResult(cb, Arrays.asList(CONNECTOR2_NAME, 
CONNECTOR_NAME));
-
-        PowerMock.replayAll();
-
-        Collection<String> connectors = connectorsResource.listConnectors();
-        // Ordering isn't guaranteed, compare sets
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, 
CONNECTOR2_NAME)), new HashSet<>(connectors));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testListConnectorsNotLeader() throws Throwable {
-        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
-        herder.connectors(EasyMock.capture(cb));
-        expectAndCallbackNotLeaderException(cb);
-        // Should forward request
-        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors";),
 EasyMock.eq("GET"),
-                EasyMock.isNull(), EasyMock.anyObject(TypeReference.class)))
-                .andReturn(new RestServer.HttpResponse<>(200, new 
HashMap<String, List<String>>(), Arrays.asList(CONNECTOR2_NAME, 
CONNECTOR_NAME)));
-
-        PowerMock.replayAll();
-
-        Collection<String> connectors = connectorsResource.listConnectors();
-        // Ordering isn't guaranteed, compare sets
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_NAME, 
CONNECTOR2_NAME)), new HashSet<>(connectors));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testListConnectorsNotSynced() throws Throwable {
-        final Capture<Callback<Collection<String>>> cb = Capture.newInstance();
-        herder.connectors(EasyMock.capture(cb));
-        expectAndCallbackException(cb, new CopycatException("not synced"));
-
-        PowerMock.replayAll();
-
-        // throws
-        connectorsResource.listConnectors();
-    }
-
-    @Test
-    public void testCreateConnector() throws Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
-
-        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new Herder.Created<>(true, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
-
-        PowerMock.replayAll();
-
-        connectorsResource.createConnector(body);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCreateConnectorNotLeader() throws Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
-
-        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
-        expectAndCallbackNotLeaderException(cb);
-        // Should forward request
-        
EasyMock.expect(RestServer.httpRequest(EasyMock.eq("http://leader:8083/connectors";),
 EasyMock.eq("POST"), EasyMock.eq(body), EasyMock.<TypeReference>anyObject()))
-                .andReturn(new RestServer.HttpResponse<>(201, new 
HashMap<String, List<String>>(), new ConnectorInfo(CONNECTOR_NAME, 
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
-
-        PowerMock.replayAll();
-
-        connectorsResource.createConnector(body);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = AlreadyExistsException.class)
-    public void testCreateConnectorExists() throws Throwable {
-        CreateConnectorRequest body = new 
CreateConnectorRequest(CONNECTOR_NAME, 
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
-
-        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(body.config()), EasyMock.eq(false), EasyMock.capture(cb));
-        expectAndCallbackException(cb, new AlreadyExistsException("already 
exists"));
-
-        PowerMock.replayAll();
-
-        connectorsResource.createConnector(body);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testDeleteConnector() throws Throwable {
-        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), 
EasyMock.capture(cb));
-        expectAndCallbackResult(cb, null);
-
-        PowerMock.replayAll();
-
-        connectorsResource.destroyConnector(CONNECTOR_NAME);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testDeleteConnectorNotLeader() throws Throwable {
-        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), 
EasyMock.capture(cb));
-        expectAndCallbackNotLeaderException(cb);
-        // Should forward request
-        
EasyMock.expect(RestServer.httpRequest("http://leader:8083/connectors/"; + 
CONNECTOR_NAME, "DELETE", null, null))
-                .andReturn(new RestServer.HttpResponse<>(204, new 
HashMap<String, List<String>>(), null));
-
-        PowerMock.replayAll();
-
-        connectorsResource.destroyConnector(CONNECTOR_NAME);
-
-        PowerMock.verifyAll();
-    }
-
-    // Not found exceptions should pass through to caller so they can be 
processed for 404s
-    @Test(expected = NotFoundException.class)
-    public void testDeleteConnectorNotFound() throws Throwable {
-        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.<Map<String, String>>isNull(), EasyMock.eq(true), 
EasyMock.capture(cb));
-        expectAndCallbackException(cb, new NotFoundException("not found"));
-
-        PowerMock.replayAll();
-
-        connectorsResource.destroyConnector(CONNECTOR_NAME);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testGetConnector() throws Throwable {
-        final Capture<Callback<ConnectorInfo>> cb = Capture.newInstance();
-        herder.connectorInfo(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new ConnectorInfo(CONNECTOR_NAME, 
CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES));
-
-        PowerMock.replayAll();
-
-        ConnectorInfo connInfo = 
connectorsResource.getConnector(CONNECTOR_NAME);
-        assertEquals(new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, 
CONNECTOR_TASK_NAMES), connInfo);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testGetConnectorConfig() throws Throwable {
-        final Capture<Callback<Map<String, String>>> cb = 
Capture.newInstance();
-        herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
-        expectAndCallbackResult(cb, CONNECTOR_CONFIG);
-
-        PowerMock.replayAll();
-
-        Map<String, String> connConfig = 
connectorsResource.getConnectorConfig(CONNECTOR_NAME);
-        assertEquals(CONNECTOR_CONFIG, connConfig);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = NotFoundException.class)
-    public void testGetConnectorConfigConnectorNotFound() throws Throwable {
-        final Capture<Callback<Map<String, String>>> cb = 
Capture.newInstance();
-        herder.connectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.capture(cb));
-        expectAndCallbackException(cb, new NotFoundException("not found"));
-
-        PowerMock.replayAll();
-
-        connectorsResource.getConnectorConfig(CONNECTOR_NAME);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutConnectorConfig() throws Throwable {
-        final Capture<Callback<Herder.Created<ConnectorInfo>>> cb = 
Capture.newInstance();
-        herder.putConnectorConfig(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(CONNECTOR_CONFIG), EasyMock.eq(true), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, new Herder.Created<>(false, new 
ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES)));
-
-        PowerMock.replayAll();
-
-        connectorsResource.putConnectorConfig(CONNECTOR_NAME, 
CONNECTOR_CONFIG);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testGetConnectorTaskConfigs() throws Throwable {
-        final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
-        herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, TASK_INFOS);
-
-        PowerMock.replayAll();
-
-        List<TaskInfo> taskInfos = 
connectorsResource.getTaskConfigs(CONNECTOR_NAME);
-        assertEquals(TASK_INFOS, taskInfos);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = NotFoundException.class)
-    public void testGetConnectorTaskConfigsConnectorNotFound() throws 
Throwable {
-        final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();
-        herder.taskConfigs(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(cb));
-        expectAndCallbackException(cb, new NotFoundException("connector not 
found"));
-
-        PowerMock.replayAll();
-
-        connectorsResource.getTaskConfigs(CONNECTOR_NAME);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutConnectorTaskConfigs() throws Throwable {
-        final Capture<Callback<Void>> cb = Capture.newInstance();
-        herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb));
-        expectAndCallbackResult(cb, null);
-
-        PowerMock.replayAll();
-
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = NotFoundException.class)
-    public void testPutConnectorTaskConfigsConnectorNotFound() throws 
Throwable {
-        final Capture<Callback<Void>> cb = Capture.newInstance();
-        herder.putTaskConfigs(EasyMock.eq(CONNECTOR_NAME), 
EasyMock.eq(TASK_CONFIGS), EasyMock.capture(cb));
-        expectAndCallbackException(cb, new NotFoundException("not found"));
-
-        PowerMock.replayAll();
-
-        connectorsResource.putTaskConfigs(CONNECTOR_NAME, TASK_CONFIGS);
-
-        PowerMock.verifyAll();
-    }
-
-    private  <T> void expectAndCallbackResult(final Capture<Callback<T>> cb, 
final T value) {
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() {
-            @Override
-            public Void answer() throws Throwable {
-                cb.getValue().onCompletion(null, value);
-                return null;
-            }
-        });
-    }
-
-    private  <T> void expectAndCallbackException(final Capture<Callback<T>> 
cb, final Throwable t) {
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Void>() {
-            @Override
-            public Void answer() throws Throwable {
-                cb.getValue().onCompletion(t, null);
-                return null;
-            }
-        });
-    }
-
-    private  <T> void expectAndCallbackNotLeaderException(final 
Capture<Callback<T>> cb) {
-        expectAndCallbackException(cb, new NotLeaderException("not leader 
test", LEADER_URL));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
deleted file mode 100644
index f1bd317..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.standalone;
-
-import org.apache.kafka.copycat.connector.Connector;
-import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.connector.Task;
-import org.apache.kafka.copycat.errors.AlreadyExistsException;
-import org.apache.kafka.copycat.errors.NotFoundException;
-import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.HerderConnectorContext;
-import org.apache.kafka.copycat.runtime.TaskConfig;
-import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.source.SourceConnector;
-import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.FutureCallback;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-public class StandaloneHerderTest {
-    private static final String CONNECTOR_NAME = "test";
-    private static final List<String> TOPICS_LIST = Arrays.asList("topic1", 
"topic2");
-    private static final String TOPICS_LIST_STR = "topic1,topic2";
-    private static final int DEFAULT_MAX_TASKS = 1;
-
-    private StandaloneHerder herder;
-    @Mock protected Worker worker;
-    private Connector connector;
-    @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
-
-    @Before
-    public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        herder = new StandaloneHerder(worker);
-    }
-
-    @Test
-    public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCreateConnectorAlreadyExists() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
-        // First addition should succeed
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
-
-        // Second should fail
-        
createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), 
EasyMock.<Herder.Created<ConnectorInfo>>isNull());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCreateSinkConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSinkConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSinkConnector.class, 
BogusSinkTask.class, true);
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class), false, 
createCallback);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testDestroyConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
-        expectDestroy();
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
-        FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new 
FutureCallback<>();
-        herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
-        futureCb.get(1000L, TimeUnit.MILLISECONDS);
-
-        // Second deletion should fail since the connector is gone
-        futureCb = new FutureCallback<>();
-        herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
-        try {
-            futureCb.get(1000L, TimeUnit.MILLISECONDS);
-            fail("Should have thrown NotFoundException");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof NotFoundException);
-        }
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCreateAndStop() throws Exception {
-        connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
-        // herder.stop() should stop any running connectors and tasks even if 
destroyConnector was not invoked
-        expectStop();
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, 
connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, 
createCallback);
-        herder.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testAccessors() throws Exception {
-        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, 
BogusSourceConnector.class);
-
-        Callback<Collection<String>> listConnectorsCb = 
PowerMock.createMock(Callback.class);
-        Callback<ConnectorInfo> connectorInfoCb = 
PowerMock.createMock(Callback.class);
-        Callback<Map<String, String>> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-        Callback<List<TaskInfo>> taskConfigsCb = 
PowerMock.createMock(Callback.class);
-
-        // Check accessors with empty worker
-        listConnectorsCb.onCompletion(null, Collections.EMPTY_LIST);
-        EasyMock.expectLastCall();
-        connectorInfoCb.onCompletion(EasyMock.<NotFoundException>anyObject(), 
EasyMock.<ConnectorInfo>isNull());
-        EasyMock.expectLastCall();
-        
connectorConfigCb.onCompletion(EasyMock.<NotFoundException>anyObject(), 
EasyMock.<Map<String, String>>isNull());
-        EasyMock.expectLastCall();
-        taskConfigsCb.onCompletion(EasyMock.<NotFoundException>anyObject(), 
EasyMock.<List<TaskInfo>>isNull());
-        EasyMock.expectLastCall();
-
-
-        // Create connector
-        connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
-
-        // Validate accessors with 1 connector
-        listConnectorsCb.onCompletion(null, Arrays.asList(CONNECTOR_NAME));
-        EasyMock.expectLastCall();
-        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, 
Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
-        connectorInfoCb.onCompletion(null, connInfo);
-        EasyMock.expectLastCall();
-        connectorConfigCb.onCompletion(null, connConfig);
-        EasyMock.expectLastCall();
-
-        TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 
0), taskConfig(BogusSourceTask.class, false));
-        taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo));
-        EasyMock.expectLastCall();
-
-
-        PowerMock.replayAll();
-
-        // All operations are synchronous for StandaloneHerder, so we don't 
need to actually wait after making each call
-        herder.connectors(listConnectorsCb);
-        herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
-        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
-        herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
-
-        herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, 
createCallback);
-        herder.connectors(listConnectorsCb);
-        herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
-        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
-        herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutConnectorConfig() throws Exception {
-        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, 
BogusSourceConnector.class);
-        Map<String, String> newConnConfig = new HashMap<>(connConfig);
-        newConnConfig.put("foo", "bar");
-
-        Callback<Map<String, String>> connectorConfigCb = 
PowerMock.createMock(Callback.class);
-        Callback<Herder.Created<ConnectorInfo>> putConnectorConfigCb = 
PowerMock.createMock(Callback.class);
-
-        // Create
-        connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, 
BogusSourceTask.class, false);
-        // Should get first config
-        connectorConfigCb.onCompletion(null, connConfig);
-        EasyMock.expectLastCall();
-        // Update config, which requires stopping and restarting
-        worker.stopConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-        Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
-        worker.addConnector(EasyMock.capture(capturedConfig), 
EasyMock.<ConnectorContext>anyObject());
-        EasyMock.expectLastCall();
-        // Generate same task config, which should result in no additional 
action to restart tasks
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, 
DEFAULT_MAX_TASKS, TOPICS_LIST))
-                
.andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
-        ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, 
newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
-        putConnectorConfigCb.onCompletion(null, new Herder.Created<>(false, 
newConnInfo));
-        EasyMock.expectLastCall();
-        // Should get new config
-        connectorConfigCb.onCompletion(null, newConnConfig);
-        EasyMock.expectLastCall();
-
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, 
createCallback);
-        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
-        herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, 
putConnectorConfigCb);
-        assertEquals("bar", capturedConfig.getValue().originals().get("foo"));
-        herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
-
-        PowerMock.verifyAll();
-
-    }
-
-    @Test(expected = UnsupportedOperationException.class)
-    public void testPutTaskConfigs() {
-        Callback<Void> cb = PowerMock.createMock(Callback.class);
-
-        PowerMock.replayAll();
-
-        herder.putTaskConfigs(CONNECTOR_NAME,
-                Arrays.asList(Collections.singletonMap("config", "value")),
-                cb);
-
-        PowerMock.verifyAll();
-    }
-
-    private void expectAdd(String name, Class<? extends Connector> connClass, 
Class<? extends Task> taskClass,
-                           boolean sink) throws Exception {
-        Map<String, String> connectorProps = connectorConfig(name, connClass);
-
-        worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), 
EasyMock.anyObject(HerderConnectorContext.class));
-        PowerMock.expectLastCall();
-
-        ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, 
connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
-        createCallback.onCompletion(null, new Herder.Created<>(true, 
connInfo));
-        PowerMock.expectLastCall();
-
-        // And we should instantiate the tasks. For a sink task, we should see 
added properties for
-        // the input topic partitions
-        Map<String, String> generatedTaskProps = taskConfig(taskClass, sink);
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, 
DEFAULT_MAX_TASKS, TOPICS_LIST))
-                .andReturn(Collections.singletonList(generatedTaskProps));
-
-        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new 
TaskConfig(generatedTaskProps));
-        PowerMock.expectLastCall();
-    }
-
-    private void expectStop() {
-        worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
-        EasyMock.expectLastCall();
-        worker.stopConnector(CONNECTOR_NAME);
-        EasyMock.expectLastCall();
-    }
-
-    private void expectDestroy() {
-        expectStop();
-    }
-
-
-    private static HashMap<String, String> connectorConfig(String name, 
Class<? extends Connector> connClass) {
-        HashMap<String, String> connectorProps = new HashMap<>();
-        connectorProps.put(ConnectorConfig.NAME_CONFIG, name);
-        connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
-        connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
connClass.getName());
-        return connectorProps;
-    }
-
-    private static Map<String, String> taskConfig(Class<? extends Task> 
taskClass, boolean sink) {
-        HashMap<String, String> generatedTaskProps = new HashMap<>();
-        // Connectors can add any settings, so these are arbitrary
-        generatedTaskProps.put("foo", "bar");
-        generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, 
taskClass.getName());
-        if (sink)
-            generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
-        return generatedTaskProps;
-    }
-
-    // We need to use a real class here due to some issue with mocking 
java.lang.Class
-    private abstract class BogusSourceConnector extends SourceConnector {
-    }
-
-    private abstract class BogusSourceTask extends SourceTask {
-    }
-
-    private abstract class BogusSinkConnector extends SinkConnector {
-    }
-
-    private abstract class BogusSinkTask extends SourceTask {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
deleted file mode 100644
index 2976c0a..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.copycat.util.Callback;
-import org.easymock.EasyMock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.powermock.api.easymock.PowerMock;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class FileOffsetBackingStoreTest {
-
-    FileOffsetBackingStore store;
-    Map<String, Object> props;
-    File tempFile;
-
-    private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
-
-    static {
-        firstSet.put(buffer("key"), buffer("value"));
-        firstSet.put(null, null);
-    }
-
-    @Before
-    public void setup() throws IOException {
-        store = new FileOffsetBackingStore();
-        tempFile = File.createTempFile("fileoffsetbackingstore", null);
-        props = new HashMap<>();
-        props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, 
tempFile.getAbsolutePath());
-        store.configure(props);
-        store.start();
-    }
-
-    @After
-    public void teardown() {
-        tempFile.delete();
-    }
-
-    @Test
-    public void testGetSet() throws Exception {
-        Callback<Void> setCallback = expectSuccessfulSetCallback();
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = 
expectSuccessfulGetCallback();
-        PowerMock.replayAll();
-
-        store.set(firstSet, setCallback).get();
-
-        Map<ByteBuffer, ByteBuffer> values = 
store.get(Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
-        assertEquals(buffer("value"), values.get(buffer("key")));
-        assertEquals(null, values.get(buffer("bad")));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testSaveRestore() throws Exception {
-        Callback<Void> setCallback = expectSuccessfulSetCallback();
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = 
expectSuccessfulGetCallback();
-        PowerMock.replayAll();
-
-        store.set(firstSet, setCallback).get();
-        store.stop();
-
-        // Restore into a new store to ensure correct reload from scratch
-        FileOffsetBackingStore restore = new FileOffsetBackingStore();
-        restore.configure(props);
-        restore.start();
-        Map<ByteBuffer, ByteBuffer> values = 
restore.get(Arrays.asList(buffer("key")), getCallback).get();
-        assertEquals(buffer("value"), values.get(buffer("key")));
-
-        PowerMock.verifyAll();
-    }
-
-    private static ByteBuffer buffer(String v) {
-        return ByteBuffer.wrap(v.getBytes());
-    }
-
-    private Callback<Void> expectSuccessfulSetCallback() {
-        @SuppressWarnings("unchecked")
-        Callback<Void> setCallback = PowerMock.createMock(Callback.class);
-        setCallback.onCompletion(EasyMock.isNull(Throwable.class), 
EasyMock.isNull(Void.class));
-        PowerMock.expectLastCall();
-        return setCallback;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Callback<Map<ByteBuffer, ByteBuffer>> 
expectSuccessfulGetCallback() {
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = 
PowerMock.createMock(Callback.class);
-        getCallback.onCompletion(EasyMock.isNull(Throwable.class), 
EasyMock.anyObject(Map.class));
-        PowerMock.expectLastCall();
-        return getCallback;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
deleted file mode 100644
index 7c25feb..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.storage;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.copycat.data.Field;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.data.Struct;
-import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.KafkaBasedLog;
-import org.apache.kafka.copycat.util.TestFuture;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConfigStorage.class)
-@PowerMockIgnore("javax.management.*")
-public class KafkaConfigStorageTest {
-    private static final String TOPIC = "copycat-configs";
-    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = 
new HashMap<>();
-
-    static {
-        
DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC);
-        
DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
"broker1:9092,broker2:9093");
-    }
-
-    private static final List<String> CONNECTOR_IDS = 
Arrays.asList("connector1", "connector2");
-    private static final List<String> CONNECTOR_CONFIG_KEYS = 
Arrays.asList("connector-connector1", "connector-connector2");
-    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = 
Arrays.asList("commit-connector1", "commit-connector2");
-
-    // Need a) connector with multiple tasks and b) multiple connectors
-    private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
-            new ConnectorTaskId("connector1", 0),
-            new ConnectorTaskId("connector1", 1),
-            new ConnectorTaskId("connector2", 0)
-    );
-    private static final List<String> TASK_CONFIG_KEYS = 
Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
-
-    // Need some placeholders -- the contents don't matter here, just that 
they are restored properly
-    private static final List<Map<String, String>> SAMPLE_CONFIGS = 
Arrays.asList(
-            Collections.singletonMap("config-key-one", "config-value-one"),
-            Collections.singletonMap("config-key-two", "config-value-two"),
-            Collections.singletonMap("config-key-three", "config-value-three")
-    );
-    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
-            new 
Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(0)),
-            new 
Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(1)),
-            new 
Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(2))
-    );
-    private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
-            new 
Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(0)),
-            new 
Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", 
SAMPLE_CONFIGS.get(1))
-    );
-
-    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
-            = new 
Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
-
-    // The exact format doesn't matter here since both conversions are mocked
-    private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
-            "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), 
"config-bytes-3".getBytes(),
-            "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), 
"config-bytes-6".getBytes(),
-            "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), 
"config-bytes-9".getBytes()
-    );
-
-    @Mock
-    private Converter converter;
-    @Mock
-    private Callback<String> connectorReconfiguredCallback;
-    @Mock
-    private Callback<List<ConnectorTaskId>> tasksReconfiguredCallback;
-    @Mock
-    KafkaBasedLog<String, byte[]> storeLog;
-    private KafkaConfigStorage configStorage;
-
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = 
EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = 
EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<String, byte[]>>> 
capturedConsumedCallback = EasyMock.newCapture();
-
-    private long logOffset = 0;
-
-    @Before
-    public void setUp() {
-        configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, 
new String[]{"createKafkaBasedLog"},
-                converter, connectorReconfiguredCallback, 
tasksReconfiguredCallback);
-    }
-
-    @Test
-    public void testStartStop() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
-        assertEquals(TOPIC, capturedTopic.getValue());
-        assertEquals("org.apache.kafka.common.serialization.StringSerializer", 
capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-        
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", 
capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-        
assertEquals("org.apache.kafka.common.serialization.StringDeserializer", 
capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
-        
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", 
capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
-
-        configStorage.start();
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutConnectorConfig() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
-
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(0), 
KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
-                "properties", SAMPLE_CONFIGS.get(0));
-        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0));
-        EasyMock.expectLastCall();
-
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(1), 
KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
-                "properties", SAMPLE_CONFIGS.get(1));
-        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
-        EasyMock.expectLastCall();
-
-        // Config deletion
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(1), 
KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null);
-        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
-        EasyMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
-        configStorage.start();
-
-        // Null before writing
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(-1, configState.offset());
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        // Writing should block until it is written and read back from Kafka
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), 
SAMPLE_CONFIGS.get(0));
-        configState = configStorage.snapshot();
-        assertEquals(1, configState.offset());
-        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        // Second should also block and all configs should still be available
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), 
SAMPLE_CONFIGS.get(1));
-        configState = configStorage.snapshot();
-        assertEquals(2, configState.offset());
-        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(1), 
configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        // Deletion should remove the second one we added
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null);
-        configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
-        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutTaskConfigs() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
-
-        // Task configs should read to end, write to the log, read to end, 
write root, then read to end again
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(0), 
KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
-                "properties", SAMPLE_CONFIGS.get(0));
-        expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(1), 
KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
-                "properties", SAMPLE_CONFIGS.get(1));
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
-                "tasks", 2); // Starts with 0 tasks, after update has 2
-        // As soon as root is rewritten, we should see a callback notifying us 
that we reconfigured some tasks
-        tasksReconfiguredCallback.onCompletion(null, 
Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
-        EasyMock.expectLastCall();
-
-        // Records to be read by consumer as it reads to the end of the log
-        LinkedHashMap<String, byte[]> serializedConfigs = new 
LinkedHashMap<>();
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(1));
-        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(2));
-        expectReadToEnd(serializedConfigs);
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
-        configStorage.start();
-
-        // Bootstrap as if we had already added the connector, but no tasks 
had been added yet
-        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), 
Collections.EMPTY_LIST);
-
-        // Null before writing
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(-1, configState.offset());
-        assertNull(configState.taskConfig(TASK_IDS.get(0)));
-        assertNull(configState.taskConfig(TASK_IDS.get(1)));
-
-        // Writing task task configs should block until all the writes have 
been performed and the root record update
-        // has completed
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new 
HashMap<>();
-        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
-        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
-        configStorage.putTaskConfigs(taskConfigs);
-
-        // Validate root config by listing all connectors and tasks
-        configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
-        String connectorName = CONNECTOR_IDS.get(0);
-        assertEquals(Arrays.asList(connectorName), new 
ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), 
configState.tasks(connectorName));
-        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(1), 
configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testRestore() throws Exception {
-        // Restoring data should notify only of the latest values after 
loading is complete. This also validates
-        // that inconsistent state is ignored.
-
-        expectConfigure();
-        // Overwrite each type at least once to ensure we see the latest data 
after loading
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 
CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 
CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 
COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                // Connector after root update should make it through, task 
update shouldn't
-                new ConsumerRecord<>(TOPIC, 0, 5, 
CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
-                new ConsumerRecord<>(TOPIC, 0, 6, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(6)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(1), 
TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(3), 
CONNECTOR_CONFIG_STRUCTS.get(1));
-        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        deserialized.put(CONFIGS_SERIALIZED.get(5), 
CONNECTOR_CONFIG_STRUCTS.get(2));
-        deserialized.put(CONFIGS_SERIALIZED.get(6), 
TASK_CONFIG_STRUCTS.get(1));
-        logOffset = 7;
-        expectStart(existingRecords, deserialized);
-
-        // Shouldn't see any callbacks since this is during startup
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
-        configStorage.start();
-
-        // Should see a single connector and its config should be the last one 
seen anywhere in the log
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(7, configState.offset()); // Should always be next to be 
read, even if uncommitted
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
-        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
-        assertEquals(SAMPLE_CONFIGS.get(2), 
configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        // Should see 2 tasks for that connector. Only config updates before 
the root key update should be reflected
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), 
configState.tasks(CONNECTOR_IDS.get(0)));
-        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
-        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws 
Exception {
-        // Test a case where a failure and compaction has left us in an 
inconsistent state when reading the log.
-        // We start out by loading an initial configuration where we started 
to write a task update and failed before
-        // writing an the commit, and then compaction cleaned up the earlier 
record.
-
-        expectConfigure();
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 
CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                // This is the record that has been compacted:
-                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), 
CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 
COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                new ConsumerRecord<>(TOPIC, 0, 5, TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(5)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), 
CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), 
TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(4), 
TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        deserialized.put(CONFIGS_SERIALIZED.get(5), 
TASK_CONFIG_STRUCTS.get(1));
-        logOffset = 6;
-        expectStart(existingRecords, deserialized);
-
-        // One failed attempt to write new task configs
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-
-        // Successful attempt to write new task config
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(0), 
KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
-                "properties", SAMPLE_CONFIGS.get(0));
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), 
KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
-                "tasks", 1); // Updated to just 1 task
-        // As soon as root is rewritten, we should see a callback notifying us 
that we reconfigured some tasks
-        tasksReconfiguredCallback.onCompletion(null, 
Arrays.asList(TASK_IDS.get(0)));
-        EasyMock.expectLastCall();
-        // Records to be read by consumer as it reads to the end of the log
-        LinkedHashMap<String, byte[]> serializedConfigs = new 
LinkedHashMap<>();
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(0));
-        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), 
CONFIGS_SERIALIZED.get(2));
-        expectReadToEnd(serializedConfigs);
-
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
-        configStorage.start();
-        // After reading the log, it should have been in an inconsistent state
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(6, configState.offset()); // Should always be next to be 
read, not last committed
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
-        // Inconsistent data should leave us with no tasks listed for the 
connector and an entry in the inconsistent list
-        assertEquals(Collections.EMPTY_LIST, 
configState.tasks(CONNECTOR_IDS.get(0)));
-        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
-        assertNull(configState.taskConfig(TASK_IDS.get(0)));
-        assertNull(configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), 
configState.inconsistentConnectors());
-
-        // First try sending an invalid set of configs (can't possibly 
represent a valid config set for the tasks)
-        try {
-            
configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), 
SAMPLE_CONFIGS.get(2)));
-            fail("Should have failed due to incomplete task set.");
-        } catch (KafkaException e) {
-            // expected
-        }
-
-        // Next, issue a write that has everything that is needed and it 
should be accepted. Note that in this case
-        // we are going to shrink the number of tasks to 1
-        configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), 
SAMPLE_CONFIGS.get(0)));
-        // Validate updated config
-        configState = configStorage.snapshot();
-        // This is only two more ahead of the last one because multiple calls 
fail, and so their configs are not written
-        // to the topic. Only the last call with 1 task config + 1 commit 
actually gets written.
-        assertEquals(8, configState.offset());
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new 
ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0)), 
configState.tasks(CONNECTOR_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(0), 
configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(Collections.EMPTY_SET, 
configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    private void expectConfigure() throws Exception {
-        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
-                EasyMock.capture(capturedTopic), 
EasyMock.capture(capturedProducerProps),
-                EasyMock.capture(capturedConsumerProps), 
EasyMock.capture(capturedConsumedCallback))
-                .andReturn(storeLog);
-    }
-
-    // If non-empty, deserializations should be a LinkedHashMap
-    private void expectStart(final List<ConsumerRecord<String, byte[]>> 
preexistingRecords,
-                             final Map<byte[], Struct> deserializations) 
throws Exception {
-        storeLog.start();
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
-                    capturedConsumedCallback.getValue().onCompletion(null, 
rec);
-                return null;
-            }
-        });
-        for (Map.Entry<byte[], Struct> deserializationEntry : 
deserializations.entrySet()) {
-            // Note null schema because default settings for internal 
serialization are schema-less
-            EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), 
EasyMock.aryEq(deserializationEntry.getKey())))
-                    .andReturn(new SchemaAndValue(null, 
structToMap(deserializationEntry.getValue())));
-        }
-    }
-
-    private void expectStop() {
-        storeLog.stop();
-        PowerMock.expectLastCall();
-    }
-
-    // Expect a conversion & write to the underlying log, followed by a 
subsequent read when the data is consumed back
-    // from the log. Validate the data that is captured when the conversion is 
performed matches the specified data
-    // (by checking a single field's value)
-    private void expectConvertWriteRead(final String configKey, final Schema 
valueSchema, final byte[] serialized,
-                                        final String dataFieldName, final 
Object dataFieldValue) {
-        final Capture<Struct> capturedRecord = EasyMock.newCapture();
-        if (serialized != null)
-            EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), 
EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
-                    .andReturn(serialized);
-        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
-        PowerMock.expectLastCall();
-        EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), 
EasyMock.aryEq(serialized)))
-                .andAnswer(new IAnswer<SchemaAndValue>() {
-                    @Override
-                    public SchemaAndValue answer() throws Throwable {
-                        if (dataFieldName != null)
-                            assertEquals(dataFieldValue, 
capturedRecord.getValue().get(dataFieldName));
-                        // Note null schema because default settings for 
internal serialization are schema-less
-                        return new SchemaAndValue(null, serialized == null ? 
null : structToMap(capturedRecord.getValue()));
-                    }
-                });
-    }
-
-    // This map needs to maintain ordering
-    private void expectReadToEnd(final LinkedHashMap<String, byte[]> 
serializedConfigs) {
-        EasyMock.expect(storeLog.readToEnd())
-                .andAnswer(new IAnswer<Future<Void>>() {
-                    @Override
-                    public Future<Void> answer() throws Throwable {
-                        TestFuture<Void> future = new TestFuture<Void>();
-                        for (Map.Entry<String, byte[]> entry : 
serializedConfigs.entrySet())
-                            
capturedConsumedCallback.getValue().onCompletion(null, new 
ConsumerRecord<>(TOPIC, 0, logOffset++, entry.getKey(), entry.getValue()));
-                        future.resolveOnGet((Void) null);
-                        return future;
-                    }
-                });
-    }
-
-
-    private void expectConvertWriteAndRead(final String configKey, final 
Schema valueSchema, final byte[] serialized,
-                                                                 final String 
dataFieldName, final Object dataFieldValue) {
-        expectConvertWriteRead(configKey, valueSchema, serialized, 
dataFieldName, dataFieldValue);
-        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
-        recordsToRead.put(configKey, serialized);
-        expectReadToEnd(recordsToRead);
-    }
-
-    // Manually insert a connector into config storage, updating the task 
configs, connector config, and root config
-    private void whiteboxAddConnector(String connectorName, Map<String, 
String> connectorConfig, List<Map<String, String>> taskConfigs) {
-        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = 
Whitebox.getInternalState(configStorage, "taskConfigs");
-        for (int i = 0; i < taskConfigs.size(); i++)
-            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), 
taskConfigs.get(i));
-
-        Map<String, Map<String, String>> connectorConfigs = 
Whitebox.getInternalState(configStorage, "connectorConfigs");
-        connectorConfigs.put(connectorName, connectorConfig);
-
-        Whitebox.<Map<String, Integer>>getInternalState(configStorage, 
"connectorTaskCounts").put(connectorName, taskConfigs.size());
-    }
-
-    // Generates a Map representation of Struct. Only does shallow traversal, 
so nested structs are not converted
-    private Map<String, Object> structToMap(Struct struct) {
-        HashMap<String, Object> result = new HashMap<>();
-        for (Field field : struct.schema().fields())
-            result.put(field.name(), struct.get(field));
-        return result;
-    }
-
-}

Reply via email to