Repository: aurora
Updated Branches:
  refs/heads/master 2d6108bb4 -> c05632b21


Add more preemption metrics (jobs preempted, preemptors) and logging statements

Added additional metrics:
```
1. preemptor_tasks_preempted_[JOB_NAME] - The number of times [JOB_NAME] has
   been preempted for another task.
2. preemptor_task_preemptor_[JOB_NAME] - The number of times [JOB_NAME] has
   preempted another task.
3. preemptor_slot_search_[success|failed]_for_[JOB_NAME] - The number of times
   [JOB_NAME] has or hasn't found a slot for preemption.
4. preemptor_slot_validation_[success|failed]_for_[JOB_NAME] - The number of
   times [JOB_NAME] succeeded or failed to validate a slot before preemption.
```

Additionally, added some `LOG.info` statements for better visibility into
preemption and the preemption slot search.

Did a little bit of code refactoring as well.

Reviewed at https://reviews.apache.org/r/66536/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c05632b2
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c05632b2
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c05632b2

Branch: refs/heads/master
Commit: c05632b219346abd483937933c04a2ad92bca3d2
Parents: 2d6108b
Author: Jordan Ly <jordan....@gmail.com>
Authored: Wed Apr 11 14:41:52 2018 -0700
Committer: Jordan Ly <j...@twitter.com>
Committed: Wed Apr 11 14:41:52 2018 -0700

----------------------------------------------------------------------
 .../preemptor/PendingTaskProcessor.java         | 16 +++++-
 .../preemptor/PreemptionVictimFilter.java       | 37 ++++++------
 .../aurora/scheduler/preemptor/Preemptor.java   | 17 +++---
 .../scheduler/preemptor/PreemptorMetrics.java   | 48 ++++++++++++++--
 .../scheduler/scheduling/TaskSchedulerImpl.java | 15 ++---
 .../preemptor/PendingTaskProcessorTest.java     | 14 +++++
 .../preemptor/PreemptionVictimFilterTest.java   |  2 +-
 .../scheduler/preemptor/PreemptorImplTest.java  | 59 ++++++++++++++------
 8 files changed, 149 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java 
b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
index ef06471..67103fa 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessor.java
@@ -28,6 +28,7 @@ import javax.inject.Qualifier;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -62,6 +63,8 @@ import 
org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.lang.annotation.ElementType.FIELD;
 import static java.lang.annotation.ElementType.METHOD;
@@ -76,6 +79,8 @@ import static org.apache.aurora.gen.ScheduleStatus.PENDING;
  */
 @VisibleForTesting
 public class PendingTaskProcessor implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PendingTaskProcessor.class);
+
   private final Storage storage;
   private final OfferManager offerManager;
   private final PreemptionVictimFilter preemptionVictimFilter;
