Repository: aurora Updated Branches: refs/heads/master 1e2a9e160 -> 86d8f2f04
Evaluate multiple preemption proposals per round. `TaskScheduler` attempts to preempt already-identified candidates through `Preemptor` when it fails to schedule one or more tasks. However, `Preemptor` currently evaluates only one proposal per invocation, and at this point a proposal may be vetoed by scheduling filters. If that proposal fails validation, `TaskGroups` may penalize the task group to give `PendingTaskProcessor` time to find new preemption candidates, even though another proposal may already exist in `slotCache`. This penalty can cause the existing proposals in `slotCache` to expire, which slows down the overall preemption process. This patch modifies `Preemptor` so that it evaluates all existing preemption proposals for the task group before giving up. Bugs closed: AURORA-1868 Reviewed at https://reviews.apache.org/r/55243/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/86d8f2f0 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/86d8f2f0 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/86d8f2f0 Branch: refs/heads/master Commit: 86d8f2f043e65dfc15afae4cfd4574b3b1f294c2 Parents: 1e2a9e1 Author: Mehrdad Nurolahzade <mehr...@nurolahzade.com> Authored: Tue Jan 24 17:07:09 2017 +0100 Committer: Stephan Erb <s...@apache.org> Committed: Tue Jan 24 17:07:09 2017 +0100 ---------------------------------------------------------------------- .../aurora/scheduler/preemptor/Preemptor.java | 33 +++++------ .../scheduler/preemptor/PreemptorImplTest.java | 62 ++++++++++++++++---- 2 files changed, 66 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/86d8f2f0/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java index 7d2903a..6b807e0 100644 --- a/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java +++ b/src/main/java/org/apache/aurora/scheduler/preemptor/Preemptor.java @@ -13,7 +13,7 @@ */ package org.apache.aurora.scheduler.preemptor; -import java.util.Set; +import java.util.Iterator; import javax.inject.Inject; @@ -79,12 +79,12 @@ public interface Preemptor { MutableStoreProvider store) { TaskGroupKey groupKey = TaskGroupKey.from(pendingTask.getTask()); - Set<PreemptionProposal> preemptionProposals = slotCache.getByValue(groupKey); + Iterator<PreemptionProposal> proposalIterator = slotCache.getByValue(groupKey).iterator(); // A preemption slot is available -> attempt to preempt tasks. - if (!preemptionProposals.isEmpty()) { + while (proposalIterator.hasNext()) { // Get the next available preemption slot. - PreemptionProposal slot = preemptionProposals.iterator().next(); + PreemptionProposal slot = proposalIterator.next(); slotCache.remove(slot, groupKey); // Validate PreemptionProposal is still valid for the given task. @@ -98,21 +98,18 @@ public interface Preemptor { store); metrics.recordSlotValidationResult(validatedVictims); - if (!validatedVictims.isPresent()) { - // Previously found victims are no longer valid -> let the next run find a new slot. 
- return Optional.absent(); + if (validatedVictims.isPresent()) { + for (PreemptionVictim toPreempt : validatedVictims.get()) { + metrics.recordTaskPreemption(toPreempt); + stateManager.changeState( + store, + toPreempt.getTaskId(), + Optional.absent(), + PREEMPTING, + Optional.of("Preempting in favor of " + pendingTask.getTaskId())); + } + return Optional.of(slot.getSlaveId()); } - - for (PreemptionVictim toPreempt : validatedVictims.get()) { - metrics.recordTaskPreemption(toPreempt); - stateManager.changeState( - store, - toPreempt.getTaskId(), - Optional.absent(), - PREEMPTING, - Optional.of("Preempting in favor of " + pendingTask.getTaskId())); - } - return Optional.of(slot.getSlaveId()); } return Optional.absent(); http://git-wip-us.apache.org/repos/asf/aurora/blob/86d8f2f0/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java index 40c42b1..3b932c9 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java @@ -54,9 +54,11 @@ import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; public class PreemptorImplTest extends EasyMockTest { - private static final String SLAVE_ID = "slave_id"; + private static final String SLAVE_ID_1 = "slave_id_1"; + private static final String SLAVE_ID_2 = "slave_id_2"; private static final IScheduledTask TASK = IScheduledTask.build(makeTask()); - private static final PreemptionProposal PROPOSAL = createPreemptionProposal(TASK); + private static final PreemptionProposal PROPOSAL_1 = createPreemptionProposal(TASK, SLAVE_ID_1); + private static final PreemptionProposal PROPOSAL_2 = createPreemptionProposal(TASK, SLAVE_ID_2); private static 
final TaskGroupKey GROUP_KEY = TaskGroupKey.from(ITaskConfig.build(makeTask().getAssignedTask().getTask())); @@ -94,30 +96,67 @@ public class PreemptorImplTest extends EasyMockTest { @Test public void testPreemptTasksSuccessful() throws Exception { - expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); - slotCache.remove(PROPOSAL, GROUP_KEY); - expectSlotValidation(PROPOSAL, Optional.of(ImmutableSet.of( + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, PROPOSAL_2)); + slotCache.remove(PROPOSAL_1, GROUP_KEY); + expectSlotValidation(PROPOSAL_1, Optional.of(ImmutableSet.of( PreemptionVictim.fromTask(TASK.getAssignedTask())))); expectPreempted(TASK); control.replay(); - assertEquals(Optional.of(SLAVE_ID), callPreemptor()); + assertEquals(Optional.of(SLAVE_ID_1), callPreemptor()); + assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false))); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); assertEquals(1L, statsProvider.getLongValue(successStatName(true))); } @Test public void testPreemptTasksValidationFailed() throws Exception { - expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL)); - slotCache.remove(PROPOSAL, GROUP_KEY); - expectSlotValidation(PROPOSAL, Optional.absent()); + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1)); + slotCache.remove(PROPOSAL_1, GROUP_KEY); + expectSlotValidation(PROPOSAL_1, Optional.absent()); control.replay(); assertEquals(EMPTY_RESULT, callPreemptor()); assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true))); + assertEquals(0L, statsProvider.getLongValue(successStatName(true))); + } + + @Test + public void testMultiplePreemptionProposalsSuccessful() throws Exception { + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, PROPOSAL_2)); + slotCache.remove(PROPOSAL_1, 
GROUP_KEY); + expectSlotValidation(PROPOSAL_1, Optional.absent()); + slotCache.remove(PROPOSAL_2, GROUP_KEY); + expectSlotValidation(PROPOSAL_2, Optional.of(ImmutableSet.of( + PreemptionVictim.fromTask(TASK.getAssignedTask())))); + + expectPreempted(TASK); + + control.replay(); + + assertEquals(Optional.of(SLAVE_ID_2), callPreemptor()); + assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(1L, statsProvider.getLongValue(slotValidationStatName(true))); + assertEquals(1L, statsProvider.getLongValue(successStatName(true))); + } + + @Test + public void testMultiplePreemptionProposalsFailed() throws Exception { + expect(slotCache.getByValue(GROUP_KEY)).andReturn(ImmutableSet.of(PROPOSAL_1, PROPOSAL_2)); + slotCache.remove(PROPOSAL_1, GROUP_KEY); + expectSlotValidation(PROPOSAL_1, Optional.absent()); + slotCache.remove(PROPOSAL_2, GROUP_KEY); + expectSlotValidation(PROPOSAL_2, Optional.absent()); + + control.replay(); + + assertEquals(EMPTY_RESULT, callPreemptor()); + assertEquals(2L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true))); assertEquals(0L, statsProvider.getLongValue(successStatName(true))); } @@ -129,6 +168,7 @@ public class PreemptorImplTest extends EasyMockTest { assertEquals(EMPTY_RESULT, callPreemptor()); assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(false))); + assertEquals(0L, statsProvider.getLongValue(slotValidationStatName(true))); assertEquals(0L, statsProvider.getLongValue(successStatName(true))); } @@ -158,9 +198,9 @@ public class PreemptorImplTest extends EasyMockTest { .andReturn(StateChangeResult.SUCCESS); } - private static PreemptionProposal createPreemptionProposal(IScheduledTask task) { + private static PreemptionProposal createPreemptionProposal(IScheduledTask task, String slaveId) { IAssignedTask assigned = task.getAssignedTask(); - return new 
PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), SLAVE_ID); + return new PreemptionProposal(ImmutableSet.of(PreemptionVictim.fromTask(assigned)), slaveId); } private static ScheduledTask makeTask() {