Repository: aurora Updated Branches: refs/heads/master 2d6108bb4 -> c05632b21
Add more preemption metrics (jobs preempted, preemptors) and logging statements Added additional metrics: ``` 1. preemptor_tasks_preempted_[JOB_NAME] - The number of times [JOB_NAME] has been preempted for another task. 2. preemptor_task_preemptor_[JOB_NAME] - The number of times [JOB_NAME] has preempted another task. 3. preemptor_slot_search_[success|failed]_for_[JOB_NAME] - The number of times [JOB_NAME] found or failed to find a slot for preemption. 4. preemptor_slot_validation_[success|failed]_for_[JOB_NAME] - The number of times slot validation succeeded or failed for [JOB_NAME] before preemption. ``` Additionally, added some `LOG.info` statements for better visibility into preemption/preemption slot finding. Did a little bit of code refactoring as well. Reviewed at https://reviews.apache.org/r/66536/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c05632b2 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c05632b2 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c05632b2 Branch: refs/heads/master Commit: c05632b219346abd483937933c04a2ad92bca3d2 Parents: 2d6108b Author: Jordan Ly <jordan....@gmail.com> Authored: Wed Apr 11 14:41:52 2018 -0700 Committer: Jordan Ly <j...@twitter.com> Committed: Wed Apr 11 14:41:52 2018 -0700 ---------------------------------------------------------------------- .../preemptor/PendingTaskProcessor.java | 16 +++++- .../preemptor/PreemptionVictimFilter.java | 37 ++++++------ .../aurora/scheduler/preemptor/Preemptor.java | 17 +++--- .../scheduler/preemptor/PreemptorMetrics.java | 48 ++++++++++++++-- .../scheduler/scheduling/TaskSchedulerImpl.java | 15 ++--- .../preemptor/PendingTaskProcessorTest.java | 14 +++++ .../preemptor/PreemptionVictimFilterTest.java | 2 +- .../scheduler/preemptor/PreemptorImplTest.java | 59 ++++++++++++++------ 8 files changed, 149 insertions(+), 59 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java index ef06471..67103fa 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java @@ -28,6 +28,7 @@ import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Functions; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; @@ -62,6 +63,8 @@ import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.lang.annotation.ElementType.FIELD; import static java.lang.annotation.ElementType.METHOD; @@ -76,6 +79,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING; */ @VisibleForTesting public class PendingTaskProcessor implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(PendingTaskProcessor.class); + private final Storage storage; private final OfferManager offerManager; private final PreemptionVictimFilter preemptionVictimFilter; @@ -168,8 +173,9 @@ public class PendingTaskProcessor implements Runnable { TaskGroupKey group = groups.next(); ITaskConfig task = group.getTask(); + LOG.info("Searching 
for preemptible slots for {}", group); metrics.recordPreemptionAttemptFor(task); - // start over only if a different task group is being processed + // Start over only if a different task group is being processed if (!group.equals(lastGroup)) { slaveIterator = allSlaves.iterator(); } @@ -186,6 +192,13 @@ public class PendingTaskProcessor implements Runnable { metrics.recordSlotSearchResult(candidates, task); if (candidates.isPresent()) { // Slot found -> remove slave to avoid multiple task reservations. + Iterable<String> candidateTaskIds = Iterables.transform( + candidates.get(), + PreemptionVictim::getTaskId); + LOG.info("Found preemptible slot on agent {} for {} with candidates {}", + slaveId, + group, + Joiner.on(",").join(candidateTaskIds)); slaveIterator.remove(); slotCache.put(new PreemptionProposal(candidates.get(), slaveId), group); matched = true; @@ -194,6 +207,7 @@ public class PendingTaskProcessor implements Runnable { } if (!matched) { // No slot found for the group -> remove group and reset group iterator. 
+ LOG.info("Could not find preemptible slot for {}", group); pendingGroups.removeAll(ImmutableSet.of(group)); groups = Iterators.consumingIterator(pendingGroups.iterator()); metrics.recordUnmatchedTask(); http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java index 569cfe6..3aafd09 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java @@ -17,13 +17,14 @@ import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.StreamSupport; import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; @@ -186,6 +187,15 @@ public interface PreemptionVictimFilter { Optional<HostOffer> offer, StoreProvider storeProvider) { + List<PreemptionVictim> sortedVictims = StreamSupport + .stream(possibleVictims.spliterator(), false) + .filter(preemptionFilter(pendingTask)) + .sorted(resourceOrder) + .collect(ImmutableList.toImmutableList()); + if (sortedVictims.isEmpty()) { + return Optional.empty(); + } + // This enforces the precondition that all of the resources are from the same host. We need to // get the host for the schedulingFilter. 
Set<String> hosts = ImmutableSet.<String>builder() @@ -193,24 +203,12 @@ public interface PreemptionVictimFilter { .addAll(offer.map(OFFER_TO_HOST).map(ImmutableSet::of).orElse(ImmutableSet.of())) .build(); - ResourceBag slackResources = offer.map(ImmutableSet::of).orElse(ImmutableSet.of()).stream() + ResourceBag slackResources = offer .map(o -> bagFromMesosResources(getNonRevocableOfferResources(o.getOffer()))) - .reduce((l, r) -> l.add(r)) .orElse(EMPTY); - FluentIterable<PreemptionVictim> preemptableTasks = FluentIterable.from(possibleVictims) - .filter(preemptionFilter(pendingTask)); - - List<PreemptionVictim> sortedVictims = resourceOrder.immutableSortedCopy(preemptableTasks); - if (sortedVictims.isEmpty()) { - return Optional.empty(); - } - - Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet(); - Optional<IHostAttributes> attributes = storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts)); - if (!attributes.isPresent()) { metrics.recordMissingAttributes(); return Optional.empty(); @@ -219,16 +217,14 @@ public interface PreemptionVictimFilter { ResourceRequest requiredResources = ResourceRequest.fromTask(pendingTask, executorSettings, jobState, tierManager); + Optional<Instant> unavailability = offer.flatMap(HostOffer::getUnavailabilityStart); + ResourceBag totalResource = slackResources; + Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet(); for (PreemptionVictim victim : sortedVictims) { toPreemptTasks.add(victim); totalResource = totalResource.add(victimToResources.apply(victim)); - Optional<Instant> unavailability = Optional.empty(); - if (offer.isPresent()) { - unavailability = offer.get().getUnavailabilityStart(); - } - Set<Veto> vetoes = schedulingFilter.filter( new UnusedResource(totalResource, attributes.get(), unavailability), requiredResources); @@ -237,6 +233,7 @@ public interface PreemptionVictimFilter { return Optional.of(ImmutableSet.copyOf(toPreemptTasks)); } } + return Optional.empty(); } @@ -245,7 
+242,7 @@ public interface PreemptionVictimFilter { * * @param pendingTask A task that is not scheduled to possibly preempt other tasks for. * @return A filter that will compare the priorities and resources required by other tasks - * with {@code preemptableTask}. + * with {@code preemptibleTask}. */ private Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig pendingTask) { return possibleVictim -> { http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java index 293d106..84d7543 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java @@ -13,7 +13,6 @@ */ package org.apache.aurora.scheduler.preemptor; -import java.util.Iterator; import java.util.Optional; import javax.inject.Inject; @@ -27,6 +26,8 @@ import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.mesos.v1.Protos.AgentID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static java.util.Objects.requireNonNull; @@ -51,6 +52,8 @@ public interface Preemptor { MutableStoreProvider storeProvider); class PreemptorImpl implements Preemptor { + private static final Logger LOG = LoggerFactory.getLogger(PreemptorImpl.class); + private final StateManager stateManager; private final OfferManager offerManager; private final PreemptionVictimFilter preemptionVictimFilter; @@ -79,12 +82,9 @@ public interface Preemptor { MutableStoreProvider store) { TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask()); - Iterator<PreemptionProposal> proposalIterator = 
slotCache.getByValue(groupKey).iterator(); // A preemption slot is available -> attempt to preempt tasks. - while (proposalIterator.hasNext()) { - // Get the next available preemption slot. - PreemptionProposal slot = proposalIterator.next(); + for (PreemptionProposal slot : slotCache.getByValue(groupKey)) { slotCache.remove(slot, groupKey); // Validate PreemptionProposal is still valid for the given task. @@ -97,10 +97,13 @@ public interface Preemptor { offerManager.get(slaveId), store); - metrics.recordSlotValidationResult(validatedVictims); + metrics.recordSlotValidationResult(validatedVictims, pendingTask); if (validatedVictims.isPresent()) { for (PreemptionVictim toPreempt : validatedVictims.get()) { - metrics.recordTaskPreemption(toPreempt); + LOG.info("Preempting {} in favor of {}", + toPreempt.getTaskId(), + pendingTask.getTaskId()); + metrics.recordTaskPreemption(toPreempt, pendingTask); stateManager.changeState( store, toPreempt.getTaskId(), http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java index 8730577..2bb3bc8 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java @@ -21,7 +21,10 @@ import javax.inject.Inject; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.stats.CachedCounters; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IJobKey; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; import static 
java.util.Objects.requireNonNull; @@ -66,8 +69,8 @@ public class PreemptorMetrics { Set<String> allStats = ImmutableSet.of( attemptsStatName(false), attemptsStatName(true), - successStatName(false), - successStatName(true), + preemptedStatName(false), + preemptedStatName(true), slotSearchStatName(true, false), slotSearchStatName(false, false), slotSearchStatName(true, true), @@ -95,38 +98,71 @@ public class PreemptorMetrics { } @VisibleForTesting - static String successStatName(boolean production) { + static String attemptsByJobStatName(IJobKey jobKey) { + return "preemption_slot_search_attempts_for_" + JobKeys.canonicalString(jobKey); + } + + @VisibleForTesting + static String preemptedStatName(boolean production) { return "preemptor_tasks_preempted_" + prod(production); } @VisibleForTesting + static String preemptedByJobStatName(IJobKey jobKey) { + return "preemptor_tasks_preempted_" + JobKeys.canonicalString(jobKey); + } + + @VisibleForTesting + static String preemptorByJobStatName(IJobKey jobKey) { + return "preemptor_task_preemptor_" + JobKeys.canonicalString(jobKey); + } + + @VisibleForTesting static String slotSearchStatName(boolean success, boolean production) { return "preemptor_slot_search_" + result(success) + "_for_" + prod(production); } @VisibleForTesting + static String slotSearchByJobStatName(boolean success, IJobKey jobKey) { + return "preemptor_slot_search_" + result(success) + "_for_" + JobKeys.canonicalString(jobKey); + } + + @VisibleForTesting static String slotValidationStatName(boolean success) { return "preemptor_slot_validation_" + result(success); } + @VisibleForTesting + static String slotValidationByJobStatName(boolean success, IJobKey jobKey) { + return "preemptor_slot_validation_" + + result(success) + + "_for_" + + JobKeys.canonicalString(jobKey); + } + void recordPreemptionAttemptFor(ITaskConfig task) { increment(attemptsStatName(task.isProduction())); + increment(attemptsByJobStatName(task.getJob())); } - void 
recordTaskPreemption(PreemptionVictim victim) { - increment(successStatName(victim.isProduction())); + void recordTaskPreemption(PreemptionVictim victim, IAssignedTask preemptor) { + increment(preemptedStatName(victim.isProduction())); + increment(preemptedByJobStatName(victim.getConfig().getJob())); + increment(preemptorByJobStatName(preemptor.getTask().getJob())); } void recordSlotSearchResult(Optional<?> result, ITaskConfig task) { increment(slotSearchStatName(result.isPresent(), task.isProduction())); + increment(slotSearchByJobStatName(result.isPresent(), task.getJob())); } void recordUnmatchedTask() { increment(UNMATCHED_TASKS); } - void recordSlotValidationResult(Optional<?> result) { + void recordSlotValidationResult(Optional<?> result, IAssignedTask task) { increment(slotValidationStatName(result.isPresent())); + increment(slotValidationByJobStatName(result.isPresent(), task.getTask().getJob())); } void recordMissingAttributes() { http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java index edab03d..46e6127 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java @@ -128,8 +128,8 @@ public class TaskSchedulerImpl implements TaskScheduler { )); if (ids.size() != tasks.size()) { - LOG.warn("Failed to look up tasks " - + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet()))); + LOG.warn("Failed to look up tasks {}", + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet()))); } return tasks; } @@ -179,13 +179,14 @@ public class TaskSchedulerImpl implements TaskScheduler { AttributeAggregate jobState, MutableStoreProvider 
storeProvider) { - if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) { + TaskGroupKey groupKey = TaskGroupKey.from(task.getTask()); + + if (!reservations.getByValue(groupKey).isEmpty()) { return; } - Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, storeProvider); - if (slaveId.isPresent()) { - reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask())); - } + + preemptor.attemptPreemptionFor(task, jobState, storeProvider) + .ifPresent(slaveId -> reservations.put(slaveId, groupKey)); } @Subscribe http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java index ba775f4..0bd8d21 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java @@ -55,7 +55,9 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.UNMATCHED_TASKS; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.attemptsByJobStatName; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.attemptsStatName; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotSearchByJobStatName; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotSearchStatName; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; @@ -132,7 +134,11 @@ public class PendingTaskProcessorTest extends EasyMockTest { slotFinder.run(); 
assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_A))); + assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_B))); assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(1L, statsProvider.getLongValue(slotSearchByJobStatName(true, JOB_A))); + assertEquals(1L, statsProvider.getLongValue(slotSearchByJobStatName(true, JOB_B))); assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); assertEquals(0L, statsProvider.getLongValue(UNMATCHED_TASKS)); assertEquals(2L, statsProvider.getLongValue(CACHE_SIZE_STAT_NAME)); @@ -153,8 +159,10 @@ public class PendingTaskProcessorTest extends EasyMockTest { slotFinder.run(); assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_A))); assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, true))); + assertEquals(1L, statsProvider.getLongValue(slotSearchByJobStatName(false, JOB_A))); assertEquals(1L, statsProvider.getLongValue(UNMATCHED_TASKS)); } @@ -206,13 +214,18 @@ public class PendingTaskProcessorTest extends EasyMockTest { assertEquals(slotCache.get(proposal2), Optional.of(group(task5))); assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_A))); + assertEquals(2L, statsProvider.getLongValue(attemptsByJobStatName(JOB_B))); assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, true))); + assertEquals(2L, statsProvider.getLongValue(slotSearchByJobStatName(true, JOB_B))); // 
TODO(wfarner): This test depends on the iteration order of a hash set (the set containing // task groups), and as a result this stat could be 0 or 2 depending on which group is // evaluated first. assertTrue(ImmutableSet.of(0L, 2L).contains( statsProvider.getLongValue(slotSearchStatName(false, true)))); + assertTrue(ImmutableSet.of(0L, 2L).contains( + statsProvider.getLongValue(slotSearchByJobStatName(false, JOB_A)))); assertEquals(1L, statsProvider.getLongValue(UNMATCHED_TASKS)); assertEquals(2L, statsProvider.getLongValue(CACHE_SIZE_STAT_NAME)); } @@ -227,6 +240,7 @@ public class PendingTaskProcessorTest extends EasyMockTest { slotFinder.run(); assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME)); assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true))); + assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false))); assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, true))); assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, true))); assertEquals(0L, statsProvider.getLongValue(UNMATCHED_TASKS)); http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java index b3ffb0d..89a9e53 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java @@ -174,7 +174,7 @@ public class PreemptionVictimFilterTest extends EasyMockTest { } @Test - public void testOnePreemptableTask() { + public void testOnePreemptibleTask() { setUpHost(); schedulingFilter = createMock(SchedulingFilter.class); 
http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java index 0ef29d5..f2a93e8 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java @@ -46,21 +46,29 @@ import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.PENDING; import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptedByJobStatName; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptedStatName; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptorByJobStatName; +import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationByJobStatName; import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName; -import static org.apache.aurora.scheduler.preemptor.PreemptorMetrics.successStatName; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class PreemptorImplTest extends EasyMockTest { + private static final String TASK_NAME = "preemptor-name"; + private static final IScheduledTask TASK = IScheduledTask.build(makeTask(TASK_NAME)); private static final String SLAVE_ID_1 = "slave_id_1"; private static final String SLAVE_ID_2 = "slave_id_2"; - private static final IScheduledTask TASK = IScheduledTask.build(makeTask()); - private static final PreemptionProposal PROPOSAL_1 = createPreemptionProposal(TASK, SLAVE_ID_1); - private static final 
PreemptionProposal PROPOSAL_2 = createPreemptionProposal(TASK, SLAVE_ID_2); + private static final IScheduledTask SLAVE_1_TASK = IScheduledTask.build(makeTask(SLAVE_ID_1)); + private static final IScheduledTask SLAVE_2_TASK = IScheduledTask.build(makeTask(SLAVE_ID_2)); + private static final PreemptionProposal PROPOSAL_1 = createPreemptionProposal(SLAVE_1_TASK, + SLAVE_ID_1); + private static final PreemptionProposal PROPOSAL_2 = createPreemptionProposal(SLAVE_2_TASK, + SLAVE_ID_2); private static final TaskGroupKey GROUP_KEY = - TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask())); + TaskGroupKey.from(ITaskConfig.build(makeTask(TASK_NAME).getAssignedTask().getTask())); private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of(); private static final Optional<String> EMPTY_RESULT = Optional.empty(); @@ -99,16 +107,22 @@ public class PreemptorImplTest extends EasyMockTest { expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, PROPOSAL_2)); slotCache.remove(PROPOSAL_1, GROUP_KEY); expectSlotValidation(PROPOSAL_1, Optional.of(ImmutableSet.of( - PreemptionVictim.fromTask(TASK.getAssignedTask())))); + PreemptionVictim.fromTask(SLAVE_1_TASK.getAssignedTask())))); expectPreempted(TASK); control.replay(); assertEquals(Optional.of(SLAVE_ID_1), callPreemptor()); - assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false))); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); - assertEquals(1L, statsProvider.getLongValue(successStatName(true))); + assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(true, + TASK.getAssignedTask().getTask().getJob()))); + PROPOSAL_1.getVictims().forEach(victim -> + assertEquals(1L, statsProvider.getLongValue(preemptedByJobStatName( + victim.getConfig().getJob())))); + assertEquals(1L, statsProvider.getLongValue(preemptorByJobStatName( + TASK.getAssignedTask().getTask().getJob()))); + assertEquals(1L, 
statsProvider.getLongValue(preemptedStatName(true))); } @Test @@ -121,8 +135,9 @@ public class PreemptorImplTest extends EasyMockTest { assertEquals(EMPTY_RESULT, callPreemptor()); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); - assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true))); - assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(false, + TASK.getAssignedTask().getTask().getJob()))); + assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true))); } @Test @@ -132,7 +147,7 @@ public class PreemptorImplTest extends EasyMockTest { expectSlotValidation(PROPOSAL_1, Optional.empty()); slotCache.remove(PROPOSAL_2, GROUP_KEY); expectSlotValidation(PROPOSAL_2, Optional.of(ImmutableSet.of( - PreemptionVictim.fromTask(TASK.getAssignedTask())))); + PreemptionVictim.fromTask(SLAVE_2_TASK.getAssignedTask())))); expectPreempted(TASK); @@ -140,8 +155,17 @@ public class PreemptorImplTest extends EasyMockTest { assertEquals(Optional.of(SLAVE_ID_2), callPreemptor()); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(false, + TASK.getAssignedTask().getTask().getJob()))); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); - assertEquals(1L, statsProvider.getLongValue(successStatName(true))); + assertEquals(1L, statsProvider.getLongValue(slotValidationByJobStatName(true, + TASK.getAssignedTask().getTask().getJob()))); + PROPOSAL_2.getVictims().forEach(victim -> + assertEquals(1L, statsProvider.getLongValue(preemptedByJobStatName( + victim.getConfig().getJob())))); + assertEquals(1L, statsProvider.getLongValue(preemptorByJobStatName( + TASK.getAssignedTask().getTask().getJob()))); + assertEquals(1L, statsProvider.getLongValue(preemptedStatName(true))); } @Test @@ -156,8 +180,9 @@ public class PreemptorImplTest 
extends EasyMockTest { assertEquals(EMPTY_RESULT, callPreemptor()); assertEquals(2L, statsProvider.getLongValue(slotValidationStatName(false))); - assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true))); - assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(2L, statsProvider.getLongValue(slotValidationByJobStatName(false, + TASK.getAssignedTask().getTask().getJob()))); + assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true))); } @Test @@ -169,7 +194,7 @@ public class PreemptorImplTest extends EasyMockTest { assertEquals(EMPTY_RESULT, callPreemptor()); assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false))); assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true))); - assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true))); } private Optional<String> callPreemptor() { @@ -203,13 +228,13 @@ public class PreemptorImplTest extends EasyMockTest { return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), slaveId); } - private static ScheduledTask makeTask() { + private static ScheduledTask makeTask(String name) { ScheduledTask task = new ScheduledTask() .setAssignedTask(new AssignedTask() .setTask(new TaskConfig() .setPriority(1) .setProduction(true) - .setJob(new JobKey("role", "env", "name")))); + .setJob(new JobKey("role", "env", name)))); task.addToTaskEvents(new TaskEvent(0, PENDING)); return task; }