Repository: kafka Updated Branches: refs/heads/trunk 88d8508b8 -> b65f9a777
KAFKA-3008: Parallel start and stop of connectors and tasks in Connect Author: Konstantine Karantasis <konstant...@confluent.io> Author: Konstantine Karantasis <k.karanta...@gmail.com> Reviewers: Jason Gustafson <ja...@confluent.io>, Shikhar Bhushan <shik...@confluent.io>, Ewen Cheslack-Postava <e...@confluent.io> Closes #1788 from kkonstantine/KAFKA-3008-Parallel-start-and-stop-of-connectors-and-tasks Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b65f9a77 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b65f9a77 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b65f9a77 Branch: refs/heads/trunk Commit: b65f9a777d46fbe4edfed8a4c7216dd1e741be53 Parents: 88d8508 Author: Konstantine Karantasis <konstant...@confluent.io> Authored: Thu Dec 1 14:42:15 2016 -0800 Committer: Ewen Cheslack-Postava <m...@ewencp.org> Committed: Thu Dec 1 14:42:15 2016 -0800 ---------------------------------------------------------------------- .../runtime/SourceTaskOffsetCommitter.java | 104 +++++----- .../apache/kafka/connect/runtime/Worker.java | 173 ++++++++++++----- .../kafka/connect/runtime/WorkerTask.java | 2 +- .../runtime/distributed/DistributedHerder.java | 178 +++++++++++++---- .../runtime/SourceTaskOffsetCommitterTest.java | 194 +++++++++++++++++++ .../kafka/connect/runtime/WorkerTest.java | 14 -- .../distributed/DistributedHerderTest.java | 59 +++--- .../standalone/StandaloneHerderTest.java | 12 +- 8 files changed, 539 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java index c7f869e..acc2d0d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java @@ -22,8 +22,10 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -44,13 +46,22 @@ import java.util.concurrent.TimeUnit; class SourceTaskOffsetCommitter { private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class); - private WorkerConfig config; - private ScheduledExecutorService commitExecutorService = null; - private final HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>(); + private final WorkerConfig config; + private final ScheduledExecutorService commitExecutorService; + private final ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers; - SourceTaskOffsetCommitter(WorkerConfig config) { + // visible for testing + SourceTaskOffsetCommitter(WorkerConfig config, + ScheduledExecutorService commitExecutorService, + ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers) { this.config = config; - commitExecutorService = Executors.newSingleThreadScheduledExecutor(); + this.commitExecutorService = commitExecutorService; + this.committers = committers; + } + + public SourceTaskOffsetCommitter(WorkerConfig config) { + this(config, Executors.newSingleThreadScheduledExecutor(), + new ConcurrentHashMap<ConnectorTaskId, 
ScheduledFuture<?>>()); } public void close(long timeoutMs) { @@ -65,72 +76,45 @@ class SourceTaskOffsetCommitter { } public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) { - synchronized (committers) { - long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); - ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() { - @Override - public void run() { - commit(id, workerTask); - } - }, commitIntervalMs, TimeUnit.MILLISECONDS); - committers.put(id, new ScheduledCommitTask(commitFuture)); - } + long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG); + ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + commit(workerTask); + } + }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS); + committers.put(id, commitFuture); } public void remove(ConnectorTaskId id) { - final ScheduledCommitTask task; - synchronized (committers) { - task = committers.remove(id); - task.cancelled = true; - task.commitFuture.cancel(false); - } - if (task.finishedLatch != null) { - try { - task.finishedLatch.await(); - } catch (InterruptedException e) { - throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter.", e); - } - } - } + final ScheduledFuture<?> task = committers.remove(id); + if (task == null) + return; - private void commit(ConnectorTaskId id, WorkerSourceTask workerTask) { - final ScheduledCommitTask task; - synchronized (committers) { - task = committers.get(id); - if (task == null || task.cancelled) - return; - task.finishedLatch = new CountDownLatch(1); + try { + task.cancel(false); + if (!task.isDone()) + task.get(); + } catch (CancellationException e) { + // ignore + log.trace("Offset commit thread was cancelled by another thread while removing connector task with id: {}", id); + } catch (ExecutionException | InterruptedException e) { + throw new 
ConnectException("Unexpected interruption in SourceTaskOffsetCommitter while removing task with id: " + id, e); } + } + private void commit(WorkerSourceTask workerTask) { + log.debug("Committing offsets for {}", workerTask); try { - log.debug("Committing offsets for {}", workerTask); - boolean success = workerTask.commitOffsets(); - if (!success) { - log.error("Failed to commit offsets for {}", workerTask); + if (workerTask.commitOffsets()) { + return; } + log.error("Failed to commit offsets for {}", workerTask); } catch (Throwable t) { // We're very careful about exceptions here since any uncaught exceptions in the commit // thread would cause the fixed interval schedule on the ExecutorService to stop running // for that task log.error("Unhandled exception when committing {}: ", workerTask, t); - } finally { - synchronized (committers) { - task.finishedLatch.countDown(); - if (!task.cancelled) - schedule(id, workerTask); - } - } - } - - private static class ScheduledCommitTask { - ScheduledFuture<?> commitFuture; - boolean cancelled; - CountDownLatch finishedLatch; - - ScheduledCommitTask(ScheduledFuture<?> commitFuture) { - this.commitFuture = commitFuture; - this.cancelled = false; - this.finishedLatch = null; } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 1265f9e..c575d92 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -40,10 +40,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import 
java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,8 +73,8 @@ public class Worker { private final OffsetBackingStore offsetBackingStore; private final Map<String, Object> producerProps; - private HashMap<String, WorkerConnector> connectors = new HashMap<>(); - private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>(); + private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>(); + private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>(); private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) { @@ -109,6 +110,9 @@ public class Worker { producerProps.putAll(config.originalsWithPrefix("producer.")); } + /** + * Start worker. + */ public void start() { log.info("Worker starting"); @@ -118,6 +122,9 @@ public class Worker { log.info("Worker started"); } + /** + * Stop worker. + */ public void stop() { log.info("Worker stopping"); @@ -142,6 +149,16 @@ public class Worker { log.info("Worker stopped"); } + /** + * Start a connector managed by this worker. + * + * @param connName the connector name. + * @param connProps the properties of the connector. + * @param ctx the connector runtime context. + * @param statusListener a listener for the runtime status transitions of the connector. + * @param initialState the initial state of the connector. + * @return true if the connector started successfully. 
+ */ public boolean startConnector( String connName, Map<String, String> connProps, @@ -168,18 +185,36 @@ public class Worker { return false; } - connectors.put(connName, workerConnector); + WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector); + if (existing != null) + throw new ConnectException("Connector with name " + connName + " already exists"); log.info("Finished creating connector {}", connName); return true; } - /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */ + /** + * Return true if the connector associated with this worker is a sink connector. + * + * @param connName the connector name. + * @return true if the connector belongs to the worker and is a sink connector. + * @throws ConnectException if the worker does not manage a connector with the given name. + */ public boolean isSinkConnector(String connName) { WorkerConnector workerConnector = connectors.get(connName); + if (workerConnector == null) + throw new ConnectException("Connector " + connName + " not found in this worker."); return workerConnector.isSinkConnector(); } + /** + * Get a list of updated task properties for the tasks of this connector. + * + * @param connName the connector name. + * @param maxTasks the maximum number of tasks. + * @param sinkTopics a list of sink topics. + * @return a list of updated task properties.
+ */ public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) { log.trace("Reconfiguring connector tasks for {}", connName); @@ -200,31 +235,29 @@ public class Worker { return result; } - public void stopConnectors() { - stopConnectors(new HashSet<>(connectors.keySet())); - } - - public Collection<String> stopConnectors(Collection<String> connectors) { - final List<String> stopped = new ArrayList<>(connectors.size()); - for (String connector: connectors) { - if (stopConnector(connector)) { - stopped.add(connector); - } - } - return stopped; + private void stopConnectors() { + // Herder is responsible for stopping connectors. This is an internal method to sequentially + // stop connectors that have not explicitly been stopped. + for (String connector: connectors.keySet()) + stopConnector(connector); } + /** + * Stop a connector managed by this worker. + * + * @param connName the connector name. + * @return true if the connector belonged to this worker and was successfully stopped. + */ public boolean stopConnector(String connName) { log.info("Stopping connector {}", connName); - WorkerConnector connector = connectors.get(connName); + WorkerConnector connector = connectors.remove(connName); if (connector == null) { log.warn("Ignoring stop request for unowned connector {}", connName); return false; } connector.shutdown(); - connectors.remove(connName); log.info("Stopped connector {}", connName); return true; @@ -232,16 +265,34 @@ public class Worker { /** * Get the IDs of the connectors currently running in this worker. + * + * @return the set of connector IDs. */ public Set<String> connectorNames() { return connectors.keySet(); } + /** + * Return true if a connector with the given name is managed by this worker and is currently running. + * + * @param connName the connector name. + * @return true if the connector is running, false if the connector is not running or is not managed by this worker.
+ */ public boolean isRunning(String connName) { WorkerConnector connector = connectors.get(connName); return connector != null && connector.isRunning(); } + /** + * Start a task managed by this worker. + * + * @param id the task ID. + * @param connProps the connector properties. + * @param taskProps the tasks properties. + * @param statusListener a listener for the runtime status transitions of the task. + * @param initialState the initial state of the connector. + * @return true if the task started successfully. + */ public boolean startTask( ConnectorTaskId id, Map<String, String> connProps, @@ -282,11 +333,14 @@ public class Worker { return false; } + WorkerTask existing = tasks.putIfAbsent(id, workerTask); + if (existing != null) + throw new ConnectException("Task already exists in this worker: " + id); + executor.submit(workerTask); if (workerTask instanceof WorkerSourceTask) { sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask); } - tasks.put(id, workerTask); return true; } @@ -314,34 +368,38 @@ public class Worker { } } - public boolean stopAndAwaitTask(ConnectorTaskId id) { - return !stopAndAwaitTasks(Collections.singleton(id)).isEmpty(); - } + private void stopTask(ConnectorTaskId taskId) { + WorkerTask task = tasks.get(taskId); + if (task == null) { + log.warn("Ignoring stop request for unowned task {}", taskId); + return; + } - public void stopAndAwaitTasks() { - stopAndAwaitTasks(new HashSet<>(tasks.keySet())); + log.info("Stopping task {}", task.id()); + if (task instanceof WorkerSourceTask) + sourceTaskOffsetCommitter.remove(task.id()); + task.stop(); } - public Collection<ConnectorTaskId> stopAndAwaitTasks(Collection<ConnectorTaskId> ids) { - final List<ConnectorTaskId> stoppable = new ArrayList<>(ids.size()); + private void stopTasks(Collection<ConnectorTaskId> ids) { + // Herder is responsible for stopping tasks. This is an internal method to sequentially + // stop the tasks that have not explicitly been stopped. 
for (ConnectorTaskId taskId : ids) { - final WorkerTask task = tasks.get(taskId); - if (task == null) { - log.warn("Ignoring stop request for unowned task {}", taskId); - continue; - } - stopTask(task); - stoppable.add(taskId); + stopTask(taskId); } - awaitStopTasks(stoppable); - return stoppable; } - private void stopTask(WorkerTask task) { - log.info("Stopping task {}", task.id()); - if (task instanceof WorkerSourceTask) - sourceTaskOffsetCommitter.remove(task.id()); - task.stop(); + private void awaitStopTask(ConnectorTaskId taskId, long timeout) { + WorkerTask task = tasks.remove(taskId); + if (task == null) { + log.warn("Ignoring await stop request for non-present task {}", taskId); + return; + } + + if (!task.awaitStop(timeout)) { + log.error("Graceful stop of task {} failed.", task.id()); + task.cancel(); + } } private void awaitStopTasks(Collection<ConnectorTaskId> ids) { @@ -349,16 +407,35 @@ public class Worker { long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); for (ConnectorTaskId id : ids) { long remaining = Math.max(0, deadline - time.milliseconds()); - awaitStopTask(tasks.get(id), remaining); + awaitStopTask(id, remaining); } } - private void awaitStopTask(WorkerTask task, long timeout) { - if (!task.awaitStop(timeout)) { - log.error("Graceful stop of task {} failed.", task.id()); - task.cancel(); - } - tasks.remove(task.id()); + /** + * Stop asynchronously all the worker's tasks and await their termination. + */ + public void stopAndAwaitTasks() { + stopAndAwaitTasks(new ArrayList<>(tasks.keySet())); + } + + /** + * Stop asynchronously a collection of tasks that belong to this worker and await their termination. + * + * @param ids the collection of tasks to be stopped. + */ + public void stopAndAwaitTasks(Collection<ConnectorTaskId> ids) { + stopTasks(ids); + awaitStopTasks(ids); + } + + /** + * Stop a task that belongs to this worker and await its termination. 
+ * + * @param taskId the ID of the task to be stopped. + */ + public void stopAndAwaitTask(ConnectorTaskId taskId) { + stopTask(taskId); + awaitStopTasks(Collections.singletonList(taskId)); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 846ca95..2f2ebb5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -64,7 +64,7 @@ abstract class WorkerTask implements Runnable { /** * Initialize the task for execution. - * @param props initial configuration + * @param taskConfig initial configuration */ public abstract void initialize(TaskConfig taskConfig); http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 170c983..ce2e72a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -54,10 +54,11 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; +import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.Set; import 
java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -98,6 +99,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class); private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250; + private static final int START_STOP_THREAD_POOL_SIZE = 8; private final Time time; @@ -106,6 +108,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final int workerUnsyncBackoffMs; private final ExecutorService forwardRequestExecutor; + private final ExecutorService startAndStopExecutor; private final WorkerGroupMember member; private final AtomicBoolean stopping; private final CountDownLatch stopLatch = new CountDownLatch(1); @@ -119,7 +122,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // To handle most external requests, like creating or destroying a connector, we can use a generic request where // the caller specifies all the code that should be executed. - private final Queue<HerderRequest> requests = new PriorityQueue<>(); + private final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>(); // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits). 
private Set<String> connectorConfigUpdates = new HashSet<>(); @@ -144,11 +147,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable { Worker worker, String workerId, StatusBackingStore statusBackingStore, - ConfigBackingStore configStorage, + ConfigBackingStore configBackingStore, WorkerGroupMember member, String restUrl, Time time) { - super(worker, workerId, statusBackingStore, configStorage); + super(worker, workerId, statusBackingStore, configBackingStore); this.time = time; this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG); @@ -156,6 +159,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG); this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time); this.forwardRequestExecutor = Executors.newSingleThreadExecutor(); + this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE); stopping = new AtomicBoolean(false); configState = ClusterConfigState.EMPTY; @@ -222,17 +226,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable { final long now = time.milliseconds(); long nextRequestTimeoutMs = Long.MAX_VALUE; while (true) { - final HerderRequest next; - synchronized (this) { - next = requests.peek(); - if (next == null) { - break; - } else if (now >= next.at) { - requests.poll(); - } else { - nextRequestTimeoutMs = next.at - now; - break; - } + final HerderRequest next = peekWithoutException(); + if (next == null) { + break; + } else if (now >= next.at) { + requests.pollFirst(); + } else { + nextRequestTimeoutMs = next.at - now; + break; } try { @@ -292,7 +293,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { try { member.poll(nextRequestTimeoutMs); // Ensure we're in a good state in our group. 
If not restart and everything should be setup to rejoin - if (!handleRebalanceCompleted()) return; + handleRebalanceCompleted(); } catch (WakeupException e) { // FIXME should not be WakeupException // Ignore. Just indicates we need to check the exit flag, for requested actions, etc. } @@ -338,17 +339,24 @@ public class DistributedHerder extends AbstractHerder implements Runnable { public void halt() { synchronized (this) { // Clean up any connectors and tasks that are still running. - log.info("Stopping connectors and tasks that are still assigned to the worker"); - worker.stopConnectors(); - worker.stopAndAwaitTasks(); + log.info("Stopping connectors and tasks that are still assigned to this worker."); + List<Callable<Void>> callables = new ArrayList<>(); + for (String connectorName : new ArrayList<>(worker.connectorNames())) { + callables.add(getConnectorStoppingCallable(connectorName)); + } + for (ConnectorTaskId taskId : new ArrayList<>(worker.taskIds())) { + callables.add(getTaskStoppingCallable(taskId)); + } + startAndStop(callables); member.stop(); - // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason - // for their failure - while (!requests.isEmpty()) { - HerderRequest request = requests.poll(); + // Explicitly fail any outstanding requests so they actually get a response and get an + // understandable reason for their failure. 
+ HerderRequest request = requests.pollFirst(); + while (request != null) { request.callback().onCompletion(new ConnectException("Worker is shutting down"), null); + request = requests.pollFirst(); } stopServices(); @@ -370,9 +378,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } forwardRequestExecutor.shutdown(); + startAndStopExecutor.shutdown(); try { if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS)) forwardRequestExecutor.shutdownNow(); + if (!startAndStopExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) + startAndStopExecutor.shutdownNow(); } catch (InterruptedException e) { // ignore } @@ -381,7 +392,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public synchronized void connectors(final Callback<Collection<String>> callback) { + public void connectors(final Callback<Collection<String>> callback) { log.trace("Submitting connector listing request"); addRequest( @@ -400,7 +411,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) { + public void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) { log.trace("Submitting connector info request {}", connName); addRequest( @@ -523,7 +534,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public synchronized void requestTaskReconfiguration(final String connName) { + public void requestTaskReconfiguration(final String connName) { log.trace("Submitting connector task reconfiguration request {}", connName); addRequest( @@ -547,7 +558,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) { + public void taskConfigs(final String connName, final 
Callback<List<TaskInfo>> callback) { log.trace("Submitting get task configuration request {}", connName); addRequest( @@ -575,7 +586,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) { + public void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) { log.trace("Submitting put task configuration request {}", connName); addRequest( @@ -598,7 +609,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public synchronized void restartConnector(final String connName, final Callback<Void> callback) { + public void restartConnector(final String connName, final Callback<Void> callback) { addRequest(new Callable<Void>() { @Override public Void call() throws Exception { @@ -631,7 +642,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } @Override - public synchronized void restartTask(final ConnectorTaskId id, final Callback<Void> callback) { + public void restartTask(final ConnectorTaskId id, final Callback<Void> callback) { addRequest(new Callable<Void>() { @Override public Void call() throws Exception { @@ -788,15 +799,26 @@ public class DistributedHerder extends AbstractHerder implements Runnable { Utils.sleep(ms); } + private void startAndStop(Collection<Callable<Void>> callables) { + try { + startAndStopExecutor.invokeAll(callables); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // restore the interrupt status instead of silently dropping it + } + } + private void startWork() { // Start assigned connectors and tasks log.info("Starting connectors and tasks using config offset {}", assignment.offset()); + List<Callable<Void>> callables = new ArrayList<>(); for (String connectorName : assignment.connectors()) { - startConnector(connectorName); + callables.add(getConnectorStartingCallable(connectorName)); } + for
(ConnectorTaskId taskId : assignment.tasks()) { - startTask(taskId); + callables.add(getTaskStartingCallable(taskId)); } + startAndStop(callables); log.info("Finished starting connectors and tasks"); } @@ -811,12 +833,38 @@ public class DistributedHerder extends AbstractHerder implements Runnable { ); } + private Callable<Void> getTaskStartingCallable(final ConnectorTaskId taskId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + try { + startTask(taskId); + } catch (Throwable t) { + log.error("Couldn't instantiate task {} because it has an invalid task configuration. This task will not execute until reconfigured.", + taskId, t); + onFailure(taskId, t); + } + return null; + } + }; + } + + private Callable<Void> getTaskStoppingCallable(final ConnectorTaskId taskId) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + worker.stopAndAwaitTask(taskId); + return null; + } + }; + } + // Helper for starting a connector with the given name, which will extract & parse the config, generate connector - // context and add to the worker. This needs to be called from within the main worker thread for this herder. + // context and add to the worker. May run on startAndStopExecutor threads (see startWork), so it must not assume it is on the main herder thread.
private boolean startConnector(String connectorName) { log.info("Starting connector {}", connectorName); final Map<String, String> configProps = configState.connectorConfig(connectorName); - final ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connectorName); + final ConnectorContext ctx = new HerderConnectorContext(this, connectorName); final TargetState initialState = configState.targetState(connectorName); boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState); @@ -829,6 +877,36 @@ public class DistributedHerder extends AbstractHerder implements Runnable { return started; } + private Callable<Void> getConnectorStartingCallable(final String connectorName) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + try { + startConnector(connectorName); + } catch (Throwable t) { + log.error("Couldn't instantiate connector {} because it has an invalid connector " + + "configuration.
This connector will not execute until reconfigured.", connectorName, t); + onFailure(connectorName, t); + } + return null; + } + }; + } + + private Callable<Void> getConnectorStoppingCallable(final String connectorName) { + return new Callable<Void>() { + @Override + public Void call() throws Exception { + try { + worker.stopConnector(connectorName); + } catch (Throwable t) { + log.error("Failed to shut down connector {}", connectorName, t); + } + return null; + } + }; + } + private void reconfigureConnectorTasksWithRetry(final String connName) { reconfigureConnector(connName, new Callback<Void>() { @Override @@ -941,10 +1019,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) { HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback); requests.add(req); - if (requests.peek() == req) + if (peekWithoutException() == req) member.wakeup(); } + private HerderRequest peekWithoutException() { + try { + return requests.isEmpty() ? null : requests.first(); + } catch (NoSuchElementException e) { + // Ignore exception. Should be rare. Means that the collection became empty between + // checking the size and retrieving the first element.
+ } + return null; + } + public class ConfigUpdateListener implements ConfigBackingStore.UpdateListener { @Override public void onConnectorConfigRemove(String connector) { @@ -999,7 +1087,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { } } - private class HerderRequest implements Comparable<HerderRequest> { + private static class HerderRequest implements Comparable<HerderRequest> { + private static final java.util.concurrent.atomic.AtomicLong SEQ_NUM = new java.util.concurrent.atomic.AtomicLong(); // source of creation-order tie-breakers private final long at; + private final long seq = SEQ_NUM.getAndIncrement(); private final Callable<Void> action; private final Callback<Void> callback; @@ -1020,7 +1110,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @Override public int compareTo(HerderRequest o) { - return Long.compare(at, o.at); + final int soonest = Long.compare(at, o.at); + // If tied, fall back to creation order: compareTo then returns 0 only for the same request, giving sorted collections a total, symmetric order. + return soonest != 0 ? soonest : Long.compare(seq, o.seq); } } @@ -1081,19 +1173,25 @@ public class DistributedHerder extends AbstractHerder implements Runnable { // it is still important to have a leader that can write configs, offsets, etc. if (rebalanceResolved) { - // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of - // them to finish // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from // this worker. Instead, we can let them continue to run but buffer any update requests (which should be // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of // unnecessary repeated connections to the source/sink system. 
- worker.stopConnectors(connectors); + List<Callable<Void>> callables = new ArrayList<>(); + for (final String connectorName : connectors) { + callables.add(getConnectorStoppingCallable(connectorName)); + } // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of // stopping them then state could continue to be reused when the task remains on this worker.
For example, // this would avoid having to close a connection and then reopen it when the task is assigned back to this // worker again. - worker.stopAndAwaitTasks(tasks); + for (final ConnectorTaskId taskId : tasks) { + callables.add(getTaskStoppingCallable(taskId)); + } + + // The actual timeout for graceful task stop is applied in worker's stopAndAwaitTask method. + startAndStop(callables); // Ensure that all status updates have been pushed to the storage system before rebalancing. // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java new file mode 100644 index 0000000..45125cc --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.apache.kafka.connect.util.ThreadedTest; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.api.easymock.annotation.Mock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.easymock.EasyMock.eq; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({SourceTaskOffsetCommitter.class, LoggerFactory.class}) +public class SourceTaskOffsetCommitterTest extends ThreadedTest { + @Mock + private ScheduledExecutorService executor; + @Mock + private ConcurrentHashMap committers; // task id -> scheduled commit future + @Mock + private Logger mockLog; + + private SourceTaskOffsetCommitter committer; + + private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000; + + @Override + public void setup() { + super.setup(); + Map<String, String> workerProps = new HashMap<>(); + workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("value.converter",
"org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("internal.key.converter.schemas.enable", "false"); + workerProps.put("internal.value.converter.schemas.enable", "false"); + workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets"); + workerProps.put("offset.flush.interval.ms", + Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS)); + WorkerConfig config = new StandaloneConfig(workerProps); + committer = new SourceTaskOffsetCommitter(config, executor, committers); + Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog); + } + + @Test + public void testSchedule() throws Exception { + Capture<Runnable> taskWrapper = EasyMock.newCapture(); + + ScheduledFuture commitFuture = PowerMock.createMock(ScheduledFuture.class); + EasyMock.expect(executor.scheduleWithFixedDelay( + EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), + eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS)) + ).andReturn(commitFuture); + + ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class); + WorkerSourceTask task = PowerMock.createMock(WorkerSourceTask.class); + + EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null); + + PowerMock.replayAll(); + + committer.schedule(taskId, task); + assertTrue(taskWrapper.hasCaptured()); + assertNotNull(taskWrapper.getValue()); + + PowerMock.verifyAll(); + } + + @Test + public void testClose() throws Exception { + long timeoutMs = 1000; + + // Normal termination, where termination times out: close() is expected to log an error.
+ executor.shutdown(); + PowerMock.expectLastCall(); + + EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) + .andReturn(false); + mockLog.error(EasyMock.anyString()); + PowerMock.expectLastCall(); + PowerMock.replayAll(); + + committer.close(timeoutMs); + + PowerMock.verifyAll(); + PowerMock.resetAll(); + + // Termination interrupted: close() is expected to return without rethrowing. + executor.shutdown(); + PowerMock.expectLastCall(); + + EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS))) + .andThrow(new InterruptedException()); + PowerMock.replayAll(); + + committer.close(timeoutMs); + + PowerMock.verifyAll(); + } + + @Test + public void testRemove() throws Exception { + ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class); + ScheduledFuture task = PowerMock.createMock(ScheduledFuture.class); + + // Try to remove a non-existing task + EasyMock.expect(committers.remove(taskId)).andReturn(null); + PowerMock.replayAll(); + + committer.remove(taskId); + + PowerMock.verifyAll(); + PowerMock.resetAll(); + + // Try to remove an existing task + EasyMock.expect(committers.remove(taskId)).andReturn(task); + EasyMock.expect(task.cancel(eq(false))).andReturn(false); + EasyMock.expect(task.isDone()).andReturn(false); + EasyMock.expect(task.get()).andReturn(null); + PowerMock.replayAll(); + + committer.remove(taskId); + + PowerMock.verifyAll(); + PowerMock.resetAll(); + + // Try to remove a cancelled task: the CancellationException is expected to be logged at trace level, not rethrown. + EasyMock.expect(committers.remove(taskId)).andReturn(task); + EasyMock.expect(task.cancel(eq(false))).andReturn(false); + EasyMock.expect(task.isDone()).andReturn(false); + EasyMock.expect(task.get()).andThrow(new CancellationException()); + mockLog.trace(EasyMock.anyString(), EasyMock.anyObject()); + PowerMock.expectLastCall(); + PowerMock.replayAll(); + + committer.remove(taskId); + + PowerMock.verifyAll(); + PowerMock.resetAll(); + + // Try to remove an interrupted task: the interruption is expected to surface as a ConnectException. + EasyMock.expect(committers.remove(taskId)).andReturn(task); +
EasyMock.expect(task.cancel(eq(false))).andReturn(false); + EasyMock.expect(task.isDone()).andReturn(false); + EasyMock.expect(task.get()).andThrow(new InterruptedException()); + PowerMock.replayAll(); + + try { + committer.remove(taskId); + fail("Expected ConnectException to be raised"); + } catch (ConnectException e) { + // expected + } + + PowerMock.verifyAll(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 97e29be..eac0520 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -425,20 +425,6 @@ public class WorkerTest extends ThreadedTest { EasyMock.expectLastCall(); assertEquals(Collections.emptySet(), worker.taskIds()); - - assertFalse(worker.stopAndAwaitTask(TASK_ID)); - } - - @Test - public void testStopInvalidTask() { - expectStartStorage(); - - PowerMock.replayAll(); - - worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore); - worker.start(); - - assertFalse(worker.stopAndAwaitTask(TASK_ID)); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 1da4595..5be2044 100644 ---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -41,7 +41,6 @@ import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.ConfigBackingStore; -import org.apache.kafka.connect.storage.KafkaConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -145,7 +144,7 @@ public class DistributedHerderTest { private static final String WORKER_ID = "localhost:8083"; - @Mock private KafkaConfigBackingStore configStorage; + @Mock private ConfigBackingStore configBackingStore; @Mock private StatusBackingStore statusBackingStore; @Mock private WorkerGroupMember member; private MockTime time; @@ -163,7 +162,7 @@ public class DistributedHerderTest { time = new MockTime(); herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"}, - new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configStorage, member, MEMBER_URL, time); + new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, time); configUpdateListener = herder.new ConfigUpdateListener(); rebalanceListener = herder.new RebalanceListener(); @@ -274,13 +273,15 @@ public class DistributedHerderTest { @Test public void testHaltCleansUpWorker() { - worker.stopConnectors(); - PowerMock.expectLastCall(); - worker.stopAndAwaitTasks(); + EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1)); + worker.stopConnector(CONN1); + PowerMock.expectLastCall().andReturn(true); +
EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1)); + worker.stopAndAwaitTask(TASK1); PowerMock.expectLastCall(); member.stop(); PowerMock.expectLastCall(); - configStorage.stop(); + configBackingStore.stop(); PowerMock.expectLastCall(); statusBackingStore.stop(); PowerMock.expectLastCall(); @@ -312,7 +313,7 @@ public class DistributedHerderTest { EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList())); // CONN2 is new, should succeed - configStorage.putConnectorConfig(CONN2, CONN2_CONFIG); + configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG); PowerMock.expectLastCall(); ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList()); putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info)); @@ -494,7 +495,7 @@ public class DistributedHerderTest { // And delete the connector member.wakeup(); PowerMock.expectLastCall(); - configStorage.removeConnectorConfig(CONN1); + configBackingStore.removeConnectorConfig(CONN1); PowerMock.expectLastCall(); putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null)); PowerMock.expectLastCall(); @@ -681,7 +682,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); worker.stopAndAwaitTask(TASK0); - PowerMock.expectLastCall().andReturn(true); + PowerMock.expectLastCall(); worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -817,7 +818,7 @@ public class DistributedHerderTest { member.ensureActive(); PowerMock.expectLastCall(); // Checks for config updates and starts rebalance - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); member.requestRejoin(); PowerMock.expectLastCall(); // Performs rebalance
and gets new assignment @@ -863,7 +864,7 @@ public class DistributedHerderTest { member.wakeup(); member.ensureActive(); PowerMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot worker.stopConnector(CONN1); PowerMock.expectLastCall().andReturn(true); worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), @@ -906,7 +907,7 @@ public class DistributedHerderTest { member.ensureActive(); PowerMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); PowerMock.expectLastCall(); worker.setTargetState(CONN1, TargetState.PAUSED); @@ -944,7 +945,7 @@ public class DistributedHerderTest { member.ensureActive(); PowerMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); PowerMock.expectLastCall(); // we expect reconfiguration after resuming @@ -985,7 +986,7 @@ public class DistributedHerderTest { member.ensureActive(); PowerMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); PowerMock.expectLastCall(); member.poll(EasyMock.anyInt()); @@ -1022,7 +1023,7 @@ public class DistributedHerderTest { member.ensureActive(); PowerMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1); PowerMock.expectLastCall(); worker.setTargetState(CONN1, TargetState.PAUSED); @@ -1062,7 +1063,7 @@ public class DistributedHerderTest {
member.ensureActive(); PowerMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); PowerMock.expectLastCall(); worker.setTargetState(CONN1, TargetState.STARTED); @@ -1097,7 +1098,7 @@ public class DistributedHerderTest { member.ensureActive(); PowerMock.expectLastCall(); // Checks for config updates and starts rebalance - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); member.requestRejoin(); PowerMock.expectLastCall(); // Performs rebalance and gets new assignment @@ -1128,7 +1129,7 @@ public class DistributedHerderTest { ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList()); // Reading to end of log times out - configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); EasyMock.expectLastCall().andThrow(new TimeoutException()); member.maybeLeaveGroup(); EasyMock.expectLastCall(); @@ -1230,7 +1231,7 @@ public class DistributedHerderTest { EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList())); - configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED); + configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED); PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { @Override public Object answer() throws Throwable { @@ -1241,7 +1242,7 @@ public class DistributedHerderTest { }); // As a result of reconfig, should need to update snapshot.
With only connector updates, we'll just restart // connector without rebalance - EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); + EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG); worker.stopConnector(CONN1); PowerMock.expectLastCall().andReturn(true); worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(), @@ -1318,13 +1319,15 @@ public class DistributedHerderTest { }); if (revokedConnectors != null) { - worker.stopConnectors(revokedConnectors); - PowerMock.expectLastCall().andReturn(revokedConnectors); + for (String connector : revokedConnectors) { + worker.stopConnector(connector); + PowerMock.expectLastCall().andReturn(true); + } } - if (revokedTasks != null) { - worker.stopAndAwaitTasks(revokedTasks); - PowerMock.expectLastCall().andReturn(revokedTasks); + if (revokedTasks != null && !revokedTasks.isEmpty()) { + worker.stopAndAwaitTask(EasyMock.anyObject(ConnectorTaskId.class)); + PowerMock.expectLastCall().times(revokedTasks.size()); // onRevoked submits one stop callable per revoked task } if (revokedConnectors != null) { @@ -1337,9 +1340,9 @@ public class DistributedHerderTest { } private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) throws TimeoutException { - configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); + configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)); EasyMock.expectLastCall(); - EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot); + EasyMock.expect(configBackingStore.snapshot()).andReturn(readToEndSnapshot); } http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 0bc3d5c..010c0b2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -66,7 +66,9 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -296,7 +298,7 @@ public class StandaloneHerderTest { expectConfigValidation(connectorConfig); worker.stopAndAwaitTask(taskId); - EasyMock.expectLastCall().andReturn(true); + EasyMock.expectLastCall(); // presumably stopAndAwaitTask is void after this change: no return value to stub worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(true); @@ -321,7 +323,7 @@ public class StandaloneHerderTest { expectConfigValidation(connectorConfig); worker.stopAndAwaitTask(taskId); - EasyMock.expectLastCall().andReturn(true); + EasyMock.expectLastCall(); // presumably stopAndAwaitTask is void after this change: no return value to stub worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED); EasyMock.expectLastCall().andReturn(false); @@ -391,7 +393,7 @@ public class StandaloneHerderTest { expectConfigValidation(connConfig); // Validate accessors with 1 connector - listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME)); + listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME)); EasyMock.expectLastCall(); ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); connectorInfoCb.onCompletion(null, connInfo); @@ -477,7 +479,7 @@ public class StandaloneHerderTest { PowerMock.replayAll();
herder.putTaskConfigs(CONNECTOR_NAME, - Arrays.asList(Collections.singletonMap("config", "value")), + Arrays.asList(singletonMap("config", "value")), cb); PowerMock.verifyAll(); @@ -513,7 +515,7 @@ public class StandaloneHerderTest { private void expectStop() { ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0); worker.stopAndAwaitTasks(singletonList(task)); - EasyMock.expectLastCall().andReturn(Collections.singleton(task)); + EasyMock.expectLastCall(); // presumably stopAndAwaitTasks is void after this change: no return value to stub worker.stopConnector(CONNECTOR_NAME); EasyMock.expectLastCall().andReturn(true); }