Batching writes - Part 2 (of 3): Converting cron jobs to use BatchWorker. Reviewed at https://reviews.apache.org/r/51763/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2cb43d61 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2cb43d61 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2cb43d61 Branch: refs/heads/master Commit: 2cb43d61ecafb79b31d36332ef4713b9857b3c1a Parents: ebfeb3e Author: Maxim Khutornenko <ma...@apache.org> Authored: Fri Sep 16 14:17:26 2016 -0700 Committer: Maxim Khutornenko <ma...@apache.org> Committed: Fri Sep 16 14:17:26 2016 -0700 ---------------------------------------------------------------------- .../aurora/common/util/BackoffHelper.java | 8 + .../aurora/common/util/BackoffHelperTest.java | 7 + .../scheduler/cron/quartz/AuroraCronJob.java | 239 +++++++++++-------- .../scheduler/cron/quartz/CronModule.java | 25 +- .../cron/quartz/AuroraCronJobTest.java | 107 ++++++--- .../aurora/scheduler/cron/quartz/CronIT.java | 4 + 6 files changed, 256 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java index 8e73dd9..517c0ef 100644 --- a/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java +++ b/commons/src/main/java/org/apache/aurora/common/util/BackoffHelper.java @@ -90,6 +90,14 @@ public class BackoffHelper { } /** + * Gets {@link BackoffStrategy} instance the BackoffHelper is initialized with. + * @return instance of {@link BackoffStrategy} used by BackoffHelper. + */ + public BackoffStrategy getBackoffStrategy() { + return backoffStrategy; + } + + /** * Executes the given task using the configured backoff strategy until the task succeeds as * indicated by returning a non-null value. * http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java index bc30990..012fbac 100644 --- a/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java +++ b/commons/src/test/java/org/apache/aurora/common/util/BackoffHelperTest.java @@ -41,6 +41,13 @@ public class BackoffHelperTest extends EasyMockTest { } @Test + public void testGetBackoffStrategy() { + control.replay(); + + assertEquals(backoffStrategy, backoffHelper.getBackoffStrategy()); + } + + @Test public void testDoUntilSuccess() throws Exception { ExceptionalSupplier<Boolean, RuntimeException> task = createMock(new Clazz<ExceptionalSupplier<Boolean, RuntimeException>>() { }); http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java index c07551e..7c8047a 100644 --- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java +++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJob.java @@ -13,29 +13,37 @@ */ package org.apache.aurora.scheduler.cron.quartz; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; import java.util.Date; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; +import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import org.apache.aurora.common.stats.Stats; +import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.BackoffHelper; import org.apache.aurora.gen.CronCollisionPolicy; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.BatchWorker.NoResult; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.cron.CronException; import org.apache.aurora.scheduler.cron.SanitizedCronJob; +import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutateWork; -import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; @@ -43,9 +51,14 @@ import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import org.quartz.PersistJobDataAfterExecution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; import static com.google.common.base.Preconditions.checkState; @@ -61,7 +74,8 @@ import static org.apache.aurora.gen.ScheduleStatus.KILLING; * scheduler should therefore be configured with a large number of threads. */ @DisallowConcurrentExecution -class AuroraCronJob implements Job { +@PersistJobDataAfterExecution +class AuroraCronJob implements Job, EventSubscriber { private static final Logger LOG = LoggerFactory.getLogger(AuroraCronJob.class); private static final AtomicLong CRON_JOB_TRIGGERS = Stats.exportLong("cron_job_triggers"); @@ -69,147 +83,166 @@ class AuroraCronJob implements Job { private static final AtomicLong CRON_JOB_PARSE_FAILURES = Stats.exportLong("cron_job_parse_failures"); private static final AtomicLong CRON_JOB_COLLISIONS = Stats.exportLong("cron_job_collisions"); + private static final AtomicLong CRON_JOB_CONCURRENT_RUNS = + Stats.exportLong("cron_job_concurrent_runs"); @VisibleForTesting static final Optional<String> KILL_AUDIT_MESSAGE = Optional.of("Killed by cronScheduler"); private final ConfigurationManager configurationManager; - private final Storage storage; private final StateManager stateManager; private final BackoffHelper delayedStartBackoff; + private final BatchWorker<NoResult> batchWorker; + private final Set<IJobKey> killFollowups = Sets.newConcurrentHashSet(); + + /** + * Annotation for the max cron batch size. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + @interface CronMaxBatchSize { } + + static class CronBatchWorker extends BatchWorker<NoResult> { + @Inject + CronBatchWorker( + Storage storage, + StatsProvider statsProvider, + @CronMaxBatchSize int maxBatchSize) { + + super(storage, statsProvider, maxBatchSize); + } + + @Override + protected String serviceName() { + return "CronBatchWorker"; + } + } @Inject AuroraCronJob( ConfigurationManager configurationManager, Config config, - Storage storage, - StateManager stateManager) { + StateManager stateManager, + CronBatchWorker batchWorker) { this.configurationManager = requireNonNull(configurationManager); - this.storage = requireNonNull(storage); this.stateManager = requireNonNull(stateManager); + this.batchWorker = requireNonNull(batchWorker); this.delayedStartBackoff = requireNonNull(config.getDelayedStartBackoff()); } - private static final class DeferredLaunch { - private final ITaskConfig task; - private final Set<Integer> instanceIds; - private final Set<String> activeTaskIds; - - DeferredLaunch(ITaskConfig task, Set<Integer> instanceIds, Set<String> activeTaskIds) { - this.task = task; - this.instanceIds = instanceIds; - this.activeTaskIds = activeTaskIds; - } - } - @Override public void execute(JobExecutionContext context) throws JobExecutionException { // We assume quartz prevents concurrent runs of this job for a given job key. This allows us // to avoid races where we might kill another run's tasks. checkState(context.getJobDetail().isConcurrentExectionDisallowed()); - doExecute(Quartz.auroraJobKey(context.getJobDetail().getKey())); + doExecute(context); } @VisibleForTesting - void doExecute(final IJobKey key) throws JobExecutionException { + void doExecute(JobExecutionContext context) throws JobExecutionException { + final IJobKey key = Quartz.auroraJobKey(context.getJobDetail().getKey()); final String path = JobKeys.canonicalString(key); - final Optional<DeferredLaunch> deferredLaunch = storage.write( - (MutateWork.Quiet<Optional<DeferredLaunch>>) storeProvider -> { - Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key); - if (!config.isPresent()) { - LOG.warn( - "Cron was triggered for {} but no job with that key was found in storage.", - path); - CRON_JOB_MISFIRES.incrementAndGet(); - return Optional.absent(); - } - - SanitizedCronJob cronJob; - try { - cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get()); - } catch (ConfigurationManager.TaskDescriptionException | CronException e) { - LOG.warn( - "Invalid cron job for {} in storage - failed to parse with {}", key, e); - CRON_JOB_PARSE_FAILURES.incrementAndGet(); - return Optional.absent(); - } - - CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy(); - LOG.info( - "Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy); - CRON_JOB_TRIGGERS.incrementAndGet(); - - final Query.Builder activeQuery = Query.jobScoped(key).active(); - Set<String> activeTasks = - Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery)); + // Prevent a concurrent run for this job in case a previous trigger took longer to run. + // This approach relies on saving the "work in progress" token within the job context itself + // (see below) and relying on killFollowups to signal "work completion". + if (context.getJobDetail().getJobDataMap().containsKey(path)) { + CRON_JOB_CONCURRENT_RUNS.incrementAndGet(); + if (killFollowups.contains(key)) { + context.getJobDetail().getJobDataMap().remove(path); + killFollowups.remove(key); + LOG.info("Resetting job context for cron " + path); + } else { + LOG.info("Ignoring trigger as another concurrent run is active for cron " + path); + return; + } + } - ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig(); - Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds(); - if (activeTasks.isEmpty()) { - stateManager.insertPendingTasks(storeProvider, task, instanceIds); + CompletableFuture<NoResult> scheduleResult = batchWorker.<NoResult>execute(storeProvider -> { + Optional<IJobConfiguration> config = storeProvider.getCronJobStore().fetchJob(key); + if (!config.isPresent()) { + LOG.warn("Cron was triggered for {} but no job with that key was found in storage.", path); + CRON_JOB_MISFIRES.incrementAndGet(); + return BatchWorker.NO_RESULT; + } - return Optional.absent(); - } + SanitizedCronJob cronJob; + try { + cronJob = SanitizedCronJob.fromUnsanitized(configurationManager, config.get()); + } catch (ConfigurationManager.TaskDescriptionException | CronException e) { + LOG.warn("Invalid cron job for {} in storage - failed to parse with {}", key, e); + CRON_JOB_PARSE_FAILURES.incrementAndGet(); + return BatchWorker.NO_RESULT; + } - CRON_JOB_COLLISIONS.incrementAndGet(); - switch (collisionPolicy) { - case KILL_EXISTING: - return Optional.of(new DeferredLaunch(task, instanceIds, activeTasks)); + CronCollisionPolicy collisionPolicy = cronJob.getCronCollisionPolicy(); + LOG.info("Cron triggered for {} at {} with policy {}", path, new Date(), collisionPolicy); + CRON_JOB_TRIGGERS.incrementAndGet(); - case RUN_OVERLAP: - LOG.error("Ignoring trigger for job {} with deprecated collision" - + "policy RUN_OVERLAP due to unterminated active tasks.", path); - return Optional.absent(); + final Query.Builder activeQuery = Query.jobScoped(key).active(); + Set<String> activeTasks = Tasks.ids(storeProvider.getTaskStore().fetchTasks(activeQuery)); - case CANCEL_NEW: - return Optional.absent(); + ITaskConfig task = cronJob.getSanitizedConfig().getJobConfig().getTaskConfig(); + Set<Integer> instanceIds = cronJob.getSanitizedConfig().getInstanceIds(); + if (activeTasks.isEmpty()) { + stateManager.insertPendingTasks(storeProvider, task, instanceIds); + return BatchWorker.NO_RESULT; + } - default: - LOG.error("Unrecognized cron collision policy: " + collisionPolicy); - return Optional.absent(); + CRON_JOB_COLLISIONS.incrementAndGet(); + switch (collisionPolicy) { + case KILL_EXISTING: + for (String taskId : activeTasks) { + stateManager.changeState( + storeProvider, + taskId, + Optional.absent(), + KILLING, + KILL_AUDIT_MESSAGE); } - } - ); - if (!deferredLaunch.isPresent()) { - return; - } - - storage.write((NoResult.Quiet) storeProvider -> { - for (String taskId : deferredLaunch.get().activeTaskIds) { - stateManager.changeState( - storeProvider, - taskId, - Optional.absent(), - KILLING, - KILL_AUDIT_MESSAGE); + LOG.info("Waiting for job to terminate before launching cron job " + path); + // Use job detail map to signal a "work in progress" condition to subsequent triggers. + context.getJobDetail().getJobDataMap().put(path, null); + batchWorker.executeWithReplay( + delayedStartBackoff.getBackoffStrategy(), + store -> { + Query.Builder query = Query.taskScoped(activeTasks).active(); + if (Iterables.isEmpty(storeProvider.getTaskStore().fetchTasks(query))) { + LOG.info("Initiating delayed launch of cron " + path); + stateManager.insertPendingTasks(store, task, instanceIds); + return new BatchWorker.Result<>(true, null); + } else { + LOG.info("Not yet safe to run cron " + path); + return new BatchWorker.Result<>(false, null); + } + }) + .thenAccept(ignored -> { + killFollowups.add(key); + LOG.info("Finished delayed launch for cron " + path); + }); + break; + + case RUN_OVERLAP: + LOG.error("Ignoring trigger for job {} with deprecated collision" + + "policy RUN_OVERLAP due to unterminated active tasks.", path); + break; + + case CANCEL_NEW: + break; + + default: + LOG.error("Unrecognized cron collision policy: " + collisionPolicy); } + return BatchWorker.NO_RESULT; }); - LOG.info("Waiting for job to terminate before launching cron job {}.", path); - - final Query.Builder query = Query.taskScoped(deferredLaunch.get().activeTaskIds).active(); try { - // NOTE: We block the quartz execution thread here until we've successfully killed our - // ancestor. We mitigate this by using a cached thread pool for quartz. - delayedStartBackoff.doUntilSuccess(() -> { - if (Iterables.isEmpty(Storage.Util.fetchTasks(storage, query))) { - LOG.info("Initiating delayed launch of cron " + path); - storage.write((NoResult.Quiet) storeProvider -> stateManager.insertPendingTasks( - storeProvider, - deferredLaunch.get().task, - deferredLaunch.get().instanceIds)); - - return true; - } else { - LOG.info("Not yet safe to run cron " + path); - return false; - } - }); - } catch (InterruptedException e) { + scheduleResult.get(); + } catch (ExecutionException | InterruptedException e) { LOG.warn("Interrupted while trying to launch cron " + path, e); Thread.currentThread().interrupt(); throw new JobExecutionException(e); http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java index 155d702..9c88a2a 100644 --- a/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java +++ b/src/main/java/org/apache/aurora/scheduler/cron/quartz/CronModule.java @@ -21,16 +21,19 @@ import javax.inject.Singleton; import com.google.inject.AbstractModule; import com.google.inject.Provides; +import com.google.inject.TypeLiteral; import org.apache.aurora.common.args.Arg; import org.apache.aurora.common.args.CmdLine; +import org.apache.aurora.common.args.constraints.Positive; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.util.BackoffHelper; -import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.cron.CronPredictor; import org.apache.aurora.scheduler.cron.CronScheduler; +import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker; +import org.apache.aurora.scheduler.events.PubsubEventModule; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.impl.StdSchedulerFactory; @@ -38,6 +41,7 @@ import org.quartz.simpl.SimpleThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.aurora.scheduler.SchedulerServicesModule.addSchedulerActiveServiceBinding; import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_INSTANCE_ID; import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON; import static org.quartz.impl.StdSchedulerFactory.PROP_SCHED_NAME; @@ -55,7 +59,7 @@ public class CronModule extends AbstractModule { @CmdLine(name = "cron_scheduler_num_threads", help = "Number of threads to use for the cron scheduler thread pool.") - private static final Arg<Integer> NUM_THREADS = Arg.create(100); + private static final Arg<Integer> NUM_THREADS = Arg.create(10); @CmdLine(name = "cron_timezone", help = "TimeZone to use for cron predictions.") private static final Arg<String> CRON_TIMEZONE = Arg.create("GMT"); @@ -63,13 +67,18 @@ public class CronModule extends AbstractModule { @CmdLine(name = "cron_start_initial_backoff", help = "Initial backoff delay while waiting for a previous cron run to be killed.") public static final Arg<Amount<Long, Time>> CRON_START_INITIAL_BACKOFF = - Arg.create(Amount.of(1L, Time.SECONDS)); + Arg.create(Amount.of(5L, Time.SECONDS)); @CmdLine(name = "cron_start_max_backoff", help = "Max backoff delay while waiting for a previous cron run to be killed.") public static final Arg<Amount<Long, Time>> CRON_START_MAX_BACKOFF = Arg.create(Amount.of(1L, Time.MINUTES)); + @Positive + @CmdLine(name = "cron_scheduling_max_batch_size", + help = "The maximum number of triggered cron jobs that can be processed in a batch.") + private static final Arg<Integer> CRON_MAX_BATCH_SIZE = Arg.create(10); + // Global per-JVM ID number generator for the provided Quartz Scheduler. private static final AtomicLong ID_GENERATOR = new AtomicLong(); @@ -90,8 +99,16 @@ public class CronModule extends AbstractModule { bind(AuroraCronJob.Config.class).toInstance(new AuroraCronJob.Config( new BackoffHelper(CRON_START_INITIAL_BACKOFF.get(), CRON_START_MAX_BACKOFF.get()))); + PubsubEventModule.bindSubscriber(binder(), AuroraCronJob.class); + bind(CronLifecycle.class).in(Singleton.class); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class); + addSchedulerActiveServiceBinding(binder()).to(CronLifecycle.class); + + bind(new TypeLiteral<Integer>() { }) + .annotatedWith(AuroraCronJob.CronMaxBatchSize.class) + .toInstance(CRON_MAX_BATCH_SIZE.get()); + bind(CronBatchWorker.class).in(Singleton.class); + addSchedulerActiveServiceBinding(binder()).to(CronBatchWorker.class); } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java index 5c64ff2..fb06c28 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/AuroraCronJobTest.java @@ -13,17 +13,23 @@ */ package org.apache.aurora.scheduler.cron.quartz; +import java.util.HashMap; +import java.util.concurrent.CompletableFuture; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; -import org.apache.aurora.common.base.ExceptionalSupplier; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.util.BackoffHelper; import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.gen.CronCollisionPolicy; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.BatchWorker; +import org.apache.aurora.scheduler.BatchWorker.RepeatableWork; +import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.TaskTestUtil; +import org.apache.aurora.scheduler.cron.quartz.AuroraCronJob.CronBatchWorker; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; @@ -31,11 +37,17 @@ import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.easymock.Capture; -import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import org.quartz.impl.JobDetailImpl; +import static org.apache.aurora.scheduler.cron.quartz.QuartzTestUtil.AURORA_JOB_KEY; +import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; @@ -43,50 +55,61 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class AuroraCronJobTest extends EasyMockTest { - public static final String TASK_ID = "A"; + private static final String TASK_ID = "A"; + private JobDetailImpl jobDetails; private Storage storage; private StateManager stateManager; private BackoffHelper backoffHelper; - + private CronBatchWorker batchWorker; + private JobExecutionContext context; private AuroraCronJob auroraCronJob; @Before - public void setUp() { + public void setUp() throws Exception { storage = DbUtil.createStorage(); stateManager = createMock(StateManager.class); backoffHelper = createMock(BackoffHelper.class); + context = createMock(JobExecutionContext.class); + + jobDetails = new JobDetailImpl(); + jobDetails.setKey(Quartz.jobKey(AURORA_JOB_KEY)); + jobDetails.setJobDataMap(new JobDataMap(new HashMap())); + expect(context.getJobDetail()).andReturn(jobDetails).anyTimes(); + + batchWorker = createMock(CronBatchWorker.class); + expectBatchExecute(batchWorker, storage, control).anyTimes(); auroraCronJob = new AuroraCronJob( TaskTestUtil.CONFIGURATION_MANAGER, - new AuroraCronJob.Config(backoffHelper), storage, stateManager); + new AuroraCronJob.Config(backoffHelper), + stateManager, + batchWorker); } @Test public void testExecuteNonexistentIsNoop() throws JobExecutionException { control.replay(); - auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY); + auroraCronJob.doExecute(context); } @Test public void testEmptyStorage() throws JobExecutionException { - stateManager.insertPendingTasks( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.anyObject()); + stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject()); expectLastCall().times(3); control.replay(); + populateStorage(CronCollisionPolicy.CANCEL_NEW); - auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY); - storage = DbUtil.createStorage(); + auroraCronJob.doExecute(context); - populateStorage(CronCollisionPolicy.KILL_EXISTING); - auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY); storage = DbUtil.createStorage(); + populateStorage(CronCollisionPolicy.KILL_EXISTING); + auroraCronJob.doExecute(context); + storage = DbUtil.createStorage(); populateStorage(CronCollisionPolicy.RUN_OVERLAP); - auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY); + auroraCronJob.doExecute(context); } @Test @@ -95,35 +118,65 @@ public class AuroraCronJobTest extends EasyMockTest { populateTaskStore(); populateStorage(CronCollisionPolicy.CANCEL_NEW); - auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY); + auroraCronJob.doExecute(context); + } + + @Test + public void testOverlap() throws JobExecutionException { + control.replay(); + + populateTaskStore(); + populateStorage(CronCollisionPolicy.RUN_OVERLAP); + auroraCronJob.doExecute(context); } @Test public void testKillExisting() throws Exception { - Capture<ExceptionalSupplier<Boolean, RuntimeException>> capture = createCapture(); + Capture<RepeatableWork<BatchWorker.NoResult>> killCapture = createCapture(); + CompletableFuture<BatchWorker.NoResult> killResult = new CompletableFuture<>(); + expect(batchWorker.executeWithReplay(anyObject(), capture(killCapture))).andReturn(killResult); + expect(backoffHelper.getBackoffStrategy()).andReturn(null).anyTimes(); expect(stateManager.changeState( - EasyMock.anyObject(), + anyObject(), eq(TASK_ID), eq(Optional.absent()), eq(ScheduleStatus.KILLING), eq(AuroraCronJob.KILL_AUDIT_MESSAGE))) .andReturn(StateChangeResult.SUCCESS); - backoffHelper.doUntilSuccess(EasyMock.capture(capture)); - stateManager.insertPendingTasks( - EasyMock.anyObject(), - EasyMock.anyObject(), - EasyMock.anyObject()); + stateManager.insertPendingTasks(anyObject(), anyObject(), anyObject()); + expectLastCall().times(2); control.replay(); populateStorage(CronCollisionPolicy.KILL_EXISTING); populateTaskStore(); - auroraCronJob.doExecute(QuartzTestUtil.AURORA_JOB_KEY); - assertFalse(capture.getValue().get()); + auroraCronJob.doExecute(context); + storage.write( (NoResult.Quiet) storeProvider -> storeProvider.getUnsafeTaskStore().deleteAllTasks()); - assertTrue(capture.getValue().get()); + storage.write((NoResult.Quiet) store -> killCapture.getValue().apply(store)); + + // Simulate a trigger in progress. + jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null); + assertFalse(jobDetails.getJobDataMap().isEmpty()); + + // Attempt a concurrent run that must be rejected. + auroraCronJob.doExecute(context); + + // Complete previous run and trigger another one. + killResult.complete(BatchWorker.NO_RESULT); + auroraCronJob.doExecute(context); + assertTrue(jobDetails.getJobDataMap().isEmpty()); + } + + @Test + public void testNoConcurrentRun() throws Exception { + jobDetails.getJobDataMap().put(JobKeys.canonicalString(AURORA_JOB_KEY), null); + + control.replay(); + + auroraCronJob.doExecute(context); } private void populateTaskStore() { http://git-wip-us.apache.org/repos/asf/aurora/blob/2cb43d61/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java index 1c0a3fa..8556253 100644 --- a/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java +++ b/src/test/java/org/apache/aurora/scheduler/cron/quartz/CronIT.java @@ -21,6 +21,7 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.util.Modules; +import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.common.util.Clock; import org.apache.aurora.gen.Container; @@ -34,6 +35,7 @@ import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.cron.CronJobManager; import org.apache.aurora.scheduler.cron.CrontabEntry; import org.apache.aurora.scheduler.cron.SanitizedCronJob; +import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; @@ -95,6 +97,8 @@ public class CronIT extends EasyMockTest { bind(Clock.class).toInstance(Clock.SYSTEM_CLOCK); bind(StateManager.class).toInstance(stateManager); bind(Storage.class).toInstance(storage); + bind(StatsProvider.class).toInstance(createMock(StatsProvider.class)); + bind(EventSink.class).toInstance(createMock(EventSink.class)); } }); }