http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java deleted file mode 100644 index ded78a1..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java +++ /dev/null @@ -1,563 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; -import org.apache.kafka.copycat.sink.SinkConnector; -import org.apache.kafka.copycat.sink.SinkRecord; -import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.storage.Converter; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.MockTime; -import org.apache.kafka.copycat.util.ThreadedTest; -import org.easymock.Capture; -import org.easymock.CaptureType; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IExpectationSetters; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -@RunWith(PowerMockRunner.class) 
-@PrepareForTest(WorkerSinkTask.class) -@PowerMockIgnore("javax.management.*") -public class WorkerSinkTaskThreadedTest extends ThreadedTest { - - // These are fixed to keep this code simpler. In this example we assume byte[] raw values - // with mix of integer/string in Copycat - private static final String TOPIC = "test"; - private static final int PARTITION = 12; - private static final int PARTITION2 = 13; - private static final int PARTITION3 = 14; - private static final long FIRST_OFFSET = 45; - private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; - private static final int KEY = 12; - private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA; - private static final String VALUE = "VALUE"; - private static final byte[] RAW_KEY = "key".getBytes(); - private static final byte[] RAW_VALUE = "value".getBytes(); - - private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION); - private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2); - private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3); - private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200); - - private static final Map<String, String> TASK_PROPS = new HashMap<>(); - static { - TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC); - } - - private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); - private Time time; - @Mock private SinkTask sinkTask; - private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture(); - private WorkerConfig workerConfig; - @Mock private Converter keyConverter; - @Mock - private Converter valueConverter; - private WorkerSinkTask workerTask; - @Mock private KafkaConsumer<byte[], byte[]> consumer; - private WorkerSinkTaskThread workerThread; - private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture(); - - private long recordsReturned; - - @SuppressWarnings("unchecked") - 
@Override - public void setup() { - super.setup(); - time = new MockTime(); - Map<String, String> workerProps = new HashMap<>(); - workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.put("internal.key.converter.schemas.enable", "false"); - workerProps.put("internal.value.converter.schemas.enable", "false"); - workerConfig = new StandaloneConfig(workerProps); - workerTask = PowerMock.createPartialMock( - WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, - taskId, sinkTask, workerConfig, keyConverter, valueConverter, time); - - recordsReturned = 0; - } - - @Test - public void testPollsInBackground() throws Exception { - expectInitializeTask(); - Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L); - expectStopTask(10L); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - for (int i = 0; i < 10; i++) { - workerThread.iteration(); - } - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - // Verify contents match expected values, i.e. that they were translated properly. 
With max - // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches - assertEquals(10, capturedRecords.getValues().size()); - int offset = 0; - for (Collection<SinkRecord> recs : capturedRecords.getValues()) { - assertEquals(1, recs.size()); - for (SinkRecord rec : recs) { - SinkRecord referenceSinkRecord - = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset); - assertEquals(referenceSinkRecord, rec); - offset++; - } - } - - PowerMock.verifyAll(); - } - - @Test - public void testCommit() throws Exception { - expectInitializeTask(); - // Make each poll() take the offset commit interval - Capture<Collection<SinkRecord>> capturedRecords - = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, null, null, 0, true); - expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // First iteration gets one record - workerThread.iteration(); - // Second triggers commit, gets a second offset - workerThread.iteration(); - // Commit finishes synchronously for testing so we can check this immediately - assertEquals(0, workerThread.commitFailures()); - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - assertEquals(2, capturedRecords.getValues().size()); - - PowerMock.verifyAll(); - } - - @Test - public void testCommitTaskFlushFailure() throws Exception { - expectInitializeTask(); - Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, new RuntimeException(), null, 0, true); - // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization - // for all topic partitions - consumer.seek(TOPIC_PARTITION, FIRST_OFFSET); - 
PowerMock.expectLastCall(); - consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET); - PowerMock.expectLastCall(); - consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); - PowerMock.expectLastCall(); - expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers commit - workerThread.iteration(); - workerThread.iteration(); - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - PowerMock.verifyAll(); - } - - @Test - public void testCommitTaskSuccessAndFlushFailure() throws Exception { - // Validate that we rewind to the correct offsets if a task's flush method throws an exception - - expectInitializeTask(); - Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, null, null, 0, true); - expectOffsetFlush(2L, new RuntimeException(), null, 0, true); - // Should rewind to last known committed positions - consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1); - PowerMock.expectLastCall(); - consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET); - PowerMock.expectLastCall(); - consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET); - PowerMock.expectLastCall(); - expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers first commit, third iteration triggers second (failing) commit - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, 
Whitebox.getInternalState(workerThread, "committing")); - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - PowerMock.verifyAll(); - } - - @Test - public void testCommitConsumerFailure() throws Exception { - expectInitializeTask(); - Capture<Collection<SinkRecord>> capturedRecords - = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT); - expectOffsetFlush(1L, null, new Exception(), 0, true); - expectStopTask(2); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Second iteration triggers commit - workerThread.iteration(); - workerThread.iteration(); - // TODO Response to consistent failures? - assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - PowerMock.verifyAll(); - } - - @Test - public void testCommitTimeout() throws Exception { - expectInitializeTask(); - // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit - Capture<Collection<SinkRecord>> capturedRecords - = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2); - expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false); - expectStopTask(4); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't - // trigger another commit - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); - // TODO Response to consistent failures? 
- assertEquals(1, workerThread.commitFailures()); - assertEquals(false, Whitebox.getInternalState(workerThread, "committing")); - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - PowerMock.verifyAll(); - } - - @Test - public void testAssignmentPauseResume() throws Exception { - // Just validate that the calls are passed through to the consumer, and that where appropriate errors are - // converted - expectInitializeTask(); - - expectOnePoll().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)), - sinkTaskContext.getValue().assignment()); - return null; - } - }); - EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3))); - - expectOnePoll().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - try { - sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION); - fail("Trying to pause unassigned partition should have thrown an Copycat exception"); - } catch (CopycatException e) { - // expected - } - sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2); - return null; - } - }); - consumer.pause(UNASSIGNED_TOPIC_PARTITION); - PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); - consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2); - PowerMock.expectLastCall(); - - expectOnePoll().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - try { - sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION); - fail("Trying to resume unassigned partition should have thrown an Copycat exception"); - } catch (CopycatException e) { - // expected - } - - sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2); - return null; - } - }); - consumer.resume(UNASSIGNED_TOPIC_PARTITION); - 
PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition")); - consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2); - PowerMock.expectLastCall(); - - expectStopTask(0); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - workerThread.iteration(); - workerThread.iteration(); - workerThread.iteration(); - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - PowerMock.verifyAll(); - } - - @Test - public void testRewind() throws Exception { - expectInitializeTask(); - final long startOffset = 40L; - final Map<TopicPartition, Long> offsets = new HashMap<>(); - - expectOnePoll().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - offsets.put(TOPIC_PARTITION, startOffset); - sinkTaskContext.getValue().offset(offsets); - return null; - } - }); - - consumer.seek(TOPIC_PARTITION, startOffset); - EasyMock.expectLastCall(); - - expectOnePoll().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets(); - assertEquals(0, offsets.size()); - return null; - } - }); - - expectStopTask(3); - EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true); - - PowerMock.replayAll(); - - workerTask.start(TASK_PROPS); - workerTask.joinConsumerGroupAndStart(); - workerThread.iteration(); - workerThread.iteration(); - workerTask.stop(); - workerTask.awaitStop(Long.MAX_VALUE); - workerTask.close(); - - PowerMock.verifyAll(); - } - - private void expectInitializeTask() throws Exception { - PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); - - workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", 
"awaitShutdown"}, - workerTask, "mock-worker-thread", time, - workerConfig); - PowerMock.expectPrivate(workerTask, "createWorkerThread") - .andReturn(workerThread); - workerThread.start(); - PowerMock.expectLastCall(); - - consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener)); - PowerMock.expectLastCall(); - - EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() { - @Override - public ConsumerRecords<byte[], byte[]> answer() throws Throwable { - rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)); - return ConsumerRecords.empty(); - } - }); - EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET); - EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET); - - sinkTask.initialize(EasyMock.capture(sinkTaskContext)); - PowerMock.expectLastCall(); - sinkTask.start(TASK_PROPS); - PowerMock.expectLastCall(); - } - - private void expectStopTask(final long expectedMessages) throws Exception { - final long finalOffset = FIRST_OFFSET + expectedMessages - 1; - - sinkTask.stop(); - PowerMock.expectLastCall(); - - // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the - // consumer so it exits quickly - consumer.wakeup(); - PowerMock.expectLastCall(); - - consumer.close(); - PowerMock.expectLastCall(); - } - - // Note that this can only be called once per test currently - private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception { - // Stub out all the consumer stream/iterator responses, which we just want to verify occur, - // but don't care about the exact details here. 
- EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer( - new IAnswer<ConsumerRecords<byte[], byte[]>>() { - @Override - public ConsumerRecords<byte[], byte[]> answer() throws Throwable { - // "Sleep" so time will progress - time.sleep(pollDelayMs); - ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition(TOPIC, PARTITION), - Arrays.asList( - new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE) - ))); - recordsReturned++; - return records; - } - }); - EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes(); - EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); - Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL); - sinkTask.put(EasyMock.capture(capturedRecords)); - EasyMock.expectLastCall().anyTimes(); - return capturedRecords; - } - - private IExpectationSetters<Object> expectOnePoll() { - // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of - // returning empty data, we return one record. The expectation is that the data will be ignored by the - // response behavior specified using the return value of this method. 
- EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer( - new IAnswer<ConsumerRecords<byte[], byte[]>>() { - @Override - public ConsumerRecords<byte[], byte[]> answer() throws Throwable { - // "Sleep" so time will progress - time.sleep(1L); - ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>( - Collections.singletonMap( - new TopicPartition(TOPIC, PARTITION), - Arrays.asList( - new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE) - ))); - recordsReturned++; - return records; - } - }); - EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)); - EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)); - sinkTask.put(EasyMock.anyObject(Collection.class)); - return EasyMock.expectLastCall(); - } - - private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages, - final RuntimeException flushError, - final Exception consumerCommitError, - final long consumerCommitDelayMs, - final boolean invokeCallback) - throws Exception { - final long finalOffset = FIRST_OFFSET + expectedMessages; - - // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one - final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); - offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset)); - offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); - offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); - sinkTask.flush(offsetsToCommit); - IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall(); - if (flushError != null) { - flushExpectation.andThrow(flushError).once(); - return null; - } - - final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture(); - consumer.commitAsync(EasyMock.eq(offsetsToCommit), - EasyMock.capture(capturedCallback)); 
- PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - time.sleep(consumerCommitDelayMs); - if (invokeCallback) - capturedCallback.getValue().onComplete(offsetsToCommit, consumerCommitError); - return null; - } - }); - return capturedCallback; - } - -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java deleted file mode 100644 index 0fa14bd..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java +++ /dev/null @@ -1,308 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; -import org.apache.kafka.copycat.source.SourceRecord; -import org.apache.kafka.copycat.source.SourceTask; -import org.apache.kafka.copycat.source.SourceTaskContext; -import org.apache.kafka.copycat.storage.Converter; -import org.apache.kafka.copycat.storage.OffsetStorageReader; -import org.apache.kafka.copycat.storage.OffsetStorageWriter; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.ThreadedTest; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.easymock.IExpectationSetters; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; - -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.*; -
/**
 * Tests for {@link WorkerSourceTask} in which the source task, both converters, the producer and
 * the offset reader/writer are all mocks. Each test records its expectations, replays them,
 * drives the worker task, and then verifies the mocks.
 */
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
    private static final String TOPIC = "topic";
    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
    private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);

    // Copycat-format data
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Integer KEY = -1;
    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
    private static final Long RECORD = 12L;
    // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
    // is used in the right place.
    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();

    private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
    // The single record every stubbed poll() hands back; it is addressed to TOPIC so that it matches the
    // converter expectations recorded in expectSendRecord().
    private static final List<SourceRecord> RECORDS = Arrays.asList(
            new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
    );

    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private WorkerConfig config;
    @Mock private SourceTask sourceTask;
    @Mock private Converter keyConverter;
    @Mock private Converter valueConverter;
    @Mock private KafkaProducer<byte[], byte[]> producer;
    @Mock private OffsetStorageReader offsetReader;
    @Mock private OffsetStorageWriter offsetWriter;
    private WorkerSourceTask workerTask;
    @Mock private Future<RecordMetadata> sendFuture;

    private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;

    /**
     * Builds the worker configuration used by every test and creates a fresh capture for the
     * producer callbacks.
     */
    @Override
    public void setup() {
        super.setup();
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("internal.key.converter.schemas.enable", "false");
        workerProps.put("internal.value.converter.schemas.enable", "false");
        config = new StandaloneConfig(workerProps);
        producerCallbacks = EasyMock.newCapture();
    }

    /** Creates the task under test, wired to the mocks and a real {@link SystemTime}. */
    private void createWorkerTask() {
        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
                offsetReader, offsetWriter, config, new SystemTime());
    }

    @Test
    public void testPollsInBackground() throws Exception {
        createWorkerTask();

        expectSourceTaskInitializeAndStart();

        final CountDownLatch pollLatch = expectPolls(10);
        // In this test, we don't flush, so nothing goes any further than the offset writer

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        awaitPolls(pollLatch);
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    @Test
    public void testCommit() throws Exception {
        // Test that the task commits properly when prompted
        createWorkerTask();

        expectSourceTaskInitializeAndStart();

        // We'll wait for some data, then trigger a flush
        final CountDownLatch pollLatch = expectPolls(1);
        expectOffsetFlush(true);

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        awaitPolls(pollLatch);
        assertTrue(workerTask.commitOffsets());
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    @Test
    public void testCommitFailure() throws Exception {
        // Test that commitOffsets() reports failure when the offset flush times out, and that the task
        // can still be stopped afterwards (the flush expected on stop succeeds)
        createWorkerTask();

        expectSourceTaskInitializeAndStart();

        // We'll wait for some data, then trigger a flush that fails
        final CountDownLatch pollLatch = expectPolls(1);
        expectOffsetFlush(false);

        sourceTask.stop();
        EasyMock.expectLastCall();
        expectOffsetFlush(true);

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        awaitPolls(pollLatch);
        assertFalse(workerTask.commitOffsets());
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    @Test
    public void testSendRecordsConvertsData() throws Exception {
        createWorkerTask();

        List<SourceRecord> records = new ArrayList<>();
        // Can just use the same record for key and value
        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));

        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();

        PowerMock.replayAll();

        Whitebox.invokeMethod(workerTask, "sendRecords", records);
        // Compare array contents: assertEquals on byte[] would only compare references
        assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
        assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());

        PowerMock.verifyAll();
    }

    @Test
    public void testSlowTaskStart() throws Exception {
        createWorkerTask();

        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(EMPTY_TASK_PROPS);
        // Make start() slow so that stop() below is requested while the task is still starting
        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                Utils.sleep(100);
                return null;
            }
        });
        sourceTask.stop();
        EasyMock.expectLastCall();

        PowerMock.replayAll();

        workerTask.start(EMPTY_TASK_PROPS);
        // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
        // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
        // cannot be invoked immediately in the thread trying to stop the task.
        workerTask.stop();
        assertTrue(workerTask.awaitStop(1000));

        PowerMock.verifyAll();
    }

    /** Records the initialize() and start() calls that are expected on the source task. */
    private void expectSourceTaskInitializeAndStart() {
        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
        EasyMock.expectLastCall();
        sourceTask.start(EMPTY_TASK_PROPS);
        EasyMock.expectLastCall();
    }

    /**
     * Stubs {@code sourceTask.poll()} to return {@link #RECORDS} on every call, together with the
     * send expectations that follow from polling.
     *
     * @param count the minimum number of polls the returned latch waits for
     * @return a latch that is counted down once per poll
     */
    private CountDownLatch expectPolls(int count) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(count);
        // Note that we stub these to allow any number of calls because the thread will continue to
        // run. The count passed in + latch returned just makes sure we get *at least* that number of
        // calls
        EasyMock.expect(sourceTask.poll())
                .andStubAnswer(new IAnswer<List<SourceRecord>>() {
                    @Override
                    public List<SourceRecord> answer() throws Throwable {
                        latch.countDown();
                        return RECORDS;
                    }
                });
        // Fallout of the poll() call
        expectSendRecord();
        return latch;
    }

    /**
     * Stubs the conversion of the test key/value, the producer send, and the write to the offset
     * writer.
     *
     * @return the capture that receives the record passed to {@code producer.send}
     */
    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws InterruptedException {
        EasyMock.expect(keyConverter.fromCopycatData(TOPIC, KEY_SCHEMA, KEY)).andStubReturn(SERIALIZED_KEY);
        EasyMock.expect(valueConverter.fromCopycatData(TOPIC, RECORD_SCHEMA, RECORD)).andStubReturn(SERIALIZED_RECORD);

        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
        // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
        EasyMock.expect(
                producer.send(EasyMock.capture(sent),
                        EasyMock.capture(producerCallbacks)))
                .andStubAnswer(new IAnswer<Future<RecordMetadata>>() {
                    @Override
                    public Future<RecordMetadata> answer() throws Throwable {
                        // Complete every captured callback without an error, then clear the capture so
                        // that no callback is completed twice
                        synchronized (producerCallbacks) {
                            for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
                                cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null);
                            }
                            producerCallbacks.reset();
                        }
                        return sendFuture;
                    }
                });
        // 2. Offset data is passed to the offset storage.
        offsetWriter.offset(PARTITION, OFFSET);
        PowerMock.expectLastCall().anyTimes();

        return sent;
    }

    /**
     * Waits up to one second for the expected polls and fails the test if they did not all happen,
     * instead of silently carrying on.
     */
    private void awaitPolls(CountDownLatch latch) throws InterruptedException {
        assertTrue("Timed out waiting for the expected number of polls",
                latch.await(1000, TimeUnit.MILLISECONDS));
    }

    /**
     * Records one offset flush cycle on the offset writer.
     *
     * @param succeed if true the flush future returns normally; otherwise it throws a
     *                {@link TimeoutException} and a call to {@code cancelFlush()} is expected
     */
    @SuppressWarnings("unchecked")
    private void expectOffsetFlush(boolean succeed) throws Exception {
        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
        Future<Void> flushFuture = PowerMock.createMock(Future.class);
        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
        // Should throw for failure
        IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
                flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
        if (succeed) {
            futureGetExpect.andReturn(null);
        } else {
            futureGetExpect.andThrow(new TimeoutException());
            offsetWriter.cancelFlush();
            PowerMock.expectLastCall();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java deleted file mode 100644 index 00cef2b..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java +++ /dev/null @@ -1,397 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime; - -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.copycat.connector.Connector; -import org.apache.kafka.copycat.connector.ConnectorContext; -import org.apache.kafka.copycat.connector.Task; -import org.apache.kafka.copycat.errors.CopycatException; -import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; -import org.apache.kafka.copycat.sink.SinkTask; -import org.apache.kafka.copycat.source.SourceRecord; -import org.apache.kafka.copycat.source.SourceTask; -import org.apache.kafka.copycat.storage.Converter; -import org.apache.kafka.copycat.storage.OffsetBackingStore; -import org.apache.kafka.copycat.storage.OffsetStorageReader; -import org.apache.kafka.copycat.storage.OffsetStorageWriter; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.MockTime; -import org.apache.kafka.copycat.util.ThreadedTest; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static 
org.junit.Assert.fail; -
/**
 * Tests for {@link Worker} covering adding/removing connectors and tasks, regenerating task
 * configs for a connector, stopping unknown connectors/tasks, and task cleanup on
 * {@code Worker.stop()}. The offset backing store, connectors and tasks are mocks.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(Worker.class)
@PowerMockIgnore("javax.management.*")
public class WorkerTest extends ThreadedTest {

    private static final String CONNECTOR_ID = "test-connector";
    private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);

    private WorkerConfig config;
    private Worker worker;
    private final OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);

    @Before
    public void setup() {
        super.setup();

        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
        workerProps.put("internal.key.converter.schemas.enable", "false");
        workerProps.put("internal.value.converter.schemas.enable", "false");
        config = new StandaloneConfig(workerProps);
    }

    @Test
    public void testAddRemoveConnector() throws Exception {
        expectStartStorage();

        // Create
        Connector connector = PowerMock.createMock(Connector.class);
        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
        expectConnectorInstantiation(connector);

        Map<String, String> props = connectorProps();

        connector.initialize(ctx);
        EasyMock.expectLastCall();
        connector.start(props);
        EasyMock.expectLastCall();

        // Remove
        connector.stop();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        worker = new Worker(new MockTime(), config, offsetBackingStore);
        worker.start();

        addConnectorAndExpectDuplicateRejected(new ConnectorConfig(props), ctx);
        worker.stopConnector(CONNECTOR_ID);
        assertEquals(Collections.emptySet(), worker.connectorNames());
        // Nothing should be left, so this should effectively be a nop
        worker.stop();

        PowerMock.verifyAll();
    }

    @Test(expected = CopycatException.class)
    public void testStopInvalidConnector() {
        expectStartStorage();

        PowerMock.replayAll();

        worker = new Worker(new MockTime(), config, offsetBackingStore);
        worker.start();

        worker.stopConnector(CONNECTOR_ID);
    }

    @Test
    public void testReconfigureConnectorTasks() throws Exception {
        expectStartStorage();

        // Create
        Connector connector = PowerMock.createMock(Connector.class);
        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
        expectConnectorInstantiation(connector);

        Map<String, String> props = connectorProps();

        connector.initialize(ctx);
        EasyMock.expectLastCall();
        connector.start(props);
        EasyMock.expectLastCall();

        // Reconfigure
        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
        Map<String, String> taskProps = new HashMap<>();
        taskProps.put("foo", "bar");
        EasyMock.expect(connector.taskConfigs(2)).andReturn(Arrays.asList(taskProps, taskProps));

        // Remove
        connector.stop();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        worker = new Worker(new MockTime(), config, offsetBackingStore);
        worker.start();

        addConnectorAndExpectDuplicateRejected(new ConnectorConfig(props), ctx);

        List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar"));
        // Each returned config is the connector-supplied props plus the task class and the topic list
        Map<String, String> expectedTaskProps = new HashMap<>();
        expectedTaskProps.put("foo", "bar");
        expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
        expectedTaskProps.put(SinkTask.TOPICS_CONFIG, "foo,bar");
        assertEquals(2, taskConfigs.size());
        assertEquals(expectedTaskProps, taskConfigs.get(0));
        assertEquals(expectedTaskProps, taskConfigs.get(1));
        worker.stopConnector(CONNECTOR_ID);
        assertEquals(Collections.emptySet(), worker.connectorNames());
        // Nothing should be left, so this should effectively be a nop
        worker.stop();

        PowerMock.verifyAll();
    }


    @Test
    public void testAddRemoveTask() throws Exception {
        expectStartStorage();

        // Create
        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
        expectWorkerTaskCreation(task, workerTask);

        Map<String, String> origProps = new HashMap<>();
        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
        workerTask.start(origProps);
        EasyMock.expectLastCall();

        // Remove
        workerTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
        workerTask.close();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        worker = new Worker(new MockTime(), config, offsetBackingStore);
        worker.start();
        assertEquals(Collections.emptySet(), worker.taskIds());
        worker.addTask(TASK_ID, new TaskConfig(origProps));
        assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
        worker.stopTask(TASK_ID);
        assertEquals(Collections.emptySet(), worker.taskIds());
        // Nothing should be left, so this should effectively be a nop
        worker.stop();

        PowerMock.verifyAll();
    }

    @Test(expected = CopycatException.class)
    public void testStopInvalidTask() {
        expectStartStorage();

        PowerMock.replayAll();

        worker = new Worker(new MockTime(), config, offsetBackingStore);
        worker.start();

        worker.stopTask(TASK_ID);
    }

    @Test
    public void testCleanupTasksOnStop() throws Exception {
        expectStartStorage();

        // Create
        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
        expectWorkerTaskCreation(task, workerTask);

        Map<String, String> origProps = new HashMap<>();
        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
        workerTask.start(origProps);
        EasyMock.expectLastCall();

        // Remove on Worker.stop()
        workerTask.stop();
        EasyMock.expectLastCall();
        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
        // Note that in this case we *do not* commit offsets since it's an unclean shutdown
        workerTask.close();
        EasyMock.expectLastCall();

        expectStopStorage();

        PowerMock.replayAll();

        worker = new Worker(new MockTime(), config, offsetBackingStore);
        worker.start();
        worker.addTask(TASK_ID, new TaskConfig(origProps));
        worker.stop();

        PowerMock.verifyAll();
    }

    /** Records the configure() and start() calls expected on the offset backing store. */
    @SuppressWarnings("unchecked") // the raw Map.class matcher is converted to the parameter type of configure()
    private void expectStartStorage() {
        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        offsetBackingStore.start();
        EasyMock.expectLastCall();
    }

    /** Records the stop() call expected on the offset backing store. */
    private void expectStopStorage() {
        offsetBackingStore.stop();
        EasyMock.expectLastCall();
    }

    /** Builds the connector properties shared by the connector tests. */
    private static Map<String, String> connectorProps() {
        Map<String, String> props = new HashMap<>();
        props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
        return props;
    }

    /**
     * Makes {@code Worker.instantiateConnector(TestConnector.class)} return the given mock and
     * expects a single version() call on it.
     */
    private void expectConnectorInstantiation(Connector connector) throws Exception {
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
        EasyMock.expect(connector.version()).andReturn("1.0");
    }

    /**
     * Makes {@code Worker.instantiateTask(TestSourceTask.class)} return the given task mock, expects
     * a version() call on it, and makes the construction of a {@link WorkerSourceTask} for
     * {@link #TASK_ID} and that task return {@code workerTask}.
     */
    private void expectWorkerTaskCreation(TestSourceTask task, WorkerSourceTask workerTask) throws Exception {
        PowerMock.mockStatic(Worker.class);
        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
        EasyMock.expect(task.version()).andReturn("1.0");

        PowerMock.expectNew(
                WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
                EasyMock.anyObject(Converter.class),
                EasyMock.anyObject(Converter.class),
                EasyMock.anyObject(KafkaProducer.class),
                EasyMock.anyObject(OffsetStorageReader.class),
                EasyMock.anyObject(OffsetStorageWriter.class),
                EasyMock.anyObject(WorkerConfig.class),
                EasyMock.anyObject(Time.class))
                .andReturn(workerTask);
    }

    /**
     * Adds the connector to the (already started) worker, checks that it is the only registered
     * connector, and checks that adding it a second time is rejected with a
     * {@link CopycatException}.
     */
    private void addConnectorAndExpectDuplicateRejected(ConnectorConfig connectorConfig, ConnectorContext ctx) {
        assertEquals(Collections.emptySet(), worker.connectorNames());
        worker.addConnector(connectorConfig, ctx);
        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
        try {
            worker.addConnector(connectorConfig, ctx);
            fail("Should have thrown exception when trying to add connector with same name.");
        } catch (CopycatException e) {
            // expected
        }
    }


    /** Minimal connector whose class is named in the connector configs; every method is a no-op or returns null. */
    private static class TestConnector extends Connector {
        @Override
        public String version() {
            return "1.0";
        }

        @Override
        public void start(Map<String, String> props) {

        }

        @Override
        public Class<? extends Task> taskClass() {
            return null;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            return null;
        }

        @Override
        public void stop() {

        }
    }

    /** Minimal source task whose class is named in the task configs; poll() always returns null. */
    private static class TestSourceTask extends SourceTask {
        public TestSourceTask() {
        }

        @Override
        public String version() {
            return "1.0";
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            return null;
        }

        @Override
        public void stop() {
        }
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java deleted file mode 100644 index 512cb5c..0000000 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java +++ /dev/null @@ -1,573 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.distributed; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.copycat.connector.ConnectorContext; -import org.apache.kafka.copycat.errors.AlreadyExistsException; -import org.apache.kafka.copycat.runtime.ConnectorConfig; -import org.apache.kafka.copycat.runtime.Herder; -import org.apache.kafka.copycat.runtime.TaskConfig; -import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.runtime.WorkerConfig; -import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; -import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo; -import org.apache.kafka.copycat.source.SourceConnector; -import org.apache.kafka.copycat.source.SourceTask; -import org.apache.kafka.copycat.storage.KafkaConfigStorage; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.FutureCallback; -import org.apache.kafka.copycat.util.TestFuture; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.easymock.PowerMock; -import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; 
-import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.reflect.Whitebox; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(DistributedHerder.class) -@PowerMockIgnore("javax.management.*") -public class DistributedHerderTest { - private static final Map<String, String> HERDER_CONFIG = new HashMap<>(); - static { - HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic"); - HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-copycat-group"); - // The WorkerConfig base class has some required settings without defaults - HERDER_CONFIG.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter"); - HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter"); - HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter"); - HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.copycat.json.JsonConverter"); - } - private static final String MEMBER_URL = "memberUrl"; - - private static final String CONN1 = "sourceA"; - private static final String CONN2 = "sourceB"; - private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0); - private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1); - private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2); - private static final Integer MAX_TASKS = 3; - private static final Map<String, String> 
CONN1_CONFIG = new HashMap<>(); - static { - CONN1_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN1); - CONN1_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString()); - CONN1_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); - CONN1_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName()); - } - private static final Map<String, String> CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG); - static { - CONN1_CONFIG_UPDATED.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar,baz"); - } - private static final Map<String, String> CONN2_CONFIG = new HashMap<>(); - static { - CONN2_CONFIG.put(ConnectorConfig.NAME_CONFIG, CONN2); - CONN2_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString()); - CONN2_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); - CONN2_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName()); - } - private static final Map<String, String> TASK_CONFIG = new HashMap<>(); - static { - TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName()); - } - private static final List<Map<String, String>> TASK_CONFIGS = new ArrayList<>(); - static { - TASK_CONFIGS.add(TASK_CONFIG); - TASK_CONFIGS.add(TASK_CONFIG); - TASK_CONFIGS.add(TASK_CONFIG); - } - private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP = new HashMap<>(); - static { - TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG); - TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG); - TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG); - } - private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), - Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, Collections.<String>emptySet()); - private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), - Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet()); - - @Mock 
private KafkaConfigStorage configStorage; - @Mock private WorkerGroupMember member; - private MockTime time; - private DistributedHerder herder; - @Mock private Worker worker; - @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback; - - private Callback<String> connectorConfigCallback; - private Callback<List<ConnectorTaskId>> taskConfigCallback; - private WorkerRebalanceListener rebalanceListener; - - @Before - public void setUp() throws Exception { - worker = PowerMock.createMock(Worker.class); - time = new MockTime(); - - herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"}, - new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time); - connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback"); - taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback"); - rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener"); - } - - @Test - public void testJoinAssignment() { - // Join group and get assignment - EasyMock.expect(member.memberId()).andStubReturn("member"); - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); - expectPostRebalanceCatchup(SNAPSHOT); - worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject()); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - herder.tick(); - - PowerMock.verifyAll(); - } - - @Test - public void testHaltCleansUpWorker() { - EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1)); - worker.stopConnector(CONN1); - PowerMock.expectLastCall(); - EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1)); - worker.stopTask(TASK1); - 
PowerMock.expectLastCall(); - member.stop(); - PowerMock.expectLastCall(); - configStorage.stop(); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - herder.halt(); - - PowerMock.verifyAll(); - } - - @Test - public void testCreateConnector() throws Exception { - EasyMock.expect(member.memberId()).andStubReturn("leader"); - expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); - - member.wakeup(); - PowerMock.expectLastCall(); - // CONN2 is new, should succeed - configStorage.putConnectorConfig(CONN2, CONN2_CONFIG); - PowerMock.expectLastCall(); - ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList()); - putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info)); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - // No immediate action besides this -- change will be picked up via the config log - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback); - herder.tick(); - - PowerMock.verifyAll(); - } - - @Test - public void testCreateConnectorAlreadyExists() throws Exception { - EasyMock.expect(member.memberId()).andStubReturn("leader"); - expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); - - member.wakeup(); - PowerMock.expectLastCall(); - // CONN1 already exists - putConnectorCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull()); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - // No immediate action besides this -- change will be picked up via the config log - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback); - herder.tick(); - - PowerMock.verifyAll(); - } - - @Test - public 
void testDestroyConnector() throws Exception { - EasyMock.expect(member.memberId()).andStubReturn("leader"); - // Start with one connector - expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); - worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - - // And delete the connector - member.wakeup(); - PowerMock.expectLastCall(); - configStorage.putConnectorConfig(CONN1, null); - PowerMock.expectLastCall(); - putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null)); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - // No immediate action besides this -- change will be picked up via the config log - - PowerMock.replayAll(); - - herder.putConnectorConfig(CONN1, null, true, putConnectorCallback); - herder.tick(); - - PowerMock.verifyAll(); - } - - @Test - public void testConnectorConfigAdded() { - // If a connector was added, we need to rebalance - EasyMock.expect(member.memberId()).andStubReturn("member"); - - // join, no configs so no need to catch up on config topic - expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // apply config - member.wakeup(); - member.ensureActive(); - PowerMock.expectLastCall(); - // Checks for config updates and starts rebalance - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); - member.requestRejoin(); - PowerMock.expectLastCall(); - // Performs rebalance and gets new assignment - expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), - CopycatProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); - 
worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - herder.tick(); // join - connectorConfigCallback.onCompletion(null, CONN1); // read updated config - herder.tick(); // apply config - herder.tick(); // do rebalance - - PowerMock.verifyAll(); - } - - @Test - public void testConnectorConfigUpdate() { - // Connector config can be applied without any rebalance - - EasyMock.expect(member.memberId()).andStubReturn("member"); - EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1)); - - // join - expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); - worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // apply config - member.wakeup(); - member.ensureActive(); - PowerMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot - worker.stopConnector(CONN1); - PowerMock.expectLastCall(); - worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - herder.tick(); // join - connectorConfigCallback.onCompletion(null, CONN1); // read updated config - herder.tick(); // apply config - - PowerMock.verifyAll(); - } - - @Test 
- public void testTaskConfigAdded() { - // Task config always requires rebalance - EasyMock.expect(member.memberId()).andStubReturn("member"); - - // join - expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // apply config - member.wakeup(); - member.ensureActive(); - PowerMock.expectLastCall(); - // Checks for config updates and starts rebalance - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); - member.requestRejoin(); - PowerMock.expectLastCall(); - // Performs rebalance and gets new assignment - expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), - CopycatProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0)); - worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject()); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - herder.tick(); // join - taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, TASK2)); // read updated config - herder.tick(); // apply config - herder.tick(); // do rebalance - - PowerMock.verifyAll(); - } - - @Test - public void testJoinLeaderCatchUpFails() throws Exception { - // Join group and as leader fail to do assignment - EasyMock.expect(member.memberId()).andStubReturn("leader"); - expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), - CopycatProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); - // Reading to end of log times out - TestFuture<Void> readToEndFuture = new TestFuture<>(); - readToEndFuture.resolveOnGet(new TimeoutException()); - EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture); - PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT); - member.requestRejoin(); - - // 
After backoff, restart the process and this time succeed - expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); - expectPostRebalanceCatchup(SNAPSHOT); - - worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject()); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - herder.tick(); - herder.tick(); - - PowerMock.verifyAll(); - } - - @Test - public void testAccessors() throws Exception { - EasyMock.expect(member.memberId()).andStubReturn("leader"); - expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); - - - member.wakeup(); - PowerMock.expectLastCall().anyTimes(); - // list connectors, get connector info, get connector config, get task configs - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - - PowerMock.replayAll(); - - FutureCallback<Collection<String>> listConnectorsCb = new FutureCallback<>(); - herder.connectors(listConnectorsCb); - FutureCallback<ConnectorInfo> connectorInfoCb = new FutureCallback<>(); - herder.connectorInfo(CONN1, connectorInfoCb); - FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>(); - herder.connectorConfig(CONN1, connectorConfigCb); - FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>(); - herder.taskConfigs(CONN1, taskConfigsCb); - - herder.tick(); - assertTrue(listConnectorsCb.isDone()); - assertEquals(Collections.singleton(CONN1), listConnectorsCb.get()); - assertTrue(connectorInfoCb.isDone()); - ConnectorInfo info = new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2)); - assertEquals(info, connectorInfoCb.get()); - assertTrue(connectorConfigCb.isDone()); 
- assertEquals(CONN1_CONFIG, connectorConfigCb.get()); - assertTrue(taskConfigsCb.isDone()); - assertEquals(Arrays.asList( - new TaskInfo(TASK0, TASK_CONFIG), - new TaskInfo(TASK1, TASK_CONFIG), - new TaskInfo(TASK2, TASK_CONFIG)), - taskConfigsCb.get()); - - PowerMock.verifyAll(); - } - - @Test - public void testPutConnectorConfig() throws Exception { - EasyMock.expect(member.memberId()).andStubReturn("leader"); - expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); - expectPostRebalanceCatchup(SNAPSHOT); - worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - - // list connectors, get connector info, get connector config, get task configs - member.wakeup(); - PowerMock.expectLastCall().anyTimes(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // Poll loop for second round of calls - member.ensureActive(); - PowerMock.expectLastCall(); - configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED); - PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - // Simulate response to writing config + waiting until end of log to be read - connectorConfigCallback.onCompletion(null, CONN1); - return null; - } - }); - // As a result of reconfig, should need to update snapshot. 
With only connector updates, we'll just restart - // connector without rebalance - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); - worker.stopConnector(CONN1); - PowerMock.expectLastCall(); - Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture(); - worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject()); - PowerMock.expectLastCall(); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - // Third tick just to read the config - member.ensureActive(); - PowerMock.expectLastCall(); - member.poll(EasyMock.anyInt()); - PowerMock.expectLastCall(); - - PowerMock.replayAll(); - - // Should pick up original config - FutureCallback<Map<String, String>> connectorConfigCb = new FutureCallback<>(); - herder.connectorConfig(CONN1, connectorConfigCb); - herder.tick(); - assertTrue(connectorConfigCb.isDone()); - assertEquals(CONN1_CONFIG, connectorConfigCb.get()); - - // Apply new config. 
- FutureCallback<Herder.Created<ConnectorInfo>> putConfigCb = new FutureCallback<>(); - herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, putConfigCb); - herder.tick(); - assertTrue(putConfigCb.isDone()); - ConnectorInfo updatedInfo = new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2)); - assertEquals(new Herder.Created<>(false, updatedInfo), putConfigCb.get()); - - // Check config again to validate change - connectorConfigCb = new FutureCallback<>(); - herder.connectorConfig(CONN1, connectorConfigCb); - herder.tick(); - assertTrue(connectorConfigCb.isDone()); - assertEquals(CONN1_CONFIG_UPDATED, connectorConfigCb.get()); - // The config passed to Worker should - assertEquals(Arrays.asList("foo", "bar", "baz"), - capturedUpdatedConfig.getValue().getList(ConnectorConfig.TOPICS_CONFIG)); - PowerMock.verifyAll(); - } - - @Test - public void testInconsistentConfigs() throws Exception { - // FIXME: if we have inconsistent configs, we need to request forced reconfig + write of the connector's task configs - // This requires inter-worker communication, so needs the REST API - } - - - private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { - expectRebalance(null, null, CopycatProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks); - } - - // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks. 
- private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks, - final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { - member.ensureActive(); - PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { - @Override - public Object answer() throws Throwable { - if (revokedConnectors != null) - rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks); - CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment( - error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks); - rebalanceListener.onAssigned(assignment); - return null; - } - }); - member.wakeup(); - PowerMock.expectLastCall(); - } - - private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) { - TestFuture<Void> readToEndFuture = new TestFuture<>(); - readToEndFuture.resolveOnGet((Void) null); - EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture); - EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot); - } - - - // We need to use a real class here due to some issue with mocking java.lang.Class - private abstract class BogusSourceConnector extends SourceConnector { - } - - private abstract class BogusSourceTask extends SourceTask { - } - -}