http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
deleted file mode 100644
index ded78a1..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.sink.SinkRecord;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.storage.Converter;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.MockTime;
-import org.apache.kafka.copycat.util.ThreadedTest;
-import org.easymock.Capture;
-import org.easymock.CaptureType;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(WorkerSinkTask.class)
-@PowerMockIgnore("javax.management.*")
-public class WorkerSinkTaskThreadedTest extends ThreadedTest {
-
-    // These are fixed to keep this code simpler. In this example we assume byte[] raw values
-    // with mix of integer/string in Copycat
-    private static final String TOPIC = "test";
-    private static final int PARTITION = 12;
-    private static final int PARTITION2 = 13;
-    private static final int PARTITION3 = 14;
-    private static final long FIRST_OFFSET = 45;
-    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
-    private static final int KEY = 12;
-    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
-    private static final String VALUE = "VALUE";
-    private static final byte[] RAW_KEY = "key".getBytes();
-    private static final byte[] RAW_VALUE = "value".getBytes();
-
-    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
-    private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
-    private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
-    private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
-
-    private static final Map<String, String> TASK_PROPS = new HashMap<>();
-    static {
-        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
-    }
-
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-    private Time time;
-    @Mock private SinkTask sinkTask;
-    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
-    private WorkerConfig workerConfig;
-    @Mock private Converter keyConverter;
-    @Mock
-    private Converter valueConverter;
-    private WorkerSinkTask workerTask;
-    @Mock private KafkaConsumer<byte[], byte[]> consumer;
-    private WorkerSinkTaskThread workerThread;
-    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
-
-    private long recordsReturned;
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setup() {
-        super.setup();
-        time = new MockTime();
-        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter.schemas.enable", "false");
-        workerProps.put("internal.value.converter.schemas.enable", "false");
-        workerConfig = new StandaloneConfig(workerProps);
-        workerTask = PowerMock.createPartialMock(
-                WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
-                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
-
-        recordsReturned = 0;
-    }
-
-    @Test
-    public void testPollsInBackground() throws Exception {
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
-        expectStopTask(10L);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        for (int i = 0; i < 10; i++) {
-            workerThread.iteration();
-        }
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        // Verify contents match expected values, i.e. that they were translated properly. With max
-        // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches
-        assertEquals(10, capturedRecords.getValues().size());
-        int offset = 0;
-        for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
-            assertEquals(1, recs.size());
-            for (SinkRecord rec : recs) {
-                SinkRecord referenceSinkRecord
-                        = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
-                assertEquals(referenceSinkRecord, rec);
-                offset++;
-            }
-        }
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommit() throws Exception {
-        expectInitializeTask();
-        // Make each poll() take the offset commit interval
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, null, 0, true);
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // First iteration gets one record
-        workerThread.iteration();
-        // Second triggers commit, gets a second offset
-        workerThread.iteration();
-        // Commit finishes synchronously for testing so we can check this immediately
-        assertEquals(0, workerThread.commitFailures());
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        assertEquals(2, capturedRecords.getValues().size());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitTaskFlushFailure() throws Exception {
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
-        // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
-        // for all topic partitions
-        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Second iteration triggers commit
-        workerThread.iteration();
-        workerThread.iteration();
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitTaskSuccessAndFlushFailure() throws Exception {
-        // Validate that we rewind to the correct offsets if a task's flush method throws an exception
-
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, null, 0, true);
-        expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
-        // Should rewind to last known committed positions
-        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Second iteration triggers first commit, third iteration triggers second (failing) commit
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitConsumerFailure() throws Exception {
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, new Exception(), 0, true);
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Second iteration triggers commit
-        workerThread.iteration();
-        workerThread.iteration();
-        // TODO Response to consistent failures?
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitTimeout() throws Exception {
-        expectInitializeTask();
-        // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
-        expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
-        expectStopTask(4);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
-        // trigger another commit
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        // TODO Response to consistent failures?
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testAssignmentPauseResume() throws Exception {
-        // Just validate that the calls are passed through to the consumer, and that where appropriate errors are
-        // converted
-        expectInitializeTask();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)),
-                        sinkTaskContext.getValue().assignment());
-                return null;
-            }
-        });
-        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                try {
-                    sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
-                    fail("Trying to pause unassigned partition should have thrown an Copycat exception");
-                } catch (CopycatException e) {
-                    // expected
-                }
-                sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
-                return null;
-            }
-        });
-        consumer.pause(UNASSIGNED_TOPIC_PARTITION);
-        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
-        consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
-        PowerMock.expectLastCall();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                try {
-                    sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
-                    fail("Trying to resume unassigned partition should have thrown an Copycat exception");
-                } catch (CopycatException e) {
-                    // expected
-                }
-
-                sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
-                return null;
-            }
-        });
-        consumer.resume(UNASSIGNED_TOPIC_PARTITION);
-        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
-        consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
-        PowerMock.expectLastCall();
-
-        expectStopTask(0);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testRewind() throws Exception {
-        expectInitializeTask();
-        final long startOffset = 40L;
-        final Map<TopicPartition, Long> offsets = new HashMap<>();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                offsets.put(TOPIC_PARTITION, startOffset);
-                sinkTaskContext.getValue().offset(offsets);
-                return null;
-            }
-        });
-
-        consumer.seek(TOPIC_PARTITION, startOffset);
-        EasyMock.expectLastCall();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets();
-                assertEquals(0, offsets.size());
-                return null;
-            }
-        });
-
-        expectStopTask(3);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    private void expectInitializeTask() throws Exception {
-        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
-
-        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
-                workerTask, "mock-worker-thread", time,
-                workerConfig);
-        PowerMock.expectPrivate(workerTask, "createWorkerThread")
-                .andReturn(workerThread);
-        workerThread.start();
-        PowerMock.expectLastCall();
-
-        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
-            @Override
-            public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
-                return ConsumerRecords.empty();
-            }
-        });
-        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
-
-        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
-        PowerMock.expectLastCall();
-        sinkTask.start(TASK_PROPS);
-        PowerMock.expectLastCall();
-    }
-
-    private void expectStopTask(final long expectedMessages) throws Exception {
-        final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
-
-        sinkTask.stop();
-        PowerMock.expectLastCall();
-
-        // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
-        // consumer so it exits quickly
-        consumer.wakeup();
-        PowerMock.expectLastCall();
-
-        consumer.close();
-        PowerMock.expectLastCall();
-    }
-
-    // Note that this can only be called once per test currently
-    private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
-        // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
-        // but don't care about the exact details here.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
-                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
-                    @Override
-                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                        // "Sleep" so time will progress
-                        time.sleep(pollDelayMs);
-                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
-                                Collections.singletonMap(
-                                        new TopicPartition(TOPIC, PARTITION),
-                                        Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
-                                        )));
-                        recordsReturned++;
-                        return records;
-                    }
-                });
-        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
-        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
-        Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
-        sinkTask.put(EasyMock.capture(capturedRecords));
-        EasyMock.expectLastCall().anyTimes();
-        return capturedRecords;
-    }
-
-    private IExpectationSetters<Object> expectOnePoll() {
-        // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
-        // returning empty data, we return one record. The expectation is that the data will be ignored by the
-        // response behavior specified using the return value of this method.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
-                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
-                    @Override
-                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                        // "Sleep" so time will progress
-                        time.sleep(1L);
-                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
-                                Collections.singletonMap(
-                                        new TopicPartition(TOPIC, PARTITION),
-                                        Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
-                                        )));
-                        recordsReturned++;
-                        return records;
-                    }
-                });
-        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
-        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
-        sinkTask.put(EasyMock.anyObject(Collection.class));
-        return EasyMock.expectLastCall();
-    }
-
-    private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
-                                                              final RuntimeException flushError,
-                                                              final Exception consumerCommitError,
-                                                              final long consumerCommitDelayMs,
-                                                              final boolean invokeCallback)
-            throws Exception {
-        final long finalOffset = FIRST_OFFSET + expectedMessages;
-
-        // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one
-        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
-        offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
-        offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
-        offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
-        sinkTask.flush(offsetsToCommit);
-        IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
-        if (flushError != null) {
-            flushExpectation.andThrow(flushError).once();
-            return null;
-        }
-
-        final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
-        consumer.commitAsync(EasyMock.eq(offsetsToCommit),
-                EasyMock.capture(capturedCallback));
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                time.sleep(consumerCommitDelayMs);
-                if (invokeCallback)
-                    capturedCallback.getValue().onComplete(offsetsToCommit, consumerCommitError);
-                return null;
-            }
-        });
-        return capturedCallback;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
deleted file mode 100644
index 0fa14bd..0000000
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.copycat.source.SourceRecord;
-import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.source.SourceTaskContext;
-import org.apache.kafka.copycat.storage.Converter;
-import org.apache.kafka.copycat.storage.OffsetStorageReader;
-import org.apache.kafka.copycat.storage.OffsetStorageWriter;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.ThreadedTest;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.*;
-
-@RunWith(PowerMockRunner.class)
-public class WorkerSourceTaskTest extends ThreadedTest {
-    private static final String TOPIC = "topic";
-    private static final Map<String, byte[]> PARTITION = 
Collections.singletonMap("key", "partition".getBytes());
-    private static final Map<String, Integer> OFFSET = 
Collections.singletonMap("key", 12);
-
-    // Copycat-format data
-    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
-    private static final Integer KEY = -1;
-    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
-    private static final Long RECORD = 12L;
-    // Serialized data. The actual format of this data doesn't matter -- we 
just want to see that the right version
-    // is used in the right place.
-    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
-    private static final byte[] SERIALIZED_RECORD = 
"converted-record".getBytes();
-
-    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-    private WorkerConfig config;
-    @Mock private SourceTask sourceTask;
-    @Mock private Converter keyConverter;
-    @Mock private Converter valueConverter;
-    @Mock private KafkaProducer<byte[], byte[]> producer;
-    @Mock private OffsetStorageReader offsetReader;
-    @Mock private OffsetStorageWriter offsetWriter;
-    private WorkerSourceTask workerTask;
-    @Mock private Future<RecordMetadata> sendFuture;
-
-    private Capture<org.apache.kafka.clients.producer.Callback> 
producerCallbacks;
-
-    private static final Map<String, String> EMPTY_TASK_PROPS = 
Collections.emptyMap();
-    private static final List<SourceRecord> RECORDS = Arrays.asList(
-            new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD)
-    );
-
-    @Override
-    public void setup() {
-        super.setup();
-        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put("key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter.schemas.enable", "false");
-        workerProps.put("internal.value.converter.schemas.enable", "false");
-        config = new StandaloneConfig(workerProps);
-        producerCallbacks = EasyMock.newCapture();
-    }
-
-    private void createWorkerTask() {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, 
valueConverter, producer,
-                offsetReader, offsetWriter, config, new SystemTime());
-    }
-
-    @Test
-    public void testPollsInBackground() throws Exception {
-        createWorkerTask();
-
-        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-        EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
-        EasyMock.expectLastCall();
-
-        final CountDownLatch pollLatch = expectPolls(10);
-        // In this test, we don't flush, so nothing goes any further than the 
offset writer
-
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-        expectOffsetFlush(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(EMPTY_TASK_PROPS);
-        awaitPolls(pollLatch);
-        workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommit() throws Exception {
-        // Test that the task commits properly when prompted
-        createWorkerTask();
-
-        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-        EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
-        EasyMock.expectLastCall();
-
-        // We'll wait for some data, then trigger a flush
-        final CountDownLatch pollLatch = expectPolls(1);
-        expectOffsetFlush(true);
-
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-        expectOffsetFlush(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(EMPTY_TASK_PROPS);
-        awaitPolls(pollLatch);
-        assertTrue(workerTask.commitOffsets());
-        workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitFailure() throws Exception {
-        // Test that the task commits properly when prompted
-        createWorkerTask();
-
-        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-        EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
-        EasyMock.expectLastCall();
-
-        // We'll wait for some data, then trigger a flush
-        final CountDownLatch pollLatch = expectPolls(1);
-        expectOffsetFlush(false);
-
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-        expectOffsetFlush(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(EMPTY_TASK_PROPS);
-        awaitPolls(pollLatch);
-        assertFalse(workerTask.commitOffsets());
-        workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testSendRecordsConvertsData() throws Exception {
-        createWorkerTask();
-
-        List<SourceRecord> records = new ArrayList<>();
-        // Can just use the same record for key and value
-        records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();
-
-        PowerMock.replayAll();
-
-        Whitebox.invokeMethod(workerTask, "sendRecords", records);
-        assertEquals(SERIALIZED_KEY, sent.getValue().key());
-        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testSlowTaskStart() throws Exception {
-        createWorkerTask();
-
-        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-        EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
-        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                Utils.sleep(100);
-                return null;
-            }
-        });
-        sourceTask.stop();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        workerTask.start(EMPTY_TASK_PROPS);
-        // Stopping immediately while the other thread has work to do should 
result in no polling, no offset commits,
-        // exiting the work thread immediately, and the stop() method will be 
invoked in the background thread since it
-        // cannot be invoked immediately in the thread trying to stop the task.
-        workerTask.stop();
-        assertEquals(true, workerTask.awaitStop(1000));
-
-        PowerMock.verifyAll();
-    }
-
-    private CountDownLatch expectPolls(int count) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(count);
-        // Note that we stub these to allow any number of calls because the 
thread will continue to
-        // run. The count passed in + latch returned just makes sure we get 
*at least* that number of
-        // calls
-        EasyMock.expect(sourceTask.poll())
-                .andStubAnswer(new IAnswer<List<SourceRecord>>() {
-                    @Override
-                    public List<SourceRecord> answer() throws Throwable {
-                        latch.countDown();
-                        return RECORDS;
-                    }
-                });
-        // Fallout of the poll() call
-        expectSendRecord();
-        return latch;
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws 
InterruptedException {
-        EasyMock.expect(keyConverter.fromCopycatData(TOPIC, KEY_SCHEMA, 
KEY)).andStubReturn(SERIALIZED_KEY);
-        EasyMock.expect(valueConverter.fromCopycatData(TOPIC, RECORD_SCHEMA, 
RECORD)).andStubReturn(SERIALIZED_RECORD);
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-        // 1. Converted data passed to the producer, which will need callbacks 
invoked for flush to work
-        EasyMock.expect(
-                producer.send(EasyMock.capture(sent),
-                        EasyMock.capture(producerCallbacks)))
-                .andStubAnswer(new IAnswer<Future<RecordMetadata>>() {
-                    @Override
-                    public Future<RecordMetadata> answer() throws Throwable {
-                        synchronized (producerCallbacks) {
-                            for (org.apache.kafka.clients.producer.Callback cb 
: producerCallbacks.getValues()) {
-                                cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0), null);
-                            }
-                            producerCallbacks.reset();
-                        }
-                        return sendFuture;
-                    }
-                });
-        // 2. Offset data is passed to the offset storage.
-        offsetWriter.offset(PARTITION, OFFSET);
-        PowerMock.expectLastCall().anyTimes();
-
-        return sent;
-    }
-
-    private void awaitPolls(CountDownLatch latch) throws InterruptedException {
-        latch.await(1000, TimeUnit.MILLISECONDS);
-    }
-
-    @SuppressWarnings("unchecked")
-    private void expectOffsetFlush(boolean succeed) throws Exception {
-        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
-        Future<Void> flushFuture = PowerMock.createMock(Future.class);
-        
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
-        // Should throw for failure
-        IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
-                flushFuture.get(EasyMock.anyLong(), 
EasyMock.anyObject(TimeUnit.class)));
-        if (succeed) {
-            futureGetExpect.andReturn(null);
-        } else {
-            futureGetExpect.andThrow(new TimeoutException());
-            offsetWriter.cancelFlush();
-            PowerMock.expectLastCall();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
deleted file mode 100644
index 00cef2b..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.copycat.connector.Connector;
-import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.connector.Task;
-import org.apache.kafka.copycat.errors.CopycatException;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.copycat.sink.SinkTask;
-import org.apache.kafka.copycat.source.SourceRecord;
-import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.storage.Converter;
-import org.apache.kafka.copycat.storage.OffsetBackingStore;
-import org.apache.kafka.copycat.storage.OffsetStorageReader;
-import org.apache.kafka.copycat.storage.OffsetStorageWriter;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.MockTime;
-import org.apache.kafka.copycat.util.ThreadedTest;
-import org.easymock.EasyMock;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Worker.class)
-@PowerMockIgnore("javax.management.*")
-public class WorkerTest extends ThreadedTest {
-
-    private static final String CONNECTOR_ID = "test-connector";
-    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 
0);
-
-    private WorkerConfig config;
-    private Worker worker;
-    private OffsetBackingStore offsetBackingStore = 
PowerMock.createMock(OffsetBackingStore.class);
-
-    @Before
-    public void setup() {
-        super.setup();
-
-        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put("key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.value.converter", 
"org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.put("internal.key.converter.schemas.enable", "false");
-        workerProps.put("internal.value.converter.schemas.enable", "false");
-        config = new StandaloneConfig(workerProps);
-    }
-
-    @Test
-    public void testAddRemoveConnector() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        // Create
-        Connector connector = PowerMock.createMock(Connector.class);
-        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
-
-        PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new 
Object[]{TestConnector.class}).andReturn(connector);
-        EasyMock.expect(connector.version()).andReturn("1.0");
-
-        Map<String, String> props = new HashMap<>();
-        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
-        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
TestConnector.class.getName());
-
-        connector.initialize(ctx);
-        EasyMock.expectLastCall();
-        connector.start(props);
-        EasyMock.expectLastCall();
-
-        // Remove
-        connector.stop();
-        EasyMock.expectLastCall();
-
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
-        worker.start();
-
-        ConnectorConfig config = new ConnectorConfig(props);
-        assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), 
worker.connectorNames());
-        try {
-            worker.addConnector(config, ctx);
-            fail("Should have thrown exception when trying to add connector 
with same name.");
-        } catch (CopycatException e) {
-            // expected
-        }
-        worker.stopConnector(CONNECTOR_ID);
-        assertEquals(Collections.emptySet(), worker.connectorNames());
-        // Nothing should be left, so this should effectively be a nop
-        worker.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testStopInvalidConnector() {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
-        worker.start();
-
-        worker.stopConnector(CONNECTOR_ID);
-    }
-
-    @Test
-    public void testReconfigureConnectorTasks() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        // Create
-        Connector connector = PowerMock.createMock(Connector.class);
-        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
-
-        PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new 
Object[]{TestConnector.class}).andReturn(connector);
-        EasyMock.expect(connector.version()).andReturn("1.0");
-
-        Map<String, String> props = new HashMap<>();
-        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
-        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
-        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
TestConnector.class.getName());
-
-        connector.initialize(ctx);
-        EasyMock.expectLastCall();
-        connector.start(props);
-        EasyMock.expectLastCall();
-
-        // Reconfigure
-        EasyMock.<Class<? extends 
Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
-        Map<String, String> taskProps = new HashMap<>();
-        taskProps.put("foo", "bar");
-        
EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, 
taskProps));
-
-        // Remove
-        connector.stop();
-        EasyMock.expectLastCall();
-
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
-        worker.start();
-
-        ConnectorConfig config = new ConnectorConfig(props);
-        assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
-        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), 
worker.connectorNames());
-        try {
-            worker.addConnector(config, ctx);
-            fail("Should have thrown exception when trying to add connector 
with same name.");
-        } catch (CopycatException e) {
-            // expected
-        }
-        List<Map<String, String>> taskConfigs = 
worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
-        Map<String, String> expectedTaskProps = new HashMap<>();
-        expectedTaskProps.put("foo", "bar");
-        expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
-        expectedTaskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar");
-        assertEquals(2, taskConfigs.size());
-        assertEquals(expectedTaskProps, taskConfigs.get(0));
-        assertEquals(expectedTaskProps, taskConfigs.get(1));
-        worker.stopConnector(CONNECTOR_ID);
-        assertEquals(Collections.emptySet(), worker.connectorNames());
-        // Nothing should be left, so this should effectively be a nop
-        worker.stop();
-
-        PowerMock.verifyAll();
-    }
-
-
-    @Test
-    public void testAddRemoveTask() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
-
-        // Create
-        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
-        WorkerSourceTask workerTask = 
PowerMock.createMock(WorkerSourceTask.class);
-
-        PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", new 
Object[]{TestSourceTask.class}).andReturn(task);
-        EasyMock.expect(task.version()).andReturn("1.0");
-
-        PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
-                EasyMock.anyObject(Converter.class),
-                EasyMock.anyObject(Converter.class),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
-                EasyMock.anyObject(Time.class))
-                .andReturn(workerTask);
-        Map<String, String> origProps = new HashMap<>();
-        origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
-        workerTask.start(origProps);
-        EasyMock.expectLastCall();
-
-        // Remove
-        workerTask.stop();
-        EasyMock.expectLastCall();
-        
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
-        workerTask.close();
-        EasyMock.expectLastCall();
-
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
-        worker.start();
-        assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.addTask(taskId, new TaskConfig(origProps));
-        assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
-        worker.stopTask(taskId);
-        assertEquals(Collections.emptySet(), worker.taskIds());
-        // Nothing should be left, so this should effectively be a nop
-        worker.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test(expected = CopycatException.class)
-    public void testStopInvalidTask() {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
-        worker.start();
-
-        worker.stopTask(TASK_ID);
-    }
-
-    @Test
-    public void testCleanupTasksOnStop() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        // Create
-        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
-        WorkerSourceTask workerTask = 
PowerMock.createMock(WorkerSourceTask.class);
-
-        PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", new 
Object[]{TestSourceTask.class}).andReturn(task);
-        EasyMock.expect(task.version()).andReturn("1.0");
-        
-        PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(TASK_ID), 
EasyMock.eq(task),
-                EasyMock.anyObject(Converter.class),
-                EasyMock.anyObject(Converter.class),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
-                EasyMock.anyObject(Time.class))
-                .andReturn(workerTask);
-        Map<String, String> origProps = new HashMap<>();
-        origProps.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
-        workerTask.start(origProps);
-        EasyMock.expectLastCall();
-
-        // Remove on Worker.stop()
-        workerTask.stop();
-        EasyMock.expectLastCall();
-        
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
-        // Note that in this case we *do not* commit offsets since it's an 
unclean shutdown
-        workerTask.close();
-        EasyMock.expectLastCall();
-
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
-        worker.start();
-        worker.addTask(TASK_ID, new TaskConfig(origProps));
-        worker.stop();
-
-        PowerMock.verifyAll();
-    }
-
-
-    private static class TestConnector extends Connector {
-        @Override
-        public String version() {
-            return "1.0";
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-
-        }
-
-        @Override
-        public Class<? extends Task> taskClass() {
-            return null;
-        }
-
-        @Override
-        public List<Map<String, String>> taskConfigs(int maxTasks) {
-            return null;
-        }
-
-        @Override
-        public void stop() {
-
-        }
-    }
-
-    private static class TestSourceTask extends SourceTask {
-        public TestSourceTask() {
-        }
-
-        @Override
-        public String version() {
-            return "1.0";
-        }
-
-        @Override
-        public void start(Map<String, String> props) {
-        }
-
-        @Override
-        public List<SourceRecord> poll() throws InterruptedException {
-            return null;
-        }
-
-        @Override
-        public void stop() {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
deleted file mode 100644
index 512cb5c..0000000
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ /dev/null
@@ -1,573 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.distributed;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.errors.AlreadyExistsException;
-import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.TaskConfig;
-import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.WorkerConfig;
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
-import org.apache.kafka.copycat.source.SourceConnector;
-import org.apache.kafka.copycat.source.SourceTask;
-import org.apache.kafka.copycat.storage.KafkaConfigStorage;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.FutureCallback;
-import org.apache.kafka.copycat.util.TestFuture;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DistributedHerder.class)
-@PowerMockIgnore("javax.management.*")
-public class DistributedHerderTest {
-    private static final Map<String, String> HERDER_CONFIG = new HashMap<>(); // Settings wrapped in the DistributedConfig that setUp() passes to the herder.
-    static { // Fills in config topic, bootstrap servers, group id and the four converter classes.
-        HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, 
"config-topic");
-        HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
-        HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, 
"test-copycat-group");
-        // The WorkerConfig base class has some required settings without 
defaults
-        HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.copycat.json.JsonConverter");
-        HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.copycat.json.JsonConverter");
-        HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.copycat.json.JsonConverter");
-        HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.copycat.json.JsonConverter");
-    }
-    private static final String MEMBER_URL = "memberUrl"; // passed to the herder constructor in setUp()
-
-    private static final String CONN1 = "sourceA"; // name of the connector present in SNAPSHOT
-    private static final String CONN2 = "sourceB"; // name of a connector that is absent from SNAPSHOT
-    private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0); // TASK0..TASK2 are all built from CONN1
-    private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1);
-    private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2);
-    private static final Integer MAX_TASKS = 3; // boxed so that MAX_TASKS.toString() can be stored in the config maps
-    private static final Map<String, String> CONN1_CONFIG = new HashMap<>(); // original config for CONN1
-    static {
-        CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1);
-        CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, 
MAX_TASKS.toString());
-        CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
-        CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSourceConnector.class.getName());
-    }
-    private static final Map<String, String> CONN1_CONFIG_UPDATED = new 
HashMap<>(CONN1_CONFIG); // copy of CONN1_CONFIG whose topics entry is overwritten below
-    static {
-        CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz"); // the only difference from CONN1_CONFIG
-    }
-    private static final Map<String, String> CONN2_CONFIG = new HashMap<>(); // same settings as CONN1_CONFIG but named CONN2
-    static {
-        CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2);
-        CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, 
MAX_TASKS.toString());
-        CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
-        CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
BogusSourceConnector.class.getName());
-    }
-    private static final Map<String, String> TASK_CONFIG = new HashMap<>(); // single task config shared by every task fixture
-    static {
-        TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, 
BogusSourceTask.class.getName());
-    }
-    private static final List<Map<String, String>> TASK_CONFIGS = new 
ArrayList<>(); // returned by the mocked worker.connectorTaskConfigs(...) in the tests
-    static { // three entries, all referencing the same TASK_CONFIG map
-        TASK_CONFIGS.add(TASK_CONFIG);
-        TASK_CONFIGS.add(TASK_CONFIG);
-        TASK_CONFIGS.add(TASK_CONFIG);
-    }
-    private static final HashMap<ConnectorTaskId, Map<String, String>> 
TASK_CONFIGS_MAP = new HashMap<>(); // task id -> config, used to build the snapshots
-    static {
-        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
-        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
-        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
-    }
-    private static final ClusterConfigState SNAPSHOT = new 
ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, 
Collections.<String>emptySet()); // snapshot holding CONN1 with its original config and the three task configs
-    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = 
new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), 
TASK_CONFIGS_MAP, Collections.<String>emptySet()); // same as SNAPSHOT except CONN1 maps to CONN1_CONFIG_UPDATED
-
-    @Mock private KafkaConfigStorage configStorage; // mocked config storage handed to the herder
-    @Mock private WorkerGroupMember member; // mocked group member handed to the herder
-    private MockTime time;
-    private DistributedHerder herder; // partial mock built in setUp()
-    @Mock private Worker worker; // NOTE(review): annotated @Mock but also reassigned via PowerMock.createMock in setUp()
-    @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
-
-    private Callback<String> connectorConfigCallback; // fetched from the herder via Whitebox in setUp()
-    private Callback<List<ConnectorTaskId>> taskConfigCallback; // fetched from the herder via Whitebox in setUp()
-    private WorkerRebalanceListener rebalanceListener; // fetched from the herder via Whitebox in setUp()
-
-    @Before // Builds a partially mocked herder and pulls out the callbacks the tests invoke by hand.
-    public void setUp() throws Exception {
-        worker = PowerMock.createMock(Worker.class);
-        time = new MockTime();
-
-        herder = PowerMock.createPartialMock(DistributedHerder.class, new 
String[]{"backoff"},
-                new DistributedConfig(HERDER_CONFIG), worker, configStorage, 
member, MEMBER_URL, time);
-        connectorConfigCallback = Whitebox.invokeMethod(herder, 
"connectorConfigCallback");
-        taskConfigCallback = Whitebox.invokeMethod(herder, 
"taskConfigCallback");
-        rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener"); // driven by expectRebalance() to simulate assignments
-    }
-
-    @Test // With CONN1 and TASK1 assigned, one herder.tick() must call worker.addConnector and worker.addTask(TASK1).
-    public void testJoinAssignment() {
-        // Join group and get assignment
-        EasyMock.expect(member.memberId()).andStubReturn("member");
-        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
-        expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        herder.tick();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // herder.halt() must stop every connector and task the worker reports, then stop the member and config storage.
-    public void testHaltCleansUpWorker() {
-        
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
-        worker.stopConnector(CONN1);
-        PowerMock.expectLastCall();
-        
EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
-        worker.stopTask(TASK1);
-        PowerMock.expectLastCall();
-        member.stop();
-        PowerMock.expectLastCall();
-        configStorage.stop();
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        herder.halt();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // Putting a config for CONN2 (absent from SNAPSHOT) must write it to config storage and report Created(true, info).
-    public void testCreateConnector() throws Exception {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
-        expectPostRebalanceCatchup(SNAPSHOT);
-
-        member.wakeup();
-        PowerMock.expectLastCall();
-        // CONN2 is new, should succeed
-        configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
-        PowerMock.expectLastCall();
-        ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, 
Collections.<ConnectorTaskId>emptyList());
-        putConnectorCallback.onCompletion(null, new Herder.Created<>(true, 
info));
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-        // No immediate action besides this -- change will be picked up via 
the config log
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, 
putConnectorCallback);
-        herder.tick(); // the queued put request is serviced during this tick
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // Putting CONN1 (already in SNAPSHOT) with allowReplace=false must complete the callback with an error and a null result.
-    public void testCreateConnectorAlreadyExists() throws Exception {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
-        expectPostRebalanceCatchup(SNAPSHOT);
-
-        member.wakeup();
-        PowerMock.expectLastCall();
-        // CONN1 already exists
-        
putConnectorCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), 
EasyMock.<Herder.Created<ConnectorInfo>>isNull());
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-        // No immediate action besides this -- change will be picked up via 
the config log
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, 
putConnectorCallback);
-        herder.tick(); // no configStorage.putConnectorConfig expectation was recorded, so any write would fail verification
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // Putting a null config for CONN1 must write null to config storage and report Created(false, null).
-    public void testDestroyConnector() throws Exception {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        // Start with one connector
-        expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
-        expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-
-        // And delete the connector
-        member.wakeup();
-        PowerMock.expectLastCall();
-        configStorage.putConnectorConfig(CONN1, null); // deletion is expressed as writing a null config
-        PowerMock.expectLastCall();
-        putConnectorCallback.onCompletion(null, new 
Herder.Created<ConnectorInfo>(false, null));
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-        // No immediate action besides this -- change will be picked up via 
the config log
-
-        PowerMock.replayAll();
-
-        herder.putConnectorConfig(CONN1, null, true, putConnectorCallback);
-        herder.tick();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // A connector config notification after an empty join must trigger requestRejoin and, after the rebalance, addConnector.
-    public void testConnectorConfigAdded() {
-        // If a connector was added, we need to rebalance
-        EasyMock.expect(member.memberId()).andStubReturn("member");
-
-        // join, no configs so no need to catch up on config topic
-        expectRebalance(-1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // apply config
-        member.wakeup();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        // Checks for config updates and starts rebalance
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
-        member.requestRejoin();
-        PowerMock.expectLastCall();
-        // Performs rebalance and gets new assignment
-        expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
-                CopycatProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        herder.tick(); // join
-        connectorConfigCallback.onCompletion(null, CONN1); // read updated 
config
-        herder.tick(); // apply config
-        herder.tick(); // do rebalance
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // A config notification for an already-running CONN1 must restart it (stopConnector then addConnector); no requestRejoin is expected.
-    public void testConnectorConfigUpdate() {
-        // Connector config can be applied without any rebalance
-
-        EasyMock.expect(member.memberId()).andStubReturn("member");
-        
EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
-
-        // join
-        expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
-        expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // apply config
-        member.wakeup();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for 
this test, it doesn't matter if we use the same config snapshot
-        worker.stopConnector(CONN1);
-        PowerMock.expectLastCall();
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        herder.tick(); // join
-        connectorConfigCallback.onCompletion(null, CONN1); // read updated 
config
-        herder.tick(); // apply config
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // A task config notification after an empty join must trigger requestRejoin and, after the rebalance, addTask(TASK0).
-    public void testTaskConfigAdded() {
-        // Task config always requires rebalance
-        EasyMock.expect(member.memberId()).andStubReturn("member");
-
-        // join
-        expectRebalance(-1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // apply config
-        member.wakeup();
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        // Checks for config updates and starts rebalance
-        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
-        member.requestRejoin();
-        PowerMock.expectLastCall();
-        // Performs rebalance and gets new assignment
-        expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
-                CopycatProtocol.Assignment.NO_ERROR, 1, 
Collections.<String>emptyList(), Arrays.asList(TASK0));
-        worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject()); // only TASK0 is assigned to this worker
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        herder.tick(); // join
-        taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, 
TASK2)); // read updated config
-        herder.tick(); // apply config
-        herder.tick(); // do rebalance
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // A CONFIG_MISMATCH assignment whose readToEnd times out must back off and request a rejoin; the next tick then succeeds.
-    public void testJoinLeaderCatchUpFails() throws Exception {
-        // Join group and as leader fail to do assignment
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        expectRebalance(Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList(),
-                CopycatProtocol.Assignment.CONFIG_MISMATCH, 1, 
Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
-        // Reading to end of log times out
-        TestFuture<Void> readToEndFuture = new TestFuture<>();
-        readToEndFuture.resolveOnGet(new TimeoutException());
-        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
-        PowerMock.expectPrivate(herder, "backoff", 
DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
-        member.requestRejoin(); // recorded as an expected call on the mock
-
-        // After backoff, restart the process and this time succeed
-        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
-        expectPostRebalanceCatchup(SNAPSHOT);
-
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        herder.tick(); // first attempt: catch-up fails
-        herder.tick(); // second attempt: catch-up succeeds and work is started
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // The four read accessors queued before a tick must all complete during it with data matching SNAPSHOT's fixtures.
-    public void testAccessors() throws Exception {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        expectRebalance(1, Collections.<String>emptyList(), 
Collections.<ConnectorTaskId>emptyList());
-        expectPostRebalanceCatchup(SNAPSHOT);
-
-
-        member.wakeup();
-        PowerMock.expectLastCall().anyTimes(); // number of wakeups is left unconstrained
-        // list connectors, get connector info, get connector config, get task 
configs
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-
-        PowerMock.replayAll();
-
-        FutureCallback<Collection<String>> listConnectorsCb = new 
FutureCallback<>();
-        herder.connectors(listConnectorsCb);
-        FutureCallback<ConnectorInfo> connectorInfoCb = new FutureCallback<>();
-        herder.connectorInfo(CONN1, connectorInfoCb);
-        FutureCallback<Map<String, String>> connectorConfigCb = new 
FutureCallback<>();
-        herder.connectorConfig(CONN1, connectorConfigCb);
-        FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>();
-        herder.taskConfigs(CONN1, taskConfigsCb);
-
-        herder.tick(); // all four callbacks are asserted to be done after this single tick
-        assertTrue(listConnectorsCb.isDone());
-        assertEquals(Collections.singleton(CONN1), listConnectorsCb.get());
-        assertTrue(connectorInfoCb.isDone());
-        ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, 
Arrays.asList(TASK0, TASK1, TASK2));
-        assertEquals(info, connectorInfoCb.get());
-        assertTrue(connectorConfigCb.isDone());
-        assertEquals(CONN1_CONFIG, connectorConfigCb.get());
-        assertTrue(taskConfigsCb.isDone());
-        assertEquals(Arrays.asList(
-                        new TaskInfo(TASK0, TASK_CONFIG),
-                        new TaskInfo(TASK1, TASK_CONFIG),
-                        new TaskInfo(TASK2, TASK_CONFIG)),
-                taskConfigsCb.get());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test // Replacing CONN1's config must restart the connector with the new config and make it visible to later reads (three ticks).
-    public void testPutConnectorConfig() throws Exception {
-        EasyMock.expect(member.memberId()).andStubReturn("leader");
-        expectRebalance(1, Arrays.asList(CONN1), 
Collections.<ConnectorTaskId>emptyList());
-        expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-
-        // list connectors, get connector info, get connector config, get task 
configs
-        member.wakeup();
-        PowerMock.expectLastCall().anyTimes();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // Poll loop for second round of calls
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                // Simulate response to writing config + waiting until end of 
log to be read
-                connectorConfigCallback.onCompletion(null, CONN1);
-                return null;
-            }
-        });
-        // As a result of reconfig, should need to update snapshot. With only 
connector updates, we'll just restart
-        // connector without rebalance
-        
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
-        worker.stopConnector(CONN1);
-        PowerMock.expectLastCall();
-        Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture(); // captures the config the restarted connector receives
-        worker.addConnector(EasyMock.capture(capturedUpdatedConfig), 
EasyMock.<ConnectorContext>anyObject());
-        PowerMock.expectLastCall();
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS);
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        // Third tick just to read the config
-        member.ensureActive();
-        PowerMock.expectLastCall();
-        member.poll(EasyMock.anyInt());
-        PowerMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        // Should pick up original config
-        FutureCallback<Map<String, String>> connectorConfigCb = new 
FutureCallback<>();
-        herder.connectorConfig(CONN1, connectorConfigCb);
-        herder.tick();
-        assertTrue(connectorConfigCb.isDone());
-        assertEquals(CONN1_CONFIG, connectorConfigCb.get());
-
-        // Apply new config.
-        FutureCallback<Herder.Created<ConnectorInfo>> putConfigCb = new 
FutureCallback<>();
-        herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, 
putConfigCb);
-        herder.tick();
-        assertTrue(putConfigCb.isDone());
-        ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, 
CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2));
-        assertEquals(new Herder.Created<>(false, updatedInfo), 
putConfigCb.get());
-
-        // Check config again to validate change
-        connectorConfigCb = new FutureCallback<>();
-        herder.connectorConfig(CONN1, connectorConfigCb);
-        herder.tick();
-        assertTrue(connectorConfigCb.isDone());
-        assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get());
-        // The config passed to Worker should contain the updated topics list
-        assertEquals(Arrays.asList("foo", "bar", "baz"),
-                
capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG));
-        PowerMock.verifyAll();
-    }
-
-    @Test // Placeholder: the body contains no statements, so this test currently always passes.
-    public void testInconsistentConfigs() throws Exception {
-        // FIXME: if we have inconsistent configs, we need to request forced 
reconfig + write of the connector's task configs
-        // This requires inter-worker communication, so needs the REST API
-    }
-
-
-    private void expectRebalance(final long offset, final List<String> 
assignedConnectors, final List<ConnectorTaskId> assignedTasks) { // Convenience overload: no revocations and Assignment.NO_ERROR.
-        expectRebalance(null, null, CopycatProtocol.Assignment.NO_ERROR, 
offset, assignedConnectors, assignedTasks);
-    }
-
-    // Handles common initial part of rebalance callback. Does not handle 
instantiation of connectors and tasks.
-    private void expectRebalance(final Collection<String> revokedConnectors, 
final List<ConnectorTaskId> revokedTasks,
-                                 final short error, final long offset, final 
List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { // Records member.ensureActive() + member.wakeup(); the former's answer drives the rebalance listener.
-        member.ensureActive();
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                if (revokedConnectors != null) // revocation is only simulated when revoked connectors were supplied
-                    rebalanceListener.onRevoked("leader", revokedConnectors, 
revokedTasks);
-                CopycatProtocol.Assignment assignment = new 
CopycatProtocol.Assignment(
-                        error, "leader", "leaderUrl", offset, 
assignedConnectors, assignedTasks);
-                rebalanceListener.onAssigned(assignment); // always delivered, after any revocation
-                return null;
-            }
-        });
-        member.wakeup();
-        PowerMock.expectLastCall();
-    }
-
-    private void expectPostRebalanceCatchup(final ClusterConfigState 
readToEndSnapshot) { // Records a successful configStorage.readToEnd() followed by snapshot() returning the given state.
-        TestFuture<Void> readToEndFuture = new TestFuture<>();
-        readToEndFuture.resolveOnGet((Void) null); // the future resolves with a null value rather than an exception
-        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
-        EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot);
-    }
-
-
-    // We need to use a real class here due to some issue with mocking 
java.lang.Class
-    private abstract class BogusSourceConnector extends SourceConnector { // Empty type; its class name is the connector class in CONN1_CONFIG and CONN2_CONFIG.
-    }
-
-    private abstract class BogusSourceTask extends SourceTask { // Empty type; its class name is the task class in TASK_CONFIG.
-    }
-
-}

Reply via email to