Repository: aurora
Updated Branches:
  refs/heads/master 1e2a9e160 -> 86d8f2f04


Evaluate multiple preemption proposals per round

`TaskScheduler` makes an attempt to preempt already identified candidates
through `Preemptor` when it fails to schedule one or more tasks. However,
`Preemptor` currently evaluates only one proposal per invocation, and that
proposal may get vetoed at this point by scheduling filters. If a proposal
fails validation, the task group might get penalized by `TaskGroups` to give
`PendingTaskProcessor` some time to find new preemption candidates, despite
the fact that another proposal may already exist in `slotCache`. This penalty
might result in the expiration of existing proposals in `slotCache`, hence
slowing down the overall preemption process.

This patch modifies `Preemptor` so that it keeps evaluating the existing
preemption proposals until one passes validation, giving up only after all of
them have failed.

Bugs closed: AURORA-1868

Reviewed at https://reviews.apache.org/r/55243/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/86d8f2f0
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/86d8f2f0
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/86d8f2f0

Branch: refs/heads/master
Commit: 86d8f2f043e65dfc15afae4cfd4574b3b1f294c2
Parents: 1e2a9e1
Author: Mehrdad Nurolahzade <mehr...@nurolahzade.com>
Authored: Tue Jan 24 17:07:09 2017 +0100
Committer: Stephan Erb <s...@apache.org>
Committed: Tue Jan 24 17:07:09 2017 +0100

----------------------------------------------------------------------
 .../aurora/scheduler/preemptor/Preemptor.java   | 33 +++++------
 .../scheduler/preemptor/PreemptorImplTest.java  | 62 ++++++++++++++++----
 2 files changed, 66 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/86d8f2f0/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java 
b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
index 7d2903a..6b807e0 100644
--- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
+++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java
@@ -13,7 +13,7 @@
  */
 package org.apache.aurora.scheduler.preemptor;
 
-import java.util.Set;
+import java.util.Iterator;
 
 import javax.inject.Inject;
 
@@ -79,12 +79,12 @@ public interface Preemptor {
         MutableStoreProvider store) {
 
       TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask());
-      Set<PreemptionProposal> preemptionProposals = 
slotCache.getByValue(groupKey);
+      Iterator<PreemptionProposal> proposalIterator = 
slotCache.getByValue(groupKey).iterator();
 
       // A preemption slot is available -> attempt to preempt tasks.