@@ -168,8 +173,9 @@ public class PendingTaskProcessor implements Runnable {
         TaskGroupKey group = groups.next();
         ITaskConfig task = group.getTask();
 
+        LOG.info("Searching for preemptible slots for {}", group);
         metrics.recordPreemptionAttemptFor(task);
-        // start over only if a different task group is being processed
+        // Start over only if a different task group is being processed
         if (!group.equals(lastGroup)) {
           slaveIterator = allSlaves.iterator();
         }
@@ -186,6 +192,13 @@ public class PendingTaskProcessor implements Runnable {
           metrics.recordSlotSearchResult(candidates, task);
           if (candidates.isPresent()) {
             // Slot found -> remove slave to avoid multiple task reservations.
+            Iterable<String> candidateTaskIds = Iterables.transform(
+                candidates.get(),
+                PreemptionVictim::getTaskId);
+            LOG.info("Found preemptible slot on agent {} for {} with 
candidates {}",
+                slaveId,
+                group,
+                Joiner.on(",").join(candidateTaskIds));
             slaveIterator.remove();
             slotCache.put(new PreemptionProposal(candidates.get(), slaveId), 
group);
             matched = true;
@@ -194,6 +207,7 @@ public class PendingTaskProcessor implements Runnable {
         }
         if (!matched) {
           // No slot found for the group -> remove group and reset group 
iterator.
+          LOG.info("Could not find preemptible slot for {}", group);
           pendingGroups.removeAll(ImmutableSet.of(group));
           groups = Iterators.consumingIterator(pendingGroups.iterator());
           metrics.recordUnmatchedTask();

http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
 
b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
index 569cfe6..3aafd09 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilter.java
@@ -17,13 +17,14 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.StreamSupport;
 
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
@@ -186,6 +187,15 @@ public interface PreemptionVictimFilter {
         Optional<HostOffer> offer,
         StoreProvider storeProvider) {
 
+      List<PreemptionVictim> sortedVictims = StreamSupport
+          .stream(possibleVictims.spliterator(), false)
+          .filter(preemptionFilter(pendingTask))
+          .sorted(resourceOrder)
+          .collect(ImmutableList.toImmutableList());
+      if (sortedVictims.isEmpty()) {
+        return Optional.empty();
+      }
+
       // This enforces the precondition that all of the resources are from the 
same host. We need to
       // get the host for the schedulingFilter.
       Set<String> hosts = ImmutableSet.<String>builder()
@@ -193,24 +203,12 @@ public interface PreemptionVictimFilter {
           
.addAll(offer.map(OFFER_TO_HOST).map(ImmutableSet::of).orElse(ImmutableSet.of()))
           .build();
 
-      ResourceBag slackResources = 
offer.map(ImmutableSet::of).orElse(ImmutableSet.of()).stream()
+      ResourceBag slackResources = offer
           .map(o -> 
bagFromMesosResources(getNonRevocableOfferResources(o.getOffer())))
-          .reduce((l, r) -> l.add(r))
           .orElse(EMPTY);
 
-      FluentIterable<PreemptionVictim> preemptableTasks = 
FluentIterable.from(possibleVictims)
-          .filter(preemptionFilter(pendingTask));
-
-      List<PreemptionVictim> sortedVictims = 
resourceOrder.immutableSortedCopy(preemptableTasks);
-      if (sortedVictims.isEmpty()) {
-        return Optional.empty();
-      }
-
-      Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet();
-
       Optional<IHostAttributes> attributes =
           
storeProvider.getAttributeStore().getHostAttributes(Iterables.getOnlyElement(hosts));
-
       if (!attributes.isPresent()) {
         metrics.recordMissingAttributes();
         return Optional.empty();
@@ -219,16 +217,14 @@ public interface PreemptionVictimFilter {
       ResourceRequest requiredResources =
           ResourceRequest.fromTask(pendingTask, executorSettings, jobState, 
tierManager);
 
+      Optional<Instant> unavailability = 
offer.flatMap(HostOffer::getUnavailabilityStart);
+
       ResourceBag totalResource = slackResources;
+      Set<PreemptionVictim> toPreemptTasks = Sets.newHashSet();
       for (PreemptionVictim victim : sortedVictims) {
         toPreemptTasks.add(victim);
         totalResource = totalResource.add(victimToResources.apply(victim));
 
-        Optional<Instant> unavailability = Optional.empty();
-        if (offer.isPresent()) {
-          unavailability = offer.get().getUnavailabilityStart();
-        }
-
         Set<Veto> vetoes = schedulingFilter.filter(
             new UnusedResource(totalResource, attributes.get(), 
unavailability),
             requiredResources);
@@ -237,6 +233,7 @@ public interface PreemptionVictimFilter {
           return Optional.of(ImmutableSet.copyOf(toPreemptTasks));
         }
       }
+
       return Optional.empty();
     }
 
@@ -245,7 +242,7 @@ public interface PreemptionVictimFilter {
      *
      * @param pendingTask A task that is not scheduled to possibly preempt 
other tasks for.
      * @return A filter that will compare the priorities and resources 
required by other tasks
-     *     with {@code preemptableTask}.
+     *     with {@code preemptibleTask}.
      */
     private Predicate<PreemptionVictim> preemptionFilter(final ITaskConfig 
pendingTask) {
       return possibleVictim -> {

http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java 
b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
index 293d106..84d7543 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
@@ -13,7 +13,6 @@
  */
 package org.apache.aurora.scheduler.preemptor;
 
-import java.util.Iterator;
 import java.util.Optional;
 
 import javax.inject.Inject;
@@ -27,6 +26,8 @@ import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.mesos.v1.Protos.AgentID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -51,6 +52,8 @@ public interface Preemptor {
       MutableStoreProvider storeProvider);
 
   class PreemptorImpl implements Preemptor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PreemptorImpl.class);
+
     private final StateManager stateManager;
     private final OfferManager offerManager;
     private final PreemptionVictimFilter preemptionVictimFilter;
@@ -79,12 +82,9 @@ public interface Preemptor {
         MutableStoreProvider store) {
 
       TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask());
-      Iterator<PreemptionProposal> proposalIterator = 
slotCache.getByValue(groupKey).iterator();
 
       // A preemption slot is available -> attempt to preempt tasks.
-      while (proposalIterator.hasNext()) {
-        // Get the next available preemption slot.
-        PreemptionProposal slot = proposalIterator.next();
+      for (PreemptionProposal slot : slotCache.getByValue(groupKey)) {
         slotCache.remove(slot, groupKey);
 
         // Validate PreemptionProposal is still valid for the given task.
@@ -97,10 +97,13 @@ public interface Preemptor {
                 offerManager.get(slaveId),
                 store);
 
-        metrics.recordSlotValidationResult(validatedVictims);
+        metrics.recordSlotValidationResult(validatedVictims, pendingTask);
         if (validatedVictims.isPresent()) {
           for (PreemptionVictim toPreempt : validatedVictims.get()) {
-            metrics.recordTaskPreemption(toPreempt);
+            LOG.info("Preempting {} in favor of {}",
+                toPreempt.getTaskId(),
+                pendingTask.getTaskId());
+            metrics.recordTaskPreemption(toPreempt, pendingTask);
             stateManager.changeState(
                 store,
                 toPreempt.getTaskId(),

http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java 
b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java
index 8730577..2bb3bc8 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/PreemptorMetrics.java
@@ -21,7 +21,10 @@ import javax.inject.Inject;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IJobKey;
 import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
 
 import static java.util.Objects.requireNonNull;
@@ -66,8 +69,8 @@ public class PreemptorMetrics {
     Set<String> allStats = ImmutableSet.of(
         attemptsStatName(false),
         attemptsStatName(true),
-        successStatName(false),
-        successStatName(true),
+        preemptedStatName(false),
+        preemptedStatName(true),
         slotSearchStatName(true, false),
         slotSearchStatName(false, false),
         slotSearchStatName(true, true),
@@ -95,38 +98,71 @@ public class PreemptorMetrics {
   }
 
   @VisibleForTesting
-  static String successStatName(boolean production) {
+  static String attemptsByJobStatName(IJobKey jobKey) {
+    return "preemption_slot_search_attempts_for_" + 
JobKeys.canonicalString(jobKey);
+  }
+
+  @VisibleForTesting
+  static String preemptedStatName(boolean production) {
     return "preemptor_tasks_preempted_" + prod(production);
   }
 
   @VisibleForTesting
+  static String preemptedByJobStatName(IJobKey jobKey) {
+    return "preemptor_tasks_preempted_" + JobKeys.canonicalString(jobKey);
+  }
+
+  @VisibleForTesting
+  static String preemptorByJobStatName(IJobKey jobKey) {
+    return "preemptor_task_preemptor_" + JobKeys.canonicalString(jobKey);
+  }
+
+  @VisibleForTesting
   static String slotSearchStatName(boolean success, boolean production) {
     return "preemptor_slot_search_" + result(success) + "_for_" + 
prod(production);
   }
 
   @VisibleForTesting
+  static String slotSearchByJobStatName(boolean success, IJobKey jobKey) {
+    return "preemptor_slot_search_" + result(success) + "_for_" + 
JobKeys.canonicalString(jobKey);
+  }
+
+  @VisibleForTesting
   static String slotValidationStatName(boolean success) {
     return "preemptor_slot_validation_" + result(success);
   }
 
+  @VisibleForTesting
+  static String slotValidationByJobStatName(boolean success, IJobKey jobKey) {
+    return "preemptor_slot_validation_"
+        + result(success)
+        + "_for_"
+        + JobKeys.canonicalString(jobKey);
+  }
+
   void recordPreemptionAttemptFor(ITaskConfig task) {
     increment(attemptsStatName(task.isProduction()));
+    increment(attemptsByJobStatName(task.getJob()));
   }
 
-  void recordTaskPreemption(PreemptionVictim victim) {
-    increment(successStatName(victim.isProduction()));
+  void recordTaskPreemption(PreemptionVictim victim, IAssignedTask preemptor) {
+    increment(preemptedStatName(victim.isProduction()));
+    increment(preemptedByJobStatName(victim.getConfig().getJob()));
+    increment(preemptorByJobStatName(preemptor.getTask().getJob()));
   }
 
   void recordSlotSearchResult(Optional<?> result, ITaskConfig task) {
     increment(slotSearchStatName(result.isPresent(), task.isProduction()));
+    increment(slotSearchByJobStatName(result.isPresent(), task.getJob()));
   }
 
   void recordUnmatchedTask() {
     increment(UNMATCHED_TASKS);
   }
 
-  void recordSlotValidationResult(Optional<?> result) {
+  void recordSlotValidationResult(Optional<?> result, IAssignedTask task) {
     increment(slotValidationStatName(result.isPresent()));
+    increment(slotValidationByJobStatName(result.isPresent(), 
task.getTask().getJob()));
   }
 
   void recordMissingAttributes() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
index edab03d..46e6127 100644
--- 
a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
+++ 
b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImpl.java
@@ -128,8 +128,8 @@ public class TaskSchedulerImpl implements TaskScheduler {
         ));
 
     if (ids.size() != tasks.size()) {
-      LOG.warn("Failed to look up tasks "
-          + Joiner.on(", ").join(Sets.difference(ids, tasks.keySet())));
+      LOG.warn("Failed to look up tasks {}",
+          Joiner.on(", ").join(Sets.difference(ids, tasks.keySet())));
     }
     return tasks;
   }
@@ -179,13 +179,14 @@ public class TaskSchedulerImpl implements TaskScheduler {
       AttributeAggregate jobState,
       MutableStoreProvider storeProvider) {
 
-    if (!reservations.getByValue(TaskGroupKey.from(task.getTask())).isEmpty()) 
{
+    TaskGroupKey groupKey = TaskGroupKey.from(task.getTask());
+
+    if (!reservations.getByValue(groupKey).isEmpty()) {
       return;
     }
-    Optional<String> slaveId = preemptor.attemptPreemptionFor(task, jobState, 
storeProvider);
-    if (slaveId.isPresent()) {
-      reservations.put(slaveId.get(), TaskGroupKey.from(task.getTask()));
-    }
+
+    preemptor.attemptPreemptionFor(task, jobState, storeProvider)
+        .ifPresent(slaveId -> reservations.put(slaveId, groupKey));
   }
 
   @Subscribe

http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
index ba775f4..0bd8d21 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
@@ -55,7 +55,9 @@ import org.junit.Test;
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.TASK_PROCESSOR_RUN_NAME;
 import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.UNMATCHED_TASKS;
+import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.attemptsByJobStatName;
 import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.attemptsStatName;
+import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotSearchByJobStatName;
 import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotSearchStatName;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
@@ -132,7 +134,11 @@ public class PendingTaskProcessorTest extends EasyMockTest 
{
     slotFinder.run();
     assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
     assertEquals(2L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_A)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_B)));
     assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchByJobStatName(true, 
JOB_A)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchByJobStatName(true, 
JOB_B)));
     assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, 
true)));
     assertEquals(0L, statsProvider.getLongValue(UNMATCHED_TASKS));
     assertEquals(2L, statsProvider.getLongValue(CACHE_SIZE_STAT_NAME));
@@ -153,8 +159,10 @@ public class PendingTaskProcessorTest extends EasyMockTest 
{
     slotFinder.run();
     assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
     assertEquals(1L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_A)));
     assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
     assertEquals(1L, statsProvider.getLongValue(slotSearchStatName(false, 
true)));
+    assertEquals(1L, statsProvider.getLongValue(slotSearchByJobStatName(false, 
JOB_A)));
     assertEquals(1L, statsProvider.getLongValue(UNMATCHED_TASKS));
   }
 
@@ -206,13 +214,18 @@ public class PendingTaskProcessorTest extends 
EasyMockTest {
     assertEquals(slotCache.get(proposal2), Optional.of(group(task5)));
     assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
     assertEquals(3L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(attemptsByJobStatName(JOB_A)));
+    assertEquals(2L, statsProvider.getLongValue(attemptsByJobStatName(JOB_B)));
     assertEquals(2L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
+    assertEquals(2L, statsProvider.getLongValue(slotSearchByJobStatName(true, 
JOB_B)));
 
     // TODO(wfarner): This test depends on the iteration order of a hash set 
(the set containing
     // task groups), and as a result this stat could be 0 or 2 depending on 
which group is
     // evaluated first.
     assertTrue(ImmutableSet.of(0L, 2L).contains(
         statsProvider.getLongValue(slotSearchStatName(false, true))));
+    assertTrue(ImmutableSet.of(0L, 2L).contains(
+        statsProvider.getLongValue(slotSearchByJobStatName(false, JOB_A))));
     assertEquals(1L, statsProvider.getLongValue(UNMATCHED_TASKS));
     assertEquals(2L, statsProvider.getLongValue(CACHE_SIZE_STAT_NAME));
   }
@@ -227,6 +240,7 @@ public class PendingTaskProcessorTest extends EasyMockTest {
     slotFinder.run();
     assertEquals(1L, statsProvider.getLongValue(TASK_PROCESSOR_RUN_NAME));
     assertEquals(0L, statsProvider.getLongValue(attemptsStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(attemptsStatName(false)));
     assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(true, 
true)));
     assertEquals(0L, statsProvider.getLongValue(slotSearchStatName(false, 
true)));
     assertEquals(0L, statsProvider.getLongValue(UNMATCHED_TASKS));

http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
index b3ffb0d..89a9e53 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptionVictimFilterTest.java
@@ -174,7 +174,7 @@ public class PreemptionVictimFilterTest extends 
EasyMockTest {
   }
 
   @Test
-  public void testOnePreemptableTask() {
+  public void testOnePreemptibleTask() {
     setUpHost();
 
     schedulingFilter = createMock(SchedulingFilter.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/c05632b2/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
index 0ef29d5..f2a93e8 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -46,21 +46,29 @@ import org.junit.Test;
 
 import static org.apache.aurora.gen.ScheduleStatus.PENDING;
 import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptedByJobStatName;
+import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptedStatName;
+import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.preemptorByJobStatName;
+import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationByJobStatName;
 import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.slotValidationStatName;
-import static 
org.apache.aurora.scheduler.preemptor.PreemptorMetrics.successStatName;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class PreemptorImplTest extends EasyMockTest {
+  private static final String TASK_NAME = "preemptor-name";
+  private static final IScheduledTask TASK = 
IScheduledTask.build(makeTask(TASK_NAME));
   private static final String SLAVE_ID_1 = "slave_id_1";
   private static final String SLAVE_ID_2 = "slave_id_2";
-  private static final IScheduledTask TASK = IScheduledTask.build(makeTask());
-  private static final PreemptionProposal PROPOSAL_1 = 
createPreemptionProposal(TASK, SLAVE_ID_1);
-  private static final PreemptionProposal PROPOSAL_2 = 
createPreemptionProposal(TASK, SLAVE_ID_2);
+  private static final IScheduledTask SLAVE_1_TASK = 
IScheduledTask.build(makeTask(SLAVE_ID_1));
+  private static final IScheduledTask SLAVE_2_TASK = 
IScheduledTask.build(makeTask(SLAVE_ID_2));
+  private static final PreemptionProposal PROPOSAL_1 = 
createPreemptionProposal(SLAVE_1_TASK,
+      SLAVE_ID_1);
+  private static final PreemptionProposal PROPOSAL_2 = 
createPreemptionProposal(SLAVE_2_TASK,
+      SLAVE_ID_2);
   private static final TaskGroupKey GROUP_KEY =
-      
TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask()));
+      
TaskGroupKey.from(ITaskConfig.build(makeTask(TASK_NAME).getAssignedTask().getTask()));
 
   private static final Set<PreemptionProposal> NO_SLOTS = ImmutableSet.of();
   private static final Optional<String> EMPTY_RESULT = Optional.empty();
@@ -99,16 +107,22 @@ public class PreemptorImplTest extends EasyMockTest {
     
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, 
PROPOSAL_2));
     slotCache.remove(PROPOSAL_1, GROUP_KEY);
     expectSlotValidation(PROPOSAL_1, Optional.of(ImmutableSet.of(
-        PreemptionVictim.fromTask(TASK.getAssignedTask()))));
+        PreemptionVictim.fromTask(SLAVE_1_TASK.getAssignedTask()))));
 
     expectPreempted(TASK);
 
     control.replay();
 
     assertEquals(Optional.of(SLAVE_ID_1), callPreemptor());
-    assertEquals(0L, 
statsProvider.getLongValue(slotValidationStatName(false)));
     assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
-    assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(1L, 
statsProvider.getLongValue(slotValidationByJobStatName(true,
+        TASK.getAssignedTask().getTask().getJob())));
+    PROPOSAL_1.getVictims().forEach(victim ->
+        assertEquals(1L, statsProvider.getLongValue(preemptedByJobStatName(
+            victim.getConfig().getJob()))));
+    assertEquals(1L, statsProvider.getLongValue(preemptorByJobStatName(
+        TASK.getAssignedTask().getTask().getJob())));
+    assertEquals(1L, statsProvider.getLongValue(preemptedStatName(true)));
   }
 
   @Test
@@ -121,8 +135,9 @@ public class PreemptorImplTest extends EasyMockTest {
 
     assertEquals(EMPTY_RESULT, callPreemptor());
     assertEquals(1L, 
statsProvider.getLongValue(slotValidationStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(1L, 
statsProvider.getLongValue(slotValidationByJobStatName(false,
+        TASK.getAssignedTask().getTask().getJob())));
+    assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true)));
   }
 
   @Test
@@ -132,7 +147,7 @@ public class PreemptorImplTest extends EasyMockTest {
     expectSlotValidation(PROPOSAL_1, Optional.empty());
     slotCache.remove(PROPOSAL_2, GROUP_KEY);
     expectSlotValidation(PROPOSAL_2, Optional.of(ImmutableSet.of(
-        PreemptionVictim.fromTask(TASK.getAssignedTask()))));
+        PreemptionVictim.fromTask(SLAVE_2_TASK.getAssignedTask()))));
 
     expectPreempted(TASK);
 
@@ -140,8 +155,17 @@ public class PreemptorImplTest extends EasyMockTest {
 
     assertEquals(Optional.of(SLAVE_ID_2), callPreemptor());
     assertEquals(1L, 
statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(1L, 
statsProvider.getLongValue(slotValidationByJobStatName(false,
+        TASK.getAssignedTask().getTask().getJob())));
     assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
-    assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(1L, 
statsProvider.getLongValue(slotValidationByJobStatName(true,
+        TASK.getAssignedTask().getTask().getJob())));
+    PROPOSAL_2.getVictims().forEach(victim ->
+        assertEquals(1L, statsProvider.getLongValue(preemptedByJobStatName(
+            victim.getConfig().getJob()))));
+    assertEquals(1L, statsProvider.getLongValue(preemptorByJobStatName(
+        TASK.getAssignedTask().getTask().getJob())));
+    assertEquals(1L, statsProvider.getLongValue(preemptedStatName(true)));
   }
 
   @Test
@@ -156,8 +180,9 @@ public class PreemptorImplTest extends EasyMockTest {
 
     assertEquals(EMPTY_RESULT, callPreemptor());
     assertEquals(2L, 
statsProvider.getLongValue(slotValidationStatName(false)));
-    assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(2L, 
statsProvider.getLongValue(slotValidationByJobStatName(false,
+        TASK.getAssignedTask().getTask().getJob())));
+    assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true)));
   }
 
   @Test
@@ -169,7 +194,7 @@ public class PreemptorImplTest extends EasyMockTest {
     assertEquals(EMPTY_RESULT, callPreemptor());
     assertEquals(0L, 
statsProvider.getLongValue(slotValidationStatName(false)));
     assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true)));
-    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(preemptedStatName(true)));
   }
 
   private Optional<String> callPreemptor() {
@@ -203,13 +228,13 @@ public class PreemptorImplTest extends EasyMockTest {
     return new 
PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), 
slaveId);
   }
 
-  private static ScheduledTask makeTask() {
+  private static ScheduledTask makeTask(String name) {
     ScheduledTask task = new ScheduledTask()
         .setAssignedTask(new AssignedTask()
             .setTask(new TaskConfig()
                 .setPriority(1)
                 .setProduction(true)
-                .setJob(new JobKey("role", "env", "name"))));
+                .setJob(new JobKey("role", "env", name))));
     task.addToTaskEvents(new TaskEvent(0, PENDING));
     return task;
   }

Reply via email to