Repository: aurora
Updated Branches:
  refs/heads/master f1e09a9c7 -> 496397aa5


Batching writes - Part 1 (of 3): Introducing BatchWorker and task event 
batching.

Reviewed at https://reviews.apache.org/r/51759/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ebfeb3e6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ebfeb3e6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ebfeb3e6

Branch: refs/heads/master
Commit: ebfeb3e602faa9281ff7ff50f42bd21885518953
Parents: f1e09a9
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Fri Sep 16 14:17:04 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Fri Sep 16 14:17:04 2016 -0700

----------------------------------------------------------------------
 .../apache/aurora/scheduler/BatchWorker.java    | 254 +++++++++++++++++++
 .../aurora/scheduler/SchedulerModule.java       |  25 ++
 .../scheduler/pruning/TaskHistoryPruner.java    |  14 +-
 .../scheduler/scheduling/TaskThrottler.java     |  28 +-
 .../scheduler/state/MaintenanceController.java  |  14 +-
 .../updater/JobUpdateControllerImpl.java        |  10 +-
 .../aurora/scheduler/BatchWorkerTest.java       |  96 +++++++
 .../pruning/TaskHistoryPrunerTest.java          |  10 +-
 .../scheduler/scheduling/TaskThrottlerTest.java |   9 +-
 .../state/MaintenanceControllerImplTest.java    |   5 +
 .../scheduler/testing/BatchWorkerUtil.java      |  59 +++++
 .../aurora/scheduler/updater/JobUpdaterIT.java  |   7 +-
 12 files changed, 505 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/BatchWorker.java 