-      if (!preemptionProposals.isEmpty()) {
+      while (proposalIterator.hasNext()) {
         // Get the next available preemption slot.
-        PreemptionProposal slot = preemptionProposals.iterator().next();
+        PreemptionProposal slot = proposalIterator.next();
         slotCache.remove(slot, groupKey);
 
         // Validate PreemptionProposal is still valid for the given task.
@@ -98,21 +98,18 @@ public interface Preemptor {
                 store);
 
         metrics.recordSlotValidationResult(validatedVictims);
-        if (!validatedVictims.isPresent()) {
-          // Previously found victims are no longer valid -> let the next run 
find a new slot.
-          return Optional.absent();
+        if (validatedVictims.isPresent()) {
+          for (PreemptionVictim toPreempt : validatedVictims.get()) {
+            metrics.recordTaskPreemption(toPreempt);
+            stateManager.changeState(
+                store,
+                toPreempt.getTaskId(),
+                Optional.absent(),
+                PREEMPTING,
+                Optional.of("Preempting in favor of " + 
pendingTask.getTaskId()));
+          }
+          return Optional.of(slot.getSlaveId());
         }
-
-        for (PreemptionVictim toPreempt : validatedVictims.get()) {
-          metrics.recordTaskPreemption(toPreempt);
-          stateManager.changeState(
-              store,
-              toPreempt.getTaskId(),
-              Optional.absent(),
-              PREEMPTING,
-              Optional.of("Preempting in favor of " + 
pendingTask.getTaskId()));
-        }
-        return Optional.of(slot.getSlaveId());
       }
 
       return Optional.absent();

http://git-wip-us.apache.org/repos/asf/aurora/blob/86d8f2f0/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java 
b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
index 40c42b1..3b932c9 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -54,9 +54,11 @@ import static org.easymock.EasyMock.expect;
 import static org.junit.Assert.assertEquals;
 
 public class PreemptorImplTest extends EasyMockTest {
-  private static final String SLAVE_ID = "slave_id";
+  private static final String SLAVE_ID_1 = "slave_id_1";
+  private static final String SLAVE_ID_2 = "slave_id_2";
   private static final IScheduledTask TASK = IScheduledTask.build(makeTask());
-  private static final PreemptionProposal PROPOSAL = 
createPreemptionProposal(TASK);
+  private static final PreemptionProposal PROPOSAL_1 = 
createPreemptionProposal(TASK, SLAVE_ID_1);
+  private static final PreemptionProposal PROPOSAL_2 = 
createPreemptionProposal(TASK, SLAVE_ID_2);
   private static final TaskGroupKey GROUP_KEY =
       
TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask()));
 
@@ -94,30 +96,67 @@ public class PreemptorImplTest extends EasyMockTest {
 
   @Test
   public void testPreemptTasksSuccessful() throws Exception {
-    
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
-    slotCache.remove(PROPOSAL, GROUP_KEY);
-    expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of(
+    
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, 
PROPOSAL_2));
+    slotCache.remove(PROPOSAL_1, GROUP_KEY);
+    expectSlotValidation(PROPOSAL_1, Optional.of(ImmutableSet.of(
         PreemptionVictim.fromTask(TASK.getAssignedTask()))));
 
     expectPreempted(TASK);
 
     control.replay();
 
-    assertEquals(Optional.of(SLAVE_ID), callPreemptor());
+    assertEquals(Optional.of(SLAVE_ID_1), callPreemptor());
+    assertEquals(0L, 
statsProvider.getLongValue(slotValidationStatName(false)));
     assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
     assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
   }
 
   @Test
   public void testPreemptTasksValidationFailed() throws Exception {
-    
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL));
-    slotCache.remove(PROPOSAL, GROUP_KEY);
-    expectSlotValidation(PROPOSAL, Optional.absent());
+    
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1));
+    slotCache.remove(PROPOSAL_1, GROUP_KEY);
+    expectSlotValidation(PROPOSAL_1, Optional.absent());
 
     control.replay();
 
     assertEquals(EMPTY_RESULT, callPreemptor());
     assertEquals(1L, 
statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true)));
+    assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
+  }
+
+  @Test
+  public void testMultiplePreemptionProposalsSuccessful() throws Exception {
+    
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, 
PROPOSAL_2));
+    slotCache.remove(PROPOSAL_1, GROUP_KEY);
+    expectSlotValidation(PROPOSAL_1, Optional.absent());
+    slotCache.remove(PROPOSAL_2, GROUP_KEY);
+    expectSlotValidation(PROPOSAL_2, Optional.of(ImmutableSet.of(
+        PreemptionVictim.fromTask(TASK.getAssignedTask()))));
+
+    expectPreempted(TASK);
+
+    control.replay();
+
+    assertEquals(Optional.of(SLAVE_ID_2), callPreemptor());
+    assertEquals(1L, 
statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true)));
+    assertEquals(1L, statsProvider.getLongValue(successStatName(true)));
+  }
+
+  @Test
+  public void testMultiplePreemptionProposalsFailed() throws Exception {
+    
expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, 
PROPOSAL_2));
+    slotCache.remove(PROPOSAL_1, GROUP_KEY);
+    expectSlotValidation(PROPOSAL_1, Optional.absent());
+    slotCache.remove(PROPOSAL_2, GROUP_KEY);
+    expectSlotValidation(PROPOSAL_2, Optional.absent());
+
+    control.replay();
+
+    assertEquals(EMPTY_RESULT, callPreemptor());
+    assertEquals(2L, 
statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true)));
     assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
   }
 
@@ -129,6 +168,7 @@ public class PreemptorImplTest extends EasyMockTest {
 
     assertEquals(EMPTY_RESULT, callPreemptor());
     assertEquals(0L, 
statsProvider.getLongValue(slotValidationStatName(false)));
+    assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true)));
     assertEquals(0L, statsProvider.getLongValue(successStatName(true)));
   }
 
@@ -158,9 +198,9 @@ public class PreemptorImplTest extends EasyMockTest {
         .andReturn(StateChangeResult.SUCCESS);
   }
 
-  private static PreemptionProposal createPreemptionProposal(IScheduledTask 
task) {
+  private static PreemptionProposal createPreemptionProposal(IScheduledTask 
task, String slaveId) {
     IAssignedTask assigned = task.getAssignedTask();
-    return new 
PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), 
SLAVE_ID);
+    return new 
PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), 
slaveId);
   }
 
   private static ScheduledTask makeTask() {

Reply via email to