Repository: kafka Updated Branches: refs/heads/trunk 21443f214 -> 2e6177359
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java index 0463b85..1213656 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java @@ -17,20 +17,19 @@ package org.apache.kafka.copycat.runtime.distributed; -import org.apache.kafka.copycat.connector.Connector; -import org.apache.kafka.copycat.connector.Task; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.copycat.connector.ConnectorContext; import org.apache.kafka.copycat.runtime.ConnectorConfig; -import org.apache.kafka.copycat.runtime.HerderConnectorContext; +import org.apache.kafka.copycat.runtime.TaskConfig; import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.sink.SinkConnector; -import org.apache.kafka.copycat.sink.SinkTask; import org.apache.kafka.copycat.source.SourceConnector; import org.apache.kafka.copycat.source.SourceTask; import org.apache.kafka.copycat.storage.KafkaConfigStorage; import org.apache.kafka.copycat.util.Callback; import org.apache.kafka.copycat.util.ConnectorTaskId; -import org.apache.kafka.copycat.util.FutureCallback; +import org.apache.kafka.copycat.util.TestFuture; import org.easymock.EasyMock; +import org.easymock.IAnswer; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,251 +38,354 @@ import org.powermock.api.easymock.annotation.Mock; import org.powermock.core.classloader.annotations.PowerMockIgnore; import 
org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; @RunWith(PowerMockRunner.class) -@PrepareForTest({DistributedHerder.class}) +@PrepareForTest(DistributedHerder.class) @PowerMockIgnore("javax.management.*") public class DistributedHerderTest { - private static final List<String> CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2", "sink-test3"); - private static final List<String> SOURCE_CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2"); - private static final List<String> SINK_CONNECTOR_NAMES = Arrays.asList("sink-test3"); - private static final String TOPICS_LIST_STR = "topic1,topic2"; + private static final Map<String, String> HERDER_CONFIG = new HashMap<>(); + static { + HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic"); + HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + HERDER_CONFIG.put(DistributedHerderConfig.GROUP_ID_CONFIG, "test-copycat-group"); + } - private static final Map<String, String> CONFIG_STORAGE_CONFIG = Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic"); + private static final String CONN1 = "sourceA"; + private static final String CONN2 = "sourceB"; + private static final ConnectorTaskId TASK0 = new ConnectorTaskId(CONN1, 0); + private static final ConnectorTaskId TASK1 = new ConnectorTaskId(CONN1, 1); + private static final ConnectorTaskId TASK2 = new ConnectorTaskId(CONN1, 2); + private static final Integer MAX_TASKS = 3; + private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>(); + static { + CONNECTOR_CONFIG.put(ConnectorConfig.NAME_CONFIG, 
"sourceA"); + CONNECTOR_CONFIG.put(ConnectorConfig.TASKS_MAX_CONFIG, MAX_TASKS.toString()); + CONNECTOR_CONFIG.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar"); + CONNECTOR_CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSourceConnector.class.getName()); + } + private static final Map<String, String> TASK_CONFIG = new HashMap<>(); + static { + TASK_CONFIG.put(TaskConfig.TASK_CLASS_CONFIG, BogusSourceTask.class.getName()); + } + private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS = new HashMap<>(); + static { + TASK_CONFIGS.put(TASK0, TASK_CONFIG); + TASK_CONFIGS.put(TASK1, TASK_CONFIG); + TASK_CONFIGS.put(TASK2, TASK_CONFIG); + } + private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3), + Collections.singletonMap(CONN1, CONNECTOR_CONFIG), TASK_CONFIGS, Collections.<String>emptySet()); @Mock private KafkaConfigStorage configStorage; + @Mock private WorkerGroupMember member; private DistributedHerder herder; @Mock private Worker worker; @Mock private Callback<String> createCallback; + @Mock private Callback<Void> destroyCallback; - private Map<String, Map<String, String>> connectorProps; - private Map<String, Class<? extends Connector>> connectorClasses; - private Map<String, Class<? extends Task>> connectorTaskClasses; - private Map<String, Connector> connectors; - private Properties taskProps; + private Callback<String> connectorConfigCallback; + private Callback<List<ConnectorTaskId>> taskConfigCallback; + private WorkerRebalanceListener rebalanceListener; @Before - public void setUp() { + public void setUp() throws Exception { worker = PowerMock.createMock(Worker.class); - herder = new DistributedHerder(worker, configStorage); - - connectorProps = new HashMap<>(); - connectorClasses = new HashMap<>(); - connectorTaskClasses = new HashMap<>(); - connectors = new HashMap<>(); - for (String connectorName : CONNECTOR_NAMES) { - Class<? 
extends Connector> connectorClass = connectorName.contains("source") ? BogusSourceConnector.class : BogusSinkConnector.class; - Class<? extends Task> taskClass = connectorName.contains("source") ? BogusSourceTask.class : BogusSinkTask.class; - Connector connector = connectorName.contains("source") ? PowerMock.createMock(BogusSourceConnector.class) : PowerMock.createMock(BogusSinkConnector.class); - - Map<String, String> props = new HashMap<>(); - props.put(ConnectorConfig.NAME_CONFIG, connectorName); - props.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR); - props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName()); - - connectorProps.put(connectorName, props); - connectorClasses.put(connectorName, connectorClass); - connectorTaskClasses.put(connectorName, taskClass); - connectors.put(connectorName, connector); - } - - PowerMock.mockStatic(DistributedHerder.class); - - // These can be anything since connectors can pass along whatever they want. - taskProps = new Properties(); - taskProps.setProperty("foo", "bar"); + + herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"}, + worker, HERDER_CONFIG, configStorage, member); + connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback"); + taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback"); + rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener"); } @Test - public void testCreateSourceConnector() throws Exception { - String connectorName = SOURCE_CONNECTOR_NAMES.get(0); + public void testJoinAssignment() { + // Join group and get assignment + EasyMock.expect(member.memberId()).andStubReturn("member"); + expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); + expectPostRebalanceCatchup(SNAPSHOT); + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); + PowerMock.expectLastCall(); + EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, 
null)).andReturn(TASK_CONFIGS); + worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject()); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); - expectConfigStorageConfigureStart(); - expectEmptyRestore(); - expectAdd(connectorName); PowerMock.replayAll(); - herder.configure(CONFIG_STORAGE_CONFIG); - herder.start(); - herder.addConnector(connectorProps.get(connectorName), createCallback); + herder.tick(); PowerMock.verifyAll(); } @Test - public void testCreateSinkConnector() throws Exception { - String connectorName = SINK_CONNECTOR_NAMES.get(0); + public void testHaltCleansUpWorker() { + EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1)); + worker.stopConnector(CONN1); + PowerMock.expectLastCall(); + EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1)); + worker.stopTask(TASK1); + PowerMock.expectLastCall(); + member.stop(); + PowerMock.expectLastCall(); + configStorage.stop(); + PowerMock.expectLastCall(); - expectConfigStorageConfigureStart(); - expectEmptyRestore(); - expectAdd(connectorName); PowerMock.replayAll(); - herder.configure(CONFIG_STORAGE_CONFIG); - herder.start(); - herder.addConnector(connectorProps.get(connectorName), createCallback); + herder.halt(); PowerMock.verifyAll(); } @Test - public void testDestroyConnector() throws Exception { - String connectorName = SOURCE_CONNECTOR_NAMES.get(0); - - expectConfigStorageConfigureStart(); - expectEmptyRestore(); - expectAdd(connectorName); - expectDestroy(connectorName); - PowerMock.replayAll(); + public void testCreateConnector() throws Exception { + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); - herder.configure(CONFIG_STORAGE_CONFIG); - herder.start(); - herder.addConnector(connectorProps.get(connectorName), createCallback); + member.wakeup(); + 
PowerMock.expectLastCall(); + configStorage.putConnectorConfig(CONN1, CONNECTOR_CONFIG); + PowerMock.expectLastCall(); + createCallback.onCompletion(null, CONN1); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + // No immediate action besides this -- change will be picked up via the config log - FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() { - @Override - public void onCompletion(Throwable error, Void result) { + PowerMock.replayAll(); - } - }); - herder.deleteConnector(CONNECTOR_NAMES.get(0), futureCb); - futureCb.get(1000L, TimeUnit.MILLISECONDS); + herder.addConnector(CONNECTOR_CONFIG, createCallback); + herder.tick(); PowerMock.verifyAll(); } @Test - public void testCreateAndStop() throws Exception { - String connectorName = SOURCE_CONNECTOR_NAMES.get(0); + public void testDestroyConnector() throws Exception { + EasyMock.expect(member.memberId()).andStubReturn("leader"); + // Start with one connector + expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); + PowerMock.expectLastCall(); + EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + + // And delete the connector + member.wakeup(); + PowerMock.expectLastCall(); + configStorage.putConnectorConfig(CONN1, null); + PowerMock.expectLastCall(); + destroyCallback.onCompletion(null, null); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + // No immediate action besides this -- change will be picked up via the config log - expectConfigStorageConfigureStart(); - expectEmptyRestore(); - expectAdd(connectorName); PowerMock.replayAll(); - herder.configure(CONFIG_STORAGE_CONFIG); - herder.start(); - herder.addConnector(connectorProps.get(connectorName), createCallback); + 
herder.deleteConnector(CONN1, destroyCallback); + herder.tick(); PowerMock.verifyAll(); } @Test - public void testRestoreAndStop() throws Exception { - String restoreConnectorName1 = SOURCE_CONNECTOR_NAMES.get(0); - String restoreConnectorName2 = SINK_CONNECTOR_NAMES.get(0); - String additionalConnectorName = SOURCE_CONNECTOR_NAMES.get(1); - - expectConfigStorageConfigureStart(); - expectRestore(Arrays.asList(restoreConnectorName1, restoreConnectorName2)); - expectAdd(additionalConnectorName); - // Stopping the herder should correctly stop all restored and new connectors - expectStop(restoreConnectorName1); - expectStop(restoreConnectorName2); - expectStop(additionalConnectorName); - configStorage.stop(); + public void testConnectorConfigAdded() { + // If a connector was added, we need to rebalance + EasyMock.expect(member.memberId()).andStubReturn("member"); + + // join, no configs so no need to catch up on config topic + expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + // apply config + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + // Checks for config updates and starts rebalance + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + member.requestRejoin(); + PowerMock.expectLastCall(); + // Performs rebalance and gets new assignment + expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), + CopycatProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); + PowerMock.expectLastCall(); + EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); PowerMock.replayAll(); - herder.configure(CONFIG_STORAGE_CONFIG); - herder.start(); - 
herder.addConnector(connectorProps.get(additionalConnectorName), createCallback); - herder.stop(); + herder.tick(); // join + connectorConfigCallback.onCompletion(null, CONN1); // read updated config + herder.tick(); // apply config + herder.tick(); // do rebalance PowerMock.verifyAll(); } - private void expectConfigStorageConfigureStart() { - configStorage.configure(CONFIG_STORAGE_CONFIG); + @Test + public void testConnectorConfigUpdate() { + // Connector config can be applied without any rebalance + + EasyMock.expect(member.memberId()).andStubReturn("member"); + EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1)); + + // join + expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList()); + expectPostRebalanceCatchup(SNAPSHOT); + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); PowerMock.expectLastCall(); - configStorage.start(); + EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - } - private void expectAdd(String connectorName) throws Exception { - configStorage.putConnectorConfig(connectorName, connectorProps.get(connectorName)); + // apply config + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot + worker.stopConnector(CONN1); + PowerMock.expectLastCall(); + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); + PowerMock.expectLastCall(); + EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - expectInstantiateConnector(connectorName, true); - } - private void expectEmptyRestore() throws Exception { - 
expectRestore(Collections.<String>emptyList()); - } + PowerMock.replayAll(); + + herder.tick(); // join + connectorConfigCallback.onCompletion(null, CONN1); // read updated config + herder.tick(); // apply config - private void expectRestore(List<String> connectorNames) throws Exception { - Map<String, Integer> rootConfig = new HashMap<>(); - Map<String, Map<String, String>> connectorConfigs = new HashMap<>(); - for (String connName : connectorNames) { - rootConfig.put(connName, 0); - connectorConfigs.put(connName, connectorProps.get(connName)); - } - EasyMock.expect(configStorage.snapshot()) - .andReturn(new ClusterConfigState(1, rootConfig, connectorConfigs, Collections.EMPTY_MAP, Collections.EMPTY_SET)); - - // Restore never uses a callback - for (String connectorName : connectorNames) - expectInstantiateConnector(connectorName, false); + PowerMock.verifyAll(); } - private void expectInstantiateConnector(String connectorName, boolean expectCallback) throws Exception { - PowerMock.expectPrivate(DistributedHerder.class, "instantiateConnector", connectorClasses.get(connectorName).getName()) - .andReturn(connectors.get(connectorName)); - if (expectCallback) { - createCallback.onCompletion(null, connectorName); - PowerMock.expectLastCall(); - } + @Test + public void testTaskConfigAdded() { + // Task config always requires rebalance + EasyMock.expect(member.memberId()).andStubReturn("member"); + + // join + expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); - Connector connector = connectors.get(connectorName); - connector.initialize(EasyMock.anyObject(HerderConnectorContext.class)); + // apply config + member.wakeup(); + member.ensureActive(); + PowerMock.expectLastCall(); + // Checks for config updates and starts rebalance + EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + member.requestRejoin(); PowerMock.expectLastCall(); - 
connector.start(new Properties()); + // Performs rebalance and gets new assignment + expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), + CopycatProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0)); + worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject()); PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); - // Just return the connector properties for the individual task we generate by default - EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(connectorTaskClasses.get(connectorName)); + herder.tick(); // join + taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, TASK2)); // read updated config + herder.tick(); // apply config + herder.tick(); // do rebalance - EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT)) - .andReturn(Arrays.asList(taskProps)); - // And we should instantiate the tasks. 
For a sink task, we should see added properties for - // the input topic partitions - Properties generatedTaskProps = new Properties(); - generatedTaskProps.putAll(taskProps); - if (connectorName.contains("sink")) - generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR); - ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0); - worker.addTask(taskId, connectorTaskClasses.get(connectorName).getName(), generatedTaskProps); + PowerMock.verifyAll(); + } + + @Test + public void testJoinLeaderCatchUpFails() throws Exception { + // Join group and as leader fail to do assignment + EasyMock.expect(member.memberId()).andStubReturn("leader"); + expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), + CopycatProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); + // Reading to end of log times out + TestFuture<Void> readToEndFuture = new TestFuture<>(); + readToEndFuture.resolveOnGet(new TimeoutException()); + EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture); + PowerMock.expectPrivate(herder, "backoff", DistributedHerderConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT); + member.requestRejoin(); + + // After backoff, restart the process and this time succeed + expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1)); + expectPostRebalanceCatchup(SNAPSHOT); + + worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject()); PowerMock.expectLastCall(); + EasyMock.expect(worker.reconfigureConnectorTasks(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject()); + PowerMock.expectLastCall(); + member.poll(EasyMock.anyInt()); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + herder.tick(); + herder.tick(); + + PowerMock.verifyAll(); } - private void expectStop(String connectorName) { - worker.stopTask(new 
ConnectorTaskId(connectorName, 0)); - EasyMock.expectLastCall(); - Connector connector = connectors.get(connectorName); - connector.stop(); - EasyMock.expectLastCall(); + @Test + public void testInconsistentConfigs() throws Exception { + // FIXME: if we have inconsistent configs, we need to request forced reconfig + write of the connector's task configs + // This requires inter-worker communication, so needs the REST API } - private void expectDestroy(String connectorName) { - expectStop(connectorName); - configStorage.putConnectorConfig(connectorName, null); - PowerMock.expectLastCall(); + + private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { + expectRebalance(null, null, CopycatProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks); } - // We need to use a real class here due to some issue with mocking java.lang.Class - private abstract class BogusSourceConnector extends SourceConnector { + // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks. 
+ private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks, + final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) { + member.ensureActive(); + PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + if (revokedConnectors != null) + rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks); + CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment( + error, "leader", offset, assignedConnectors, assignedTasks); + rebalanceListener.onAssigned(assignment); + return null; + } + }); } - private abstract class BogusSourceTask extends SourceTask { + private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) { + TestFuture<Void> readToEndFuture = new TestFuture<>(); + readToEndFuture.resolveOnGet((Void) null); + EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture); + EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot); } - private abstract class BogusSinkConnector extends SinkConnector { + + // We need to use a real class here due to some issue with mocking java.lang.Class + private abstract class BogusSourceConnector extends SourceConnector { } - private abstract class BogusSinkTask extends SourceTask { + private abstract class BogusSourceTask extends SourceTask { } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java new file mode 100644 index 0000000..30c76a2 --- /dev/null +++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java @@ -0,0 +1,436 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.copycat.runtime.distributed; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.SyncGroupRequest; +import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.copycat.storage.KafkaConfigStorage; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; +import org.easymock.Mock; +import org.junit.After; +import org.junit.Before; 
+import org.junit.Test; +import org.powermock.api.easymock.PowerMock; +import org.powermock.reflect.Whitebox; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class WorkerCoordinatorTest { + + private String connectorId = "connector"; + private String connectorId2 = "connector2"; + private ConnectorTaskId taskId0 = new ConnectorTaskId(connectorId, 0); + private ConnectorTaskId taskId1 = new ConnectorTaskId(connectorId, 1); + private ConnectorTaskId taskId2 = new ConnectorTaskId(connectorId2, 0); + + private String groupId = "test-group"; + private int sessionTimeoutMs = 10; + private int heartbeatIntervalMs = 2; + private long retryBackoffMs = 100; + private long requestTimeoutMs = 5000; + private MockTime time; + private MockClient client; + private Cluster cluster = TestUtils.singletonCluster("topic", 1); + private Node node = cluster.nodes().get(0); + private Metadata metadata; + private Metrics metrics; + private Map<String, String> metricTags = new LinkedHashMap<>(); + private ConsumerNetworkClient consumerClient; + private MockRebalanceListener rebalanceListener; + @Mock private KafkaConfigStorage configStorage; + private WorkerCoordinator coordinator; + + private ClusterConfigState configState1; + private ClusterConfigState configState2; + + @Before + public void setup() { + this.time = new MockTime(); + this.client = new MockClient(time); + this.metadata = new Metadata(0, Long.MAX_VALUE); + this.metadata.update(cluster, time.milliseconds()); + this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100); + this.metrics = new Metrics(time); + this.rebalanceListener = new MockRebalanceListener(); + this.configStorage = PowerMock.createMock(KafkaConfigStorage.class); + + 
client.setNode(node); + + this.coordinator = new WorkerCoordinator(consumerClient, + groupId, + sessionTimeoutMs, + heartbeatIntervalMs, + metrics, + "consumer" + groupId, + metricTags, + time, + requestTimeoutMs, + retryBackoffMs, + configStorage, + rebalanceListener); + + configState1 = new ClusterConfigState( + 1L, Collections.singletonMap(connectorId, 1), + Collections.singletonMap(connectorId, (Map<String, String>) new HashMap<String, String>()), + Collections.singletonMap(taskId0, (Map<String, String>) new HashMap<String, String>()), + Collections.<String>emptySet() + ); + Map<String, Integer> configState2ConnectorTaskCounts = new HashMap<>(); + configState2ConnectorTaskCounts.put(connectorId, 2); + configState2ConnectorTaskCounts.put(connectorId2, 1); + Map<String, Map<String, String>> configState2ConnectorConfigs = new HashMap<>(); + configState2ConnectorConfigs.put(connectorId, new HashMap<String, String>()); + configState2ConnectorConfigs.put(connectorId2, new HashMap<String, String>()); + Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = new HashMap<>(); + configState2TaskConfigs.put(taskId0, new HashMap<String, String>()); + configState2TaskConfigs.put(taskId1, new HashMap<String, String>()); + configState2TaskConfigs.put(taskId2, new HashMap<String, String>()); + configState2 = new ClusterConfigState( + 2L, configState2ConnectorTaskCounts, + configState2ConnectorConfigs, + configState2TaskConfigs, + Collections.<String>emptySet() + ); + } + + @After + public void teardown() { + this.metrics.close(); + } + + // We only test functionality unique to WorkerCoordinator. Most functionality is already well tested via the tests + // that cover AbstractCoordinator & ConsumerCoordinator. 
+ + @Test + public void testMetadata() { + EasyMock.expect(configStorage.snapshot()).andReturn(configState1); + + PowerMock.replayAll(); + + LinkedHashMap<String, ByteBuffer> serialized = coordinator.metadata(); + assertEquals(1, serialized.size()); + CopycatProtocol.ConfigState state = CopycatProtocol.deserializeMetadata(serialized.get(WorkerCoordinator.DEFAULT_SUBPROTOCOL)); + assertEquals(1, state.offset()); + + PowerMock.verifyAll(); + } + + @Test + public void testNormalJoinGroupLeader() { + EasyMock.expect(configStorage.snapshot()).andReturn(configState1); + + PowerMock.replayAll(); + + final String consumerId = "leader"; + + client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // normal join group + Map<String, Long> memberConfigOffsets = new HashMap<>(); + memberConfigOffsets.put("leader", 1L); + memberConfigOffsets.put("member", 1L); + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code())); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + return sync.memberId().equals(consumerId) && + sync.generationId() == 1 && + sync.groupAssignment().containsKey(consumerId); + } + }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId), + Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); + coordinator.ensureActiveGroup(); + + assertFalse(coordinator.needRejoin()); + assertEquals(0, rebalanceListener.revokedCount); + assertEquals(1, rebalanceListener.assignedCount); + assertFalse(rebalanceListener.assignment.failed()); + assertEquals(1L, rebalanceListener.assignment.offset()); + assertEquals("leader", rebalanceListener.assignment.leader()); + assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors()); + 
assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks()); + + PowerMock.verifyAll(); + } + + @Test + public void testNormalJoinGroupFollower() { + EasyMock.expect(configStorage.snapshot()).andReturn(configState1); + + PowerMock.replayAll(); + + final String memberId = "member"; + + client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // normal join group + client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + return sync.memberId().equals(memberId) && + sync.generationId() == 1 && + sync.groupAssignment().isEmpty(); + } + }, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.<String>emptyList(), + Collections.singletonList(taskId0), Errors.NONE.code())); + coordinator.ensureActiveGroup(); + + assertFalse(coordinator.needRejoin()); + assertEquals(0, rebalanceListener.revokedCount); + assertEquals(1, rebalanceListener.assignedCount); + assertFalse(rebalanceListener.assignment.failed()); + assertEquals(1L, rebalanceListener.assignment.offset()); + assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors()); + assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks()); + + PowerMock.verifyAll(); + } + + @Test + public void testJoinLeaderCannotAssign() { + // If the selected leader can't get up to the maximum offset, it will fail to assign and we should immediately + // need to retry the join. 
+ + // When the first round fails, we'll take an updated config snapshot + EasyMock.expect(configStorage.snapshot()).andReturn(configState1); + EasyMock.expect(configStorage.snapshot()).andReturn(configState2); + + PowerMock.replayAll(); + + final String memberId = "member"; + + client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // config mismatch results in assignment error + client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); + MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + SyncGroupRequest sync = new SyncGroupRequest(request.request().body()); + return sync.memberId().equals(memberId) && + sync.generationId() == 1 && + sync.groupAssignment().isEmpty(); + } + }; + client.prepareResponse(matcher, syncGroupResponse(CopycatProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L, + Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); + client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code())); + client.prepareResponse(matcher, syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, + Collections.<String>emptyList(), Collections.singletonList(taskId0), Errors.NONE.code())); + coordinator.ensureActiveGroup(); + + PowerMock.verifyAll(); + } + + @Test + public void testRejoinGroup() { + EasyMock.expect(configStorage.snapshot()).andReturn(configState1); + EasyMock.expect(configStorage.snapshot()).andReturn(configState1); + + PowerMock.replayAll(); + + client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorKnown(); + + // join the group once + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, 
Collections.<String>emptyList(), + Collections.singletonList(taskId0), Errors.NONE.code())); + coordinator.ensureActiveGroup(); + + assertEquals(0, rebalanceListener.revokedCount); + assertEquals(1, rebalanceListener.assignedCount); + assertFalse(rebalanceListener.assignment.failed()); + assertEquals(1L, rebalanceListener.assignment.offset()); + assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors()); + assertEquals(Collections.singletonList(taskId0), rebalanceListener.assignment.tasks()); + + // and join the group again + coordinator.requestRejoin(); + client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(CopycatProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId), + Collections.<ConnectorTaskId>emptyList(), Errors.NONE.code())); + coordinator.ensureActiveGroup(); + + assertEquals(1, rebalanceListener.revokedCount); + assertEquals(Collections.emptyList(), rebalanceListener.revokedConnectors); + assertEquals(Collections.singletonList(taskId0), rebalanceListener.revokedTasks); + assertEquals(2, rebalanceListener.assignedCount); + assertFalse(rebalanceListener.assignment.failed()); + assertEquals(1L, rebalanceListener.assignment.offset()); + assertEquals(Collections.singletonList(connectorId), rebalanceListener.assignment.connectors()); + assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks()); + + PowerMock.verifyAll(); + } + + @Test + public void testLeaderDoSync1() throws Exception { + // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its + // output. So we test it directly here. 
+ + EasyMock.expect(configStorage.snapshot()).andReturn(configState1); + + PowerMock.replayAll(); + + // Prime the current configuration state + coordinator.metadata(); + + Map<String, ByteBuffer> configs = new HashMap<>(); + // Mark everyone as in sync with configState1 + configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L))); + configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L))); + Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs); + + // configState1 has 1 connector, 1 task + CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader")); + assertEquals(false, leaderAssignment.failed()); + assertEquals("leader", leaderAssignment.leader()); + assertEquals(1, leaderAssignment.offset()); + assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors()); + assertEquals(Collections.emptyList(), leaderAssignment.tasks()); + + CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member")); + assertEquals(false, memberAssignment.failed()); + assertEquals("leader", memberAssignment.leader()); + assertEquals(1, memberAssignment.offset()); + assertEquals(Collections.emptyList(), memberAssignment.connectors()); + assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks()); + + PowerMock.verifyAll(); + } + + @Test + public void testLeaderDoSync2() throws Exception { + // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its + // output. So we test it directly here. 
+ + EasyMock.expect(configStorage.snapshot()).andReturn(configState2); + + PowerMock.replayAll(); + + // Prime the current configuration state + coordinator.metadata(); + + Map<String, ByteBuffer> configs = new HashMap<>(); + // Mark everyone as in sync with configState1 + configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L))); + configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L))); + Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs); + + // configState2 has 2 connector, 3 tasks and should trigger round robin assignment + CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader")); + assertEquals(false, leaderAssignment.failed()); + assertEquals("leader", leaderAssignment.leader()); + assertEquals(1, leaderAssignment.offset()); + assertEquals(Collections.singletonList(connectorId), leaderAssignment.connectors()); + assertEquals(Arrays.asList(taskId1, taskId2), leaderAssignment.tasks()); + + CopycatProtocol.Assignment memberAssignment = CopycatProtocol.deserializeAssignment(result.get("member")); + assertEquals(false, memberAssignment.failed()); + assertEquals("leader", memberAssignment.leader()); + assertEquals(1, memberAssignment.offset()); + assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors()); + assertEquals(Collections.singletonList(taskId0), memberAssignment.tasks()); + + PowerMock.verifyAll(); + } + + + private Struct groupMetadataResponse(Node node, short error) { + GroupMetadataResponse response = new GroupMetadataResponse(error, node); + return response.toStruct(); + } + + private Struct joinGroupLeaderResponse(int generationId, String memberId, + Map<String, Long> configOffsets, short error) { + Map<String, ByteBuffer> metadata = new HashMap<>(); + for (Map.Entry<String, Long> configStateEntry : 
configOffsets.entrySet()) { + ByteBuffer buf = CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(configStateEntry.getValue())); + metadata.put(configStateEntry.getKey(), buf); + } + return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct(); + } + + private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) { + return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId, + Collections.<String, ByteBuffer>emptyMap()).toStruct(); + } + + private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds, + List<ConnectorTaskId> taskIds, short error) { + CopycatProtocol.Assignment assignment = new CopycatProtocol.Assignment(assignmentError, leader, configOffset, connectorIds, taskIds); + ByteBuffer buf = CopycatProtocol.serializeAssignment(assignment); + return new SyncGroupResponse(error, buf).toStruct(); + } + + + private static class MockRebalanceListener implements WorkerRebalanceListener { + public CopycatProtocol.Assignment assignment = null; + + public String revokedLeader; + public Collection<String> revokedConnectors; + public Collection<ConnectorTaskId> revokedTasks; + + public int revokedCount = 0; + public int assignedCount = 0; + + @Override + public void onAssigned(CopycatProtocol.Assignment assignment) { + this.assignment = assignment; + assignedCount++; + } + + @Override + public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) { + this.revokedLeader = leader; + this.revokedConnectors = connectors; + this.revokedTasks = tasks; + revokedCount++; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java 
---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java index 606b94d..b395fc7 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.copycat.connector.Connector; import org.apache.kafka.copycat.connector.Task; import org.apache.kafka.copycat.runtime.ConnectorConfig; import org.apache.kafka.copycat.runtime.HerderConnectorContext; +import org.apache.kafka.copycat.runtime.TaskConfig; import org.apache.kafka.copycat.runtime.Worker; import org.apache.kafka.copycat.sink.SinkConnector; import org.apache.kafka.copycat.sink.SinkTask; @@ -35,22 +36,21 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; import org.powermock.api.easymock.annotation.Mock; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.concurrent.TimeUnit; @RunWith(PowerMockRunner.class) -@PrepareForTest({StandaloneHerder.class}) -@PowerMockIgnore("javax.management.*") public class StandaloneHerderTest { private static final String CONNECTOR_NAME = "test"; + private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2"); private static final String TOPICS_LIST_STR = "topic1,topic2"; + private static final int DEFAULT_MAX_TASKS = 1; private StandaloneHerder herder; @Mock protected Worker worker; @@ -58,7 
+58,7 @@ public class StandaloneHerderTest { @Mock protected Callback<String> createCallback; private Map<String, String> connectorProps; - private Properties taskProps; + private Map<String, String> taskProps; @Before public void setup() { @@ -68,11 +68,10 @@ public class StandaloneHerderTest { connectorProps = new HashMap<>(); connectorProps.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME); connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR); - PowerMock.mockStatic(StandaloneHerder.class); // These can be anything since connectors can pass along whatever they want. - taskProps = new Properties(); - taskProps.setProperty("foo", "bar"); + taskProps = new HashMap<>(); + taskProps.put("foo", "bar"); } @Test @@ -121,7 +120,9 @@ public class StandaloneHerderTest { public void testCreateAndStop() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false); + // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked expectStop(); + PowerMock.replayAll(); herder.addConnector(connectorProps, createCallback); @@ -135,36 +136,30 @@ public class StandaloneHerderTest { boolean sink) throws Exception { connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName()); - PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName()) - .andReturn(connector); - - createCallback.onCompletion(null, CONNECTOR_NAME); + worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class)); PowerMock.expectLastCall(); - connector.initialize(EasyMock.anyObject(HerderConnectorContext.class)); - PowerMock.expectLastCall(); - connector.start(new Properties()); + createCallback.onCompletion(null, CONNECTOR_NAME); PowerMock.expectLastCall(); - // Just return the connector properties for the individual task we generate by default - EasyMock.<Class<? 
extends Task>>expect(connector.taskClass()).andReturn(taskClass); - - EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT)) - .andReturn(Arrays.asList(taskProps)); // And we should instantiate the tasks. For a sink task, we should see added properties for // the input topic partitions - Properties generatedTaskProps = new Properties(); + Map<String, String> generatedTaskProps = new HashMap<>(); generatedTaskProps.putAll(taskProps); + generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, taskClass.getName()); if (sink) - generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR); - worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps); + generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR); + EasyMock.expect(worker.reconfigureConnectorTasks(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST)) + .andReturn(Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps)); + + worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps)); PowerMock.expectLastCall(); } private void expectStop() { worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0)); EasyMock.expectLastCall(); - connector.stop(); + worker.stopConnector(CONNECTOR_NAME); EasyMock.expectLastCall(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java index b02b752..cf9f8aa 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java @@ -169,6 +169,12 @@ public class 
KafkaConfigStorageTest { connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1)); EasyMock.expectLastCall(); + // Config deletion + expectConvertWriteAndRead( + CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null); + connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1)); + EasyMock.expectLastCall(); + expectStop(); PowerMock.replayAll(); @@ -185,17 +191,24 @@ public class KafkaConfigStorageTest { // Writing should block until it is written and read back from Kafka configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)); configState = configStorage.snapshot(); - assertEquals(0, configState.offset()); + assertEquals(1, configState.offset()); assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); // Second should also block and all configs should still be available configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1)); configState = configStorage.snapshot(); - assertEquals(1, configState.offset()); + assertEquals(2, configState.offset()); assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1))); + // Deletion should remove the second one we added + configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null); + configState = configStorage.snapshot(); + assertEquals(3, configState.offset()); + assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); + assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); + configStorage.stop(); PowerMock.verifyAll(); @@ -255,13 +268,13 @@ public class KafkaConfigStorageTest { // Validate root config by listing all connectors and tasks configState = configStorage.snapshot(); - assertEquals(2, configState.offset()); + assertEquals(3, configState.offset()); String 
connectorName = CONNECTOR_IDS.get(0); assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors())); - assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName)); + assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1))), configState.tasks(connectorName)); assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1))); - assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors()); + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); configStorage.stop(); @@ -306,16 +319,16 @@ public class KafkaConfigStorageTest { // Should see a single connector and its config should be the last one seen anywhere in the log ClusterConfigState configState = configStorage.snapshot(); - assertEquals(6, configState.offset()); // Should always be last read, even if uncommitted + assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2] assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0))); // Should see 2 tasks for that connector. 
Only config updates before the root key update should be reflected - assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0))); + assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1))), configState.tasks(CONNECTOR_IDS.get(0))); // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1))); - assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors()); + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); configStorage.stop(); @@ -374,10 +387,10 @@ public class KafkaConfigStorageTest { configStorage.start(); // After reading the log, it should have been in an inconsistent state ClusterConfigState configState = configStorage.snapshot(); - assertEquals(5, configState.offset()); // Should always be last read, not last committed + assertEquals(6, configState.offset()); // Should always be next to be read, not last committed assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list - assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0))); + assertEquals(Collections.EMPTY_SET, configState.tasks(CONNECTOR_IDS.get(0))); // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0] assertNull(configState.taskConfig(TASK_IDS.get(0))); assertNull(configState.taskConfig(TASK_IDS.get(1))); @@ -398,11 +411,11 @@ public class KafkaConfigStorageTest { configState = configStorage.snapshot(); // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written // to the topic. Only the last call with 1 task config + 1 commit actually gets written. 
- assertEquals(7, configState.offset()); + assertEquals(8, configState.offset()); assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors())); - assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0))); + assertEquals(new HashSet<>(Arrays.asList(TASK_IDS.get(0))), configState.tasks(CONNECTOR_IDS.get(0))); assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0))); - assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors()); + assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors()); configStorage.stop(); @@ -446,17 +459,19 @@ public class KafkaConfigStorageTest { private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized, final String dataFieldName, final Object dataFieldValue) { final Capture<Struct> capturedRecord = EasyMock.newCapture(); - EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord))) - .andReturn(serialized); + if (serialized != null) + EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord))) + .andReturn(serialized); storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized)); PowerMock.expectLastCall(); EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized))) .andAnswer(new IAnswer<SchemaAndValue>() { @Override public SchemaAndValue answer() throws Throwable { - assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName)); + if (dataFieldName != null) + assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName)); // Note null schema because default settings for internal serialization are schema-less - return new SchemaAndValue(null, structToMap(capturedRecord.getValue())); + return new SchemaAndValue(null, serialized == null ? 
null : structToMap(capturedRecord.getValue())); } }); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java index c5978ec..55e24c8 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java @@ -102,8 +102,14 @@ public class TestFuture<T> implements Future<T> { } } - if (exception != null) - throw new ExecutionException(exception); + if (exception != null) { + if (exception instanceof TimeoutException) + throw (TimeoutException) exception; + else if (exception instanceof InterruptedException) + throw (InterruptedException) exception; + else + throw new ExecutionException(exception); + } return result; } http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/services/copycat.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py index 4e2ab40..45ef330 100644 --- a/tests/kafkatest/services/copycat.py +++ b/tests/kafkatest/services/copycat.py @@ -39,6 +39,16 @@ class CopycatServiceBase(Service): except: return [] + def set_configs(self, config_template, connector_config_templates): + """ + Set configurations for the worker and the connector to run on + it. These are not provided in the constructor because the worker + config generally needs access to ZK/Kafka services to + create the configuration. 
+ """ + self.config_template = config_template + self.connector_config_templates = connector_config_templates + def stop_node(self, node, clean_shutdown=True): pids = self.pids(node) sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL @@ -51,7 +61,7 @@ class CopycatServiceBase(Service): node.account.ssh("rm -f /mnt/copycat.pid", allow_fail=False) def restart(self): - # We don't want to do any clean up here, just restart the process + # We don't want to do any clean up here, just restart the process. for node in self.nodes: self.stop_node(node) self.start_node(node) @@ -62,8 +72,11 @@ class CopycatServiceBase(Service): (self.__class__.__name__, node.account)) for pid in self.pids(node): node.account.signal(pid, signal.SIGKILL, allow_fail=False) - node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties /mnt/copycat-connector.properties " + " ".join(self.files), allow_fail=False) + node.account.ssh("rm -rf /mnt/copycat.pid /mnt/copycat.log /mnt/copycat.properties " + " ".join(self.config_filenames() + self.files), allow_fail=False) + + def config_filenames(self): + return ["/mnt/copycat-connector-" + str(idx) + ".properties" for idx, template in enumerate(self.connector_config_templates)] class CopycatStandaloneService(CopycatServiceBase): """Runs Copycat in standalone mode.""" @@ -71,16 +84,6 @@ class CopycatStandaloneService(CopycatServiceBase): def __init__(self, context, kafka, files): super(CopycatStandaloneService, self).__init__(context, 1, kafka, files) - def set_configs(self, config_template, connector_config_template): - """ - Set configurations for the worker and the connector to run on - it. These are not provided in the constructor because the worker - config generally needs access to ZK/Kafka services to - create the configuration. 
- """ - self.config_template = config_template - self.connector_config_template = connector_config_template - # For convenience since this service only makes sense with a single node @property def node(self): @@ -88,12 +91,17 @@ class CopycatStandaloneService(CopycatServiceBase): def start_node(self, node): node.account.create_file("/mnt/copycat.properties", self.config_template) - node.account.create_file("/mnt/copycat-connector.properties", self.connector_config_template) + remote_connector_configs = [] + for idx, template in enumerate(self.connector_config_templates): + target_file = "/mnt/copycat-connector-" + str(idx) + ".properties" + node.account.create_file(target_file, template) + remote_connector_configs.append(target_file) self.logger.info("Starting Copycat standalone process") with node.account.monitor_log("/mnt/copycat.log") as monitor: - node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties /mnt/copycat-connector.properties " + - "1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid") + node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties " + + " ".join(remote_connector_configs) + + " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid") monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup") if len(self.pids(node)) == 0: @@ -108,27 +116,28 @@ class CopycatDistributedService(CopycatServiceBase): super(CopycatDistributedService, self).__init__(context, num_nodes, kafka, files) self.offsets_topic = offsets_topic self.configs_topic = configs_topic - - def set_configs(self, config_template, connector_config_template): - """ - Set configurations for the worker and the connector to run on - it. These are not provided in the constructor because the worker - config generally needs access to ZK/Kafka services to - create the configuration. 
- """ - self.config_template = config_template - self.connector_config_template = connector_config_template + self.first_start = True def start_node(self, node): node.account.create_file("/mnt/copycat.properties", self.config_template) - node.account.create_file("/mnt/copycat-connector.properties", self.connector_config_template) + remote_connector_configs = [] + for idx, template in enumerate(self.connector_config_templates): + target_file = "/mnt/copycat-connector-" + str(idx) + ".properties" + node.account.create_file(target_file, template) + remote_connector_configs.append(target_file) - self.logger.info("Starting Copycat standalone process") + self.logger.info("Starting Copycat distributed process") with node.account.monitor_log("/mnt/copycat.log") as monitor: - node.account.ssh("/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties /mnt/copycat-connector.properties " + - "1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid") + cmd = "/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties " + # Only submit connectors on the first node so they don't get submitted multiple times. Also only submit them + # the first time the node is started so + if self.first_start and node == self.nodes[0]: + cmd += " ".join(remote_connector_configs) + cmd += " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! 
> /mnt/copycat.pid" + node.account.ssh(cmd) monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup") if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") + self.first_start = False http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/copycat_distributed_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py index 9d00334..57965e5 100644 --- a/tests/kafkatest/tests/copycat_distributed_test.py +++ b/tests/kafkatest/tests/copycat_distributed_test.py @@ -31,10 +31,12 @@ class CopycatDistributedFileTest(KafkaTest): OFFSETS_TOPIC = "copycat-offsets" CONFIG_TOPIC = "copycat-configs" - FIRST_INPUT_LISTS = [["foo", "bar", "baz"], ["foo2", "bar2", "baz2"]] - FIRST_INPUTS = ["\n".join(input_list) + "\n" for input_list in FIRST_INPUT_LISTS] - SECOND_INPUT_LISTS = [["razz", "ma", "tazz"], ["razz2", "ma2", "tazz2"]] - SECOND_INPUTS = ["\n".join(input_list) + "\n" for input_list in SECOND_INPUT_LISTS] + # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same + # across all nodes. + FIRST_INPUT_LIST = ["foo", "bar", "baz"] + FIRST_INPUTS = "\n".join(FIRST_INPUT_LIST) + "\n" + SECOND_INPUT_LIST = ["razz", "ma", "tazz"] + SECOND_INPUTS = "\n".join(SECOND_INPUT_LIST) + "\n" SCHEMA = { "type": "string", "optional": False } @@ -43,13 +45,7 @@ class CopycatDistributedFileTest(KafkaTest): 'test' : { 'partitions': 1, 'replication-factor': 1 } }) - # FIXME these should have multiple nodes. However, currently the connectors are submitted via command line, - # which means we would get duplicates. Both would run, but they would have conflicting keys for offsets and - # configs. 
Until we have real distributed coordination of workers with unified connector submission, we need - # to restrict each of these to a single node. - self.num_nodes = 1 - self.source = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.INPUT_FILE]) - self.sink = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.OUTPUT_FILE]) + self.cc = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE, self.OUTPUT_FILE]) def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True): assert converter != None, "converter type must be set" @@ -58,33 +54,40 @@ class CopycatDistributedFileTest(KafkaTest): self.value_converter = converter self.schemas = schemas - # These need to be set - self.source.set_configs(self.render("copycat-distributed.properties"), self.render("copycat-file-source.properties")) - self.sink.set_configs(self.render("copycat-distributed.properties"), self.render("copycat-file-sink.properties")) + self.cc.set_configs(self.render("copycat-distributed.properties"), [self.render("copycat-file-source.properties"), self.render("copycat-file-sink.properties")]) - self.source.start() - self.sink.start() + self.cc.start() - # Generating data on the source node should generate new records and create new output on the sink node - for node, input in zip(self.source.nodes, self.FIRST_INPUTS): - node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE) - wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") + # Generating data on the source node should generate new records and create new output on the sink node. 
Timeouts + # here need to be more generous than they are for standalone mode because a) it takes longer to write configs, + # do rebalancing of the group, etc, and b) without explicit leave group support, rebalancing takes awhile + for node in self.cc.nodes: + node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE) + wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST), timeout_sec=120, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.") # Restarting both should result in them picking up where they left off, # only processing new data. - self.source.restart() - self.sink.restart() + self.cc.restart() - for node, input in zip(self.source.nodes, self.SECOND_INPUTS): - node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE) - wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes] + self.SECOND_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file") + for node in self.cc.nodes: + node.account.ssh("echo -e -n " + repr(self.SECOND_INPUTS) + " >> " + self.INPUT_FILE) + wait_until(lambda: self.validate_output(self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST), timeout_sec=120, err_msg="Sink output file never converged to the same state as the input file") - def validate_output(self, inputs): + def validate_output(self, input): + input_set = set(input) + # Output needs to be collected from all nodes because we can't be sure where the tasks will be scheduled. + # Between the first and second rounds, we might even end up with half the data on each node. 
+ output_set = set(itertools.chain(*[ + [line.strip() for line in self.file_contents(node, self.OUTPUT_FILE)] for node in self.cc.nodes + ])) + #print input_set, output_set + return input_set == output_set + + + def file_contents(self, node, file): try: - input_set = set(itertools.chain(*inputs)) - output_set = set(itertools.chain(*[ - [line.strip() for line in node.account.ssh_capture("cat " + self.OUTPUT_FILE)] for node in self.sink.nodes - ])) - return input_set == output_set + # Convert to a list here or the CalledProcessError may be returned during a call to the generator instead of + # immediately + return list(node.account.ssh_capture("cat " + file)) except subprocess.CalledProcessError: - return False + return [] http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/copycat_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py index 1bd8ccb..bad5330 100644 --- a/tests/kafkatest/tests/copycat_test.py +++ b/tests/kafkatest/tests/copycat_test.py @@ -60,9 +60,8 @@ class CopycatStandaloneFileTest(KafkaTest): self.value_converter = converter self.schemas = schemas - # These need to be set - self.source.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-source.properties")) - self.sink.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-sink.properties")) + self.source.set_configs(self.render("copycat-standalone.properties"), [self.render("copycat-file-source.properties")]) + self.sink.set_configs(self.render("copycat-standalone.properties"), [self.render("copycat-file-sink.properties")]) self.source.start() self.sink.start() http://git-wip-us.apache.org/repos/asf/kafka/blob/2e617735/tests/kafkatest/tests/templates/copycat-distributed.properties ---------------------------------------------------------------------- diff --git 
a/tests/kafkatest/tests/templates/copycat-distributed.properties b/tests/kafkatest/tests/templates/copycat-distributed.properties index 31f9901..325dc85 100644 --- a/tests/kafkatest/tests/templates/copycat-distributed.properties +++ b/tests/kafkatest/tests/templates/copycat-distributed.properties @@ -15,6 +15,8 @@ bootstrap.servers={{ kafka.bootstrap_servers() }} +group.id={{ group|default("copycat-cluster") }} + key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }} value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }} {% if key_converter is not defined or key_converter.endswith("JsonConverter") %} @@ -30,4 +32,7 @@ internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.topic={{ OFFSETS_TOPIC }} -config.storage.topic={{ CONFIG_TOPIC }} \ No newline at end of file +config.storage.topic={{ CONFIG_TOPIC }} + +# Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems +offset.flush.interval.ms=5000