b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
new file mode 100644
index 0000000..e05d4b4
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/BatchWorker.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+
+import org.apache.aurora.common.stats.SlidingStats;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.base.AsyncUtil;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Objects.requireNonNull;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Generic helper that allows bundling multiple work items into a single {@link Storage}
+ * transaction aiming to reduce the write lock contention.
+ * <p>
+ * Submitted work is queued and consumed by the service thread, which hands at least one and at
+ * most {@code maxBatchSize} queued items at a time to a single {@code storage.write()} call.
+ *
+ * @param <T> Expected result type.
+ */
+public class BatchWorker<T> extends AbstractExecutionThreadService {
+  /**
+   * Empty result placeholder.
+   */
+  public interface NoResult { }
+
+  /**
+   * Convenience wrapper for a non-repeatable no value work {@link Result}.
+   */
+  public static final NoResult NO_RESULT = new NoResult() { };
+
+  private static final Logger LOG = LoggerFactory.getLogger(BatchWorker.class);
+
+  /**
+   * Upper bound on how long the service thread waits for a work item before re-checking
+   * {@link #isRunning()}, so that a stop request is noticed even when the queue stays empty.
+   */
+  private static final long QUEUE_POLL_TIMEOUT_MSEC = 1000L;
+
+  private final Storage storage;
+  private final int maxBatchSize;
+  // Duration of the whole storage.write() call, measured from outside of it.
+  private final SlidingStats batchUnlocked;
+  // Duration of the batch processing loop, measured from inside the storage.write() callback.
+  private final SlidingStats batchLocked;
+  private final BlockingQueue<WorkItem<T>> workQueue = new LinkedBlockingQueue<>();
+  // Re-queues unfinished repeatable work after its backoff delay.
+  private final ScheduledExecutorService scheduledExecutor;
+  private final AtomicInteger lastBatchSize = new AtomicInteger(0);
+  private final AtomicLong itemsProcessed;
+  private final AtomicLong batchesProcessed;
+
+  /**
+   * Wraps result returned by the {@link RepeatableWork} item.
+   *
+   * @param <T> Expected result type.
+   */
+  public static class Result<T> {
+    private final boolean isCompleted;
+    private final T value;
+
+    /**
+     * Initializes a {@link Result} instance with {@code isCompleted} and {@code value}.
+     * <p>
+     * The {@code isCompleted} may be set to {@code false} for a {@link RepeatableWork} that has
+     * not finished yet. Otherwise, it must be set to {@code true}.
+     *
+     * @param isCompleted Flag indicating if the {@link RepeatableWork} has completed.
+     * @param value result value.
+     */
+    public Result(boolean isCompleted, T value) {
+      this.isCompleted = isCompleted;
+      this.value = value;
+    }
+  }
+
+  /**
+   * Encapsulates a potentially repeatable operation.
+   *
+   * @param <T> Expected result type.
+   */
+  @FunctionalInterface
+  public interface RepeatableWork<T> {
+    /**
+     * Abstracts a unit of repeatable (i.e.: "repeat until completed") work.
+     * <p>
+     * The work unit may be repeated as instructed by the {@link Result}.
+     *
+     * @param storeProvider {@link MutableStoreProvider} instance.
+     * @return {@link Result}
+     */
+    Result<T> apply(MutableStoreProvider storeProvider);
+  }
+
+  /**
+   * Encapsulates a non-repeatable operation.
+   *
+   * @param <T> Expected result type.
+   */
+  @FunctionalInterface
+  public interface Work<T> extends RepeatableWork<T> {
+    /**
+     * Runs {@link #execute(MutableStoreProvider)} once and reports it as completed.
+     *
+     * @param storeProvider {@link MutableStoreProvider} instance.
+     * @return A completed {@link Result} holding the value returned by {@code execute}.
+     */
+    @Override
+    default Result<T> apply(MutableStoreProvider storeProvider) {
+      T value = execute(storeProvider);
+      return new Result<>(true, value);
+    }
+
+    /**
+     * Abstracts a unit of non-repeatable (i.e.: "run exactly once") work.
+     *
+     * @param storeProvider {@link MutableStoreProvider} instance.
+     * @return result value.
+     */
+    T execute(MutableStoreProvider storeProvider);
+  }
+
+  /**
+   * Creates a worker and registers its queue size, batch size, timing and counter stats, all
+   * named after {@link #serviceName()}.
+   *
+   * @param storage Storage used to run every batch inside a single write operation.
+   * @param statsProvider Provider used to export the worker stats.
+   * @param maxBatchSize Maximum number of work items bundled into one write operation.
+   */
+  @Inject
+  protected BatchWorker(Storage storage, StatsProvider statsProvider, int maxBatchSize) {
+    this.storage = requireNonNull(storage);
+    this.maxBatchSize = maxBatchSize;
+
+    scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor(serviceName() + "-%d", LOG);
+    statsProvider.makeGauge(serviceName() + "_queue_size", () -> workQueue.size());
+    statsProvider.makeGauge(
+        serviceName() + "_last_processed_batch_size",
+        () -> lastBatchSize.intValue());
+    batchUnlocked = new SlidingStats(serviceName() + "_batch_unlocked", "nanos");
+    batchLocked = new SlidingStats(serviceName() + "_batch_locked", "nanos");
+    itemsProcessed = statsProvider.makeCounter(serviceName() + "_items_processed");
+    batchesProcessed = statsProvider.makeCounter(serviceName() + "_batches_processed");
+  }
+
+  /**
+   * Executes a non-repeatable {@link Work} and returns {@link CompletableFuture} to wait on.
+   *
+   * @param work A non-repeatable {@link Work} to execute.
+   * @return {@link CompletableFuture} to wait on.
+   */
+  public CompletableFuture<T> execute(Work<T> work) {
+    CompletableFuture<T> result = new CompletableFuture<>();
+    workQueue.add(new WorkItem<>(
+        work,
+        result,
+        Optional.empty(),
+        Optional.empty()));
+
+    return result;
+  }
+
+  /**
+   * Executes a {@link RepeatableWork} until it completes and returns {@link CompletableFuture}
+   * to wait on.
+   *
+   * @param backoffStrategy A {@link BackoffStrategy} instance to backoff subsequent runs.
+   * @param work A {@link RepeatableWork} to execute.
+   * @return {@link CompletableFuture} to wait on.
+   */
+  public CompletableFuture<T> executeWithReplay(
+      BackoffStrategy backoffStrategy,
+      RepeatableWork<T> work) {
+
+    CompletableFuture<T> result = new CompletableFuture<>();
+    workQueue.add(new WorkItem<>(
+        work,
+        result,
+        Optional.of(backoffStrategy),
+        Optional.of(0L)));
+
+    return result;
+  }
+
+  /**
+   * Service loop: waits for a work item, drains up to {@code maxBatchSize - 1} more queued items
+   * and processes them as one batch, until the service is no longer running.
+   */
+  @Override
+  protected void run() throws Exception {
+    while (isRunning()) {
+      // A timed poll (rather than an unbounded take()) lets the loop observe a stop request
+      // even if no further work is ever submitted.
+      WorkItem<T> first = workQueue.poll(QUEUE_POLL_TIMEOUT_MSEC, TimeUnit.MILLISECONDS);
+      if (first == null) {
+        continue;
+      }
+
+      List<WorkItem<T>> batch = new LinkedList<>();
+      batch.add(first);
+      workQueue.drainTo(batch, maxBatchSize - batch.size());
+      processBatch(batch);
+    }
+  }
+
+  /**
+   * Stops the executor used for re-queueing repeatable work once the service loop has exited.
+   */
+  @Override
+  protected void shutDown() {
+    scheduledExecutor.shutdownNow();
+  }
+
+  /**
+   * Runs every item of the batch inside a single storage write operation.
+   * <p>
+   * Completed items resolve their future; unfinished items are re-queued after their backoff
+   * delay. A {@link RuntimeException} thrown by an item fails only that item's future and does
+   * not stop the processing of the remaining items.
+   *
+   * @param batch Work items to process; an empty batch is a no-op.
+   */
+  private void processBatch(List<WorkItem<T>> batch) {
+    if (batch.isEmpty()) {
+      return;
+    }
+
+    long unlockedStart = System.nanoTime();
+    storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
+      long lockedStart = System.nanoTime();
+      for (WorkItem<T> item : batch) {
+        try {
+          Result<T> itemResult = item.work.apply(storeProvider);
+          if (itemResult.isCompleted) {
+            item.result.complete(itemResult.value);
+          } else {
+            // Work not finished yet - re-queue for a followup later.
+            long backoffMsec = backoffFor(item);
+            scheduledExecutor.schedule(
+                () -> workQueue.add(new WorkItem<>(
+                    item.work,
+                    item.result,
+                    item.backoffStrategy,
+                    Optional.of(backoffMsec))),
+                backoffMsec,
+                TimeUnit.MILLISECONDS);
+          }
+        } catch (RuntimeException e) {
+          // The exception is the trailing argument (no placeholder) so that SLF4J logs it
+          // together with its stack trace.
+          LOG.error("{}: Failed to process batch item.", serviceName(), e);
+          item.result.completeExceptionally(e);
+        }
+      }
+      batchLocked.accumulate(System.nanoTime() - lockedStart);
+    });
+    batchUnlocked.accumulate(System.nanoTime() - unlockedStart);
+    batchesProcessed.incrementAndGet();
+    lastBatchSize.set(batch.size());
+    itemsProcessed.addAndGet(batch.size());
+  }
+
+  /**
+   * Calculates the next backoff for a repeatable work item from its previous backoff value.
+   *
+   * @param item Work item that carries a backoff strategy and the last backoff used.
+   * @return Delay, in milliseconds, before the item should be re-queued.
+   * @throws IllegalStateException if the item has no backoff strategy or no last backoff, which
+   *     is the case for items submitted through {@link #execute(Work)}.
+   */
+  private long backoffFor(WorkItem<T> item) {
+    checkState(item.backoffStrategy.isPresent());
+    checkState(item.lastBackoffMsec.isPresent());
+    return item.backoffStrategy.get().calculateBackoffMs(item.lastBackoffMsec.get());
+  }
+
+  /**
+   * Queue entry pairing a unit of work with the future that reports its outcome.
+   *
+   * @param <V> Result type of the work and of its future.
+   */
+  private static class WorkItem<V> {
+    private final RepeatableWork<V> work;
+    private final CompletableFuture<V> result;
+    // Both optionals are empty for non-repeatable work.
+    private final Optional<BackoffStrategy> backoffStrategy;
+    private final Optional<Long> lastBackoffMsec;
+
+    WorkItem(
+        RepeatableWork<V> work,
+        CompletableFuture<V> result,
+        Optional<BackoffStrategy> backoffStrategy,
+        Optional<Long> lastBackoffMsec) {
+
+      this.work = work;
+      this.result = result;
+      this.backoffStrategy = backoffStrategy;
+      this.lastBackoffMsec = lastBackoffMsec;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java 
b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 4a7ef0b..2ec3967 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -16,6 +16,8 @@ package org.apache.aurora.scheduler;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Inject;
 import javax.inject.Singleton;
 
 import com.google.inject.AbstractModule;
@@ -27,10 +29,13 @@ import org.apache.aurora.common.args.CmdLine;
 import org.apache.aurora.common.args.constraints.Positive;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.scheduler.BatchWorker.NoResult;
 import org.apache.aurora.scheduler.SchedulerLifecycle.LeadingOptions;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEventModule;
+import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +64,11 @@ public class SchedulerModule extends AbstractModule {
       help = "The maximum number of status updates that can be processed in a 
batch.")
   private static final Arg<Integer> MAX_STATUS_UPDATE_BATCH_SIZE = 
Arg.create(1000);
 
+  @Positive
+  @CmdLine(name = "max_task_event_batch_size",
+      help = "The maximum number of task state change events that can be 
processed in a batch.")
+  private static final Arg<Integer> MAX_TASK_EVENT_BATCH_SIZE = 
Arg.create(300);
+
   @Override
   protected void configure() {
     bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class);
@@ -93,6 +103,21 @@ public class SchedulerModule extends AbstractModule {
     bind(TaskStatusHandler.class).to(TaskStatusHandlerImpl.class);
     bind(TaskStatusHandlerImpl.class).in(Singleton.class);
     addSchedulerActiveServiceBinding(binder()).to(TaskStatusHandlerImpl.class);
+
+    bind(TaskEventBatchWorker.class).in(Singleton.class);
+    addSchedulerActiveServiceBinding(binder()).to(TaskEventBatchWorker.class);
   }
 
+  /**
+   * {@link BatchWorker} for task state change event handlers, with batches capped by the
+   * {@code max_task_event_batch_size} command line argument.
+   */
+  public static class TaskEventBatchWorker extends BatchWorker<NoResult> {
+    private static final String SERVICE_NAME = "TaskEventBatchWorker";
+
+    @Inject
+    TaskEventBatchWorker(Storage storage, StatsProvider statsProvider) {
+      super(storage, statsProvider, MAX_TASK_EVENT_BATCH_SIZE.get());
+    }
+
+    /**
+     * Fixed service name; the parent constructor uses it to name the worker's stats.
+     */
+    @Override
+    protected String serviceName() {
+      return SERVICE_NAME;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java 
b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
index f07746c..c672826 100644
--- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
+++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java
@@ -29,13 +29,14 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.util.Clock;
 import org.apache.aurora.gen.apiConstants;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.slf4j.Logger;
@@ -62,6 +63,7 @@ public class TaskHistoryPruner implements EventSubscriber {
   private final HistoryPrunnerSettings settings;
   private final Storage storage;
   private final Lifecycle lifecycle;
+  private final TaskEventBatchWorker batchWorker;
 
   private final Predicate<IScheduledTask> safeToDelete = new 
Predicate<IScheduledTask>() {
     @Override
@@ -94,7 +96,8 @@ public class TaskHistoryPruner implements EventSubscriber {
       Clock clock,
       HistoryPrunnerSettings settings,
       Storage storage,
-      Lifecycle lifecycle) {
+      Lifecycle lifecycle,
+      TaskEventBatchWorker batchWorker) {
 
     this.executor = requireNonNull(executor);
     this.stateManager = requireNonNull(stateManager);
@@ -102,6 +105,7 @@ public class TaskHistoryPruner implements EventSubscriber {
     this.settings = requireNonNull(settings);
     this.storage = requireNonNull(storage);
     this.lifecycle = requireNonNull(lifecycle);
+    this.batchWorker = requireNonNull(batchWorker);
   }
 
   @VisibleForTesting
@@ -131,8 +135,10 @@ public class TaskHistoryPruner implements EventSubscriber {
 
   private void deleteTasks(final Set<String> taskIds) {
     LOG.info("Pruning inactive tasks " + taskIds);
-    storage.write(
-        (NoResult.Quiet) storeProvider -> 
stateManager.deleteTasks(storeProvider, taskIds));
+    batchWorker.execute(storeProvider -> {
+      stateManager.deleteTasks(storeProvider, taskIds);
+      return BatchWorker.NO_RESULT;
+    });
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
index bbd971a..867c9bd 100644
--- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
+++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java
@@ -22,13 +22,14 @@ import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.SlidingStats;
 import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.storage.Storage;
 
 import static java.util.Objects.requireNonNull;
 
@@ -46,8 +47,8 @@ class TaskThrottler implements EventSubscriber {
   private final RescheduleCalculator rescheduleCalculator;
   private final Clock clock;
   private final DelayExecutor executor;
-  private final Storage storage;
   private final StateManager stateManager;
+  private final TaskEventBatchWorker batchWorker;
 
   private final SlidingStats throttleStats = new SlidingStats("task_throttle", 
"ms");
 
@@ -56,14 +57,14 @@ class TaskThrottler implements EventSubscriber {
       RescheduleCalculator rescheduleCalculator,
       Clock clock,
       @AsyncExecutor DelayExecutor executor,
-      Storage storage,
-      StateManager stateManager) {
+      StateManager stateManager,
+      TaskEventBatchWorker batchWorker) {
 
     this.rescheduleCalculator = requireNonNull(rescheduleCalculator);
     this.clock = requireNonNull(clock);
     this.executor = requireNonNull(executor);
-    this.storage = requireNonNull(storage);
     this.stateManager = requireNonNull(stateManager);
+    this.batchWorker = requireNonNull(batchWorker);
   }
 
   @Subscribe
@@ -73,13 +74,16 @@ class TaskThrottler implements EventSubscriber {
           + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask());
       long delayMs = Math.max(0, readyAtMs - clock.nowMillis());
       throttleStats.accumulate(delayMs);
-      executor.execute(
-          () -> storage.write(storeProvider -> stateManager.changeState(
-              storeProvider,
-              stateChange.getTaskId(),
-              Optional.of(THROTTLED),
-              PENDING,
-              Optional.absent())),
+      executor.execute(() ->
+              batchWorker.execute(storeProvider -> {
+                stateManager.changeState(
+                    storeProvider,
+                    stateChange.getTaskId(),
+                    Optional.of(THROTTLED),
+                    PENDING,
+                    Optional.absent());
+                return BatchWorker.NO_RESULT;
+              }),
           Amount.of(delayMs, Time.MILLISECONDS));
     }
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java 
b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
index 3c7cda0..574efc9 100644
--- a/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
+++ b/src/main/java/org/apache/aurora/scheduler/state/MaintenanceController.java
@@ -30,6 +30,8 @@ import com.google.common.eventbus.Subscribe;
 import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
@@ -37,7 +39,6 @@ import 
org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
 import org.apache.aurora.scheduler.storage.AttributeStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IHostStatus;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -106,11 +107,17 @@ public interface MaintenanceController {
     private static final Logger LOG = 
LoggerFactory.getLogger(MaintenanceControllerImpl.class);
     private final Storage storage;
     private final StateManager stateManager;
+    private final TaskEventBatchWorker batchWorker;
 
     @Inject
-    public MaintenanceControllerImpl(Storage storage, StateManager 
stateManager) {
+    public MaintenanceControllerImpl(
+        Storage storage,
+        StateManager stateManager,
+        TaskEventBatchWorker batchWorker) {
+
       this.storage = requireNonNull(storage);
       this.stateManager = requireNonNull(stateManager);
+      this.batchWorker = requireNonNull(batchWorker);
     }
 
     private Set<IHostStatus> watchDrainingTasks(MutableStoreProvider store, 
Set<String> hosts) {
@@ -153,7 +160,7 @@ public interface MaintenanceController {
     public void taskChangedState(final TaskStateChange change) {
       if (Tasks.isTerminated(change.getNewState())) {
         final String host = change.getTask().getAssignedTask().getSlaveHost();
-        storage.write((NoResult.Quiet) (MutableStoreProvider store) -> {
+        batchWorker.execute(store -> {
           // If the task _was_ associated with a draining host, and it was the 
last task on the
           // host.
           Optional<IHostAttributes> attributes =
@@ -168,6 +175,7 @@ public interface MaintenanceController {
               LOG.info("Host {} is DRAINING with active tasks: {}", host, 
Tasks.ids(activeTasks));
             }
           }
+          return BatchWorker.NO_RESULT;
         });
       }
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
 
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
index ef6253e..25b3f37 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/updater/JobUpdateControllerImpl.java
@@ -44,6 +44,8 @@ import org.apache.aurora.gen.JobUpdateQuery;
 import org.apache.aurora.gen.JobUpdateStatus;
 import org.apache.aurora.gen.Lock;
 import org.apache.aurora.gen.LockKey;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.base.InstanceKeys;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.Query;
@@ -120,6 +122,7 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
   private final Clock clock;
   private final PulseHandler pulseHandler;
   private final Lifecycle lifecycle;
+  private final TaskEventBatchWorker batchWorker;
 
   // Currently-active updaters. An active updater is one that is rolling 
forward or back. Paused
   // and completed updates are represented only in storage, not here.
@@ -134,7 +137,8 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
       ScheduledExecutorService executor,
       StateManager stateManager,
       Clock clock,
-      Lifecycle lifecycle) {
+      Lifecycle lifecycle,
+      TaskEventBatchWorker batchWorker) {
 
     this.updateFactory = requireNonNull(updateFactory);
     this.lockManager = requireNonNull(lockManager);
@@ -143,6 +147,7 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
     this.stateManager = requireNonNull(stateManager);
     this.clock = requireNonNull(clock);
     this.lifecycle = requireNonNull(lifecycle);
+    this.batchWorker = requireNonNull(batchWorker);
     this.pulseHandler = new PulseHandler(clock);
   }
 
@@ -346,7 +351,7 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
   }
 
   private void instanceChanged(final IInstanceKey instance, final 
Optional<IScheduledTask> state) {
-    storage.write((NoResult.Quiet) storeProvider -> {
+    batchWorker.execute(storeProvider -> {
       IJobKey job = instance.getJobKey();
       UpdateFactory.Update update = updates.get(job);
       if (update != null) {
@@ -366,6 +371,7 @@ class JobUpdateControllerImpl implements 
JobUpdateController {
               + JobKeys.canonicalString(job));
         }
       }
+      return BatchWorker.NO_RESULT;
     });
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java 
b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
new file mode 100644
index 0000000..a86dc82
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/BatchWorkerTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.common.util.BackoffStrategy;
+import org.apache.aurora.scheduler.BatchWorker.Result;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BatchWorker}: plain execution, failure propagation and repeatable work.
+ */
+public class BatchWorkerTest extends EasyMockTest {
+  private static final String SERVICE_NAME = "TestWorker";
+  // Name of the counter the worker increments after every processed batch.
+  private static final String BATCH_STAT = SERVICE_NAME + "_batches_processed";
+  private FakeStatsProvider statsProvider;
+  private BatchWorker<Boolean> batchWorker;
+
+  /**
+   * Builds a worker with a maximum batch size of 2 on top of the test storage.
+   */
+  @Before
+  public void setUp() {
+    StorageTestUtil storageUtil = new StorageTestUtil(this);
+    storageUtil.expectOperations();
+    statsProvider = new FakeStatsProvider();
+    batchWorker = new BatchWorker<Boolean>(storageUtil.storage, statsProvider, 
2) {
+      @Override
+      protected String serviceName() {
+        return SERVICE_NAME;
+      }
+    };
+  }
+
+  /**
+   * Queues three items before the service starts; with a maximum batch size of 2 they cannot
+   * all fit into one batch, and every future must still resolve to {@code true}.
+   */
+  @Test
+  public void testExecute() throws Exception {
+    control.replay();
+
+    CompletableFuture<Boolean> result1 = batchWorker.execute(store -> true);
+    CompletableFuture<Boolean> result2 = batchWorker.execute(store -> true);
+    CompletableFuture<Boolean> result3 = batchWorker.execute(store -> true);
+    batchWorker.startAsync().awaitRunning();
+
+    assertTrue(result1.get());
+    assertTrue(result2.get());
+    assertTrue(result3.get());
+  }
+
+  /**
+   * A work item that throws must fail its future, which surfaces from {@code get()} as an
+   * {@link ExecutionException}.
+   */
+  @Test(expected = ExecutionException.class)
+  public void testExecuteThrows() throws Exception {
+    control.replay();
+
+    CompletableFuture<Boolean> result =
+        batchWorker.execute(store -> { throw new IllegalArgumentException(); 
});
+    batchWorker.startAsync().awaitRunning();
+
+    result.get();
+  }
+
+  /**
+   * Repeatable work that reports "not completed" until more than one batch has been processed
+   * must be replayed until it completes; the mocked backoff always returns 0 ms.
+   */
+  @Test
+  public void testExecuteWithReplay() throws Exception {
+    BackoffStrategy backoff = createMock(BackoffStrategy.class);
+    final CountDownLatch complete = new CountDownLatch(1);
+
+    
expect(backoff.calculateBackoffMs(EasyMock.anyLong())).andReturn(0L).anyTimes();
+
+    control.replay();
+
+    batchWorker.startAsync().awaitRunning();
+    batchWorker.executeWithReplay(
+        backoff,
+        store -> statsProvider.getValue(BATCH_STAT).longValue() > 1L
+            ? new Result<>(true, true)
+            : new Result<>(false, false))
+        .thenAccept(result -> complete.countDown());
+
+    // Bounded wait so that a replay that never completes fails the test instead of hanging it.
+    assertTrue(complete.await(10L, TimeUnit.SECONDS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java 
b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 99c27e8..8469596 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -27,6 +27,7 @@ import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.util.testing.FakeClock;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -48,6 +49,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.LOST;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
+import static 
org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expectLastCall;
 
@@ -68,20 +70,24 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
   private Command shutdownCommand;
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     executor = createMock(DelayExecutor.class);
     clock = new FakeClock();
     stateManager = createMock(StateManager.class);
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     shutdownCommand = createMock(Command.class);
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
+
     pruner = new TaskHistoryPruner(
         executor,
         stateManager,
         clock,
         new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
         storageUtil.storage,
-        new Lifecycle(shutdownCommand));
+        new Lifecycle(shutdownCommand),
+        batchWorker);
     closer = Closer.create();
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
index 7d104aa..433f791 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskThrottlerTest.java
@@ -24,6 +24,7 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskEvent;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.DelayExecutor;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
@@ -39,6 +40,7 @@ import static org.apache.aurora.gen.ScheduleStatus.INIT;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.THROTTLED;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
@@ -60,12 +62,15 @@ public class TaskThrottlerTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
+
     throttler = new TaskThrottler(
         rescheduleCalculator,
         clock,
         executor,
-        storageUtil.storage,
-        stateManager);
+        stateManager,
+        batchWorker);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
index 94f5ca5..ae83dea 100644
--- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java
@@ -30,6 +30,7 @@ import org.apache.aurora.gen.HostStatus;
 import org.apache.aurora.gen.MaintenanceMode;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
 import org.apache.aurora.scheduler.base.Query;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
@@ -53,6 +54,7 @@ import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED;
 import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.scheduler.state.MaintenanceController.MaintenanceControllerImpl;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
@@ -71,6 +73,8 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
     stateManager = createMock(StateManager.class);
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
+    expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes();
 
     Injector injector = Guice.createInjector(
         new PubsubEventModule(),
@@ -83,6 +87,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest {
             bind(StatsProvider.class).toInstance(new FakeStatsProvider());
             bind(Executor.class).annotatedWith(AsyncExecutor.class)
                 .toInstance(MoreExecutors.directExecutor());
+            bind(TaskEventBatchWorker.class).toInstance(batchWorker);
           }
         });
     maintenance = injector.getInstance(MaintenanceController.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
new file mode 100644
index 0000000..46b2e36
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/testing/BatchWorkerUtil.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.testing;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.BatchWorker;
+import org.apache.aurora.scheduler.BatchWorker.Work;
+import org.apache.aurora.scheduler.storage.Storage;
+import org.easymock.Capture;
+import org.easymock.IExpectationSetters;
+import org.easymock.IMocksControl;
+
+import static org.apache.aurora.common.testing.easymock.EasyMockTest.createCapture;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+
+public final class BatchWorkerUtil {
+  private BatchWorkerUtil() {
+    // Utility class.
+  }
+
+  public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute(
+      BatchWorker<T> batchWorker,
+      Storage storage,
+      IMocksControl control,
+      T resultValue) throws Exception {
+
+    final CompletableFuture<T> result = new EasyMockTest.Clazz<CompletableFuture<T>>() { }
+        .createMock(control);
+    expect(result.get()).andReturn(resultValue).anyTimes();
+
+    final Capture<Work<T>> capture = createCapture();
+    return expect(batchWorker.execute(capture(capture))).andAnswer(() -> {
+      storage.write((Storage.MutateWork.NoResult.Quiet) store -> capture.getValue().apply(store));
+      return result;
+    });
+  }
+
+  public static <T> IExpectationSetters<CompletableFuture<T>> expectBatchExecute(
+      BatchWorker<T> batchWorker,
+      Storage storage,
+      IMocksControl control) throws Exception {
+
+    return expectBatchExecute(batchWorker, storage, control, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ebfeb3e6/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
index f879827..ea0b89a 100644
--- a/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/updater/JobUpdaterIT.java
@@ -60,6 +60,7 @@ import org.apache.aurora.gen.Range;
 import org.apache.aurora.gen.ScheduleStatus;
 import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker;
 import org.apache.aurora.scheduler.TaskIdGenerator;
 import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl;
 import org.apache.aurora.scheduler.base.JobKeys;
@@ -125,6 +126,7 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLED;
 import static org.apache.aurora.gen.ScheduleStatus.RUNNING;
 import static org.apache.aurora.gen.ScheduleStatus.STARTING;
 import static org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
+import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute;
 import static org.apache.aurora.scheduler.updater.UpdateFactory.UpdateFactoryImpl.expandInstanceIds;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
@@ -164,7 +166,7 @@ public class JobUpdaterIT extends EasyMockTest {
   }
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     // Avoid console spam due to stats registered multiple times.
     Stats.flush();
     ScheduledExecutorService executor = createMock(ScheduledExecutorService.class);
@@ -172,6 +174,7 @@ public class JobUpdaterIT extends EasyMockTest {
     driver = createMock(Driver.class);
     shutdownCommand = createMock(Command.class);
     eventBus = new EventBus();
+    TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class);
 
     Injector injector = Guice.createInjector(
         new UpdaterModule(executor),
@@ -195,6 +198,7 @@ public class JobUpdaterIT extends EasyMockTest {
             bind(LockManager.class).to(LockManagerImpl.class);
             bind(UUIDGenerator.class).to(UUIDGeneratorImpl.class);
             bind(Lifecycle.class).toInstance(new Lifecycle(shutdownCommand));
+            bind(TaskEventBatchWorker.class).toInstance(batchWorker);
           }
         });
     updater = injector.getInstance(JobUpdateController.class);
@@ -204,6 +208,7 @@ public class JobUpdaterIT extends EasyMockTest {
     stateManager = injector.getInstance(StateManager.class);
     eventBus.register(injector.getInstance(JobUpdateEventSubscriber.class));
     subscriber = injector.getInstance(JobUpdateEventSubscriber.class);
+    expectBatchExecute(batchWorker, storage, control).anyTimes();
   }
 
   @After

Reply via email to