Repository: aurora Updated Branches: refs/heads/master 21af250c9 -> 80139da46
http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index 6b18296..ff80baa 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.aurora.common.collections.Pair; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -32,9 +33,12 @@ import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; +import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.offers.Deferment.Noop; -import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl; +import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceType; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; @@ -54,11 +58,13 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING; import static org.apache.aurora.gen.MaintenanceMode.NONE; import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.GLOBALLY_BANNED_OFFERS; -import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_ACCEPT_RACES; -import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_CANCEL_FAILURES; -import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OUTSTANDING_OFFERS; -import static org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.STATICALLY_BANNED_OFFERS; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; +import static org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS; +import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES; +import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES; +import static org.apache.aurora.scheduler.offers.OfferManagerImpl.OUTSTANDING_OFFERS; +import static org.apache.aurora.scheduler.offers.OfferManagerImpl.STATICALLY_BANNED_OFFERS; +import static org.apache.aurora.scheduler.offers.OfferManagerImpl.VETO_EVALUATED_OFFERS; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; @@ -66,6 +72,8 @@ import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -89,21 +97,25 @@ public class OfferManagerImplTest extends EasyMockTest { private static final String HOST_C = "HOST_C"; private static final HostOffer OFFER_C = new HostOffer( Offers.makeOffer("OFFER_C", HOST_C), - IHostAttributes.build(new HostAttributes().setMode(NONE))); + IHostAttributes.build(new HostAttributes().setMode(NONE).setHost(HOST_C))); private static final int PORT = 1000; private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT)); private static final IScheduledTask TASK = makeTask("id", JOB); + private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest( + TASK.getAssignedTask().getTask(), + ResourceBag.EMPTY, + empty()); private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() .setName("taskName") .setTaskId(Protos.TaskID.newBuilder().setValue(Tasks.id(TASK))) .setAgentId(MESOS_OFFER.getAgentId()) .build(); - private static Operation launch = Operation.newBuilder() + private static final Operation LAUNCH = Operation.newBuilder() .setType(Operation.Type.LAUNCH) .setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO)) .build(); - private static final List<Operation> OPERATIONS = ImmutableList.of(launch); + private static final List<Operation> OPERATIONS = ImmutableList.of(LAUNCH); private static final long OFFER_FILTER_SECONDS = 0; private static final Filters OFFER_FILTER = Filters.newBuilder() .setRefuseSeconds(OFFER_FILTER_SECONDS) @@ -113,6 +125,7 @@ public class OfferManagerImplTest extends EasyMockTest { private Driver driver; private OfferManagerImpl offerManager; private FakeStatsProvider statsProvider; + private SchedulingFilter schedulingFilter; @Before public void setUp() { @@ -125,7 +138,13 @@ public class OfferManagerImplTest extends EasyMockTest { FAKE_TICKER ); statsProvider = new FakeStatsProvider(); - offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, new Noop()); + schedulingFilter = createMock(SchedulingFilter.class); + + offerManager = new OfferManagerImpl(driver, + offerSettings, + statsProvider, + new Noop(), + schedulingFilter); } @Test @@ -136,11 +155,11 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); - offerManager.addOffer(hostOfferB); - offerManager.addOffer(OFFER_A); - offerManager.addOffer(hostOfferC); + offerManager.add(hostOfferB); + offerManager.add(OFFER_A); + offerManager.add(hostOfferC); - List<HostOffer> actual = ImmutableList.copyOf(offerManager.getOffers()); + List<HostOffer> actual = ImmutableList.copyOf(offerManager.getAll()); assertEquals( // hostOfferC has a further away start time, so it should be preferred. @@ -160,15 +179,15 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); - offerManager.addOffer(offerA); + offerManager.add(offerA); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - offerManager.addOffer(OFFER_B); + offerManager.add(OFFER_B); assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - offerManager.addOffer(offerC); + offerManager.add(offerC); assertEquals(3, statsProvider.getLongValue(OUTSTANDING_OFFERS)); assertEquals( ImmutableSet.of(OFFER_B, offerA, offerC), - ImmutableSet.copyOf(offerManager.getOffers())); + ImmutableSet.copyOf(offerManager.getAll())); offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO); assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @@ -179,19 +198,19 @@ public class OfferManagerImplTest extends EasyMockTest { offerManager.hostAttributesChanged(new HostAttributesChanged(HOST_ATTRIBUTES_A)); - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); - assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers())); + offerManager.add(OFFER_A); + offerManager.add(OFFER_B); + assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getAll())); HostOffer offerA = setMode(OFFER_A, DRAINING); offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes())); - assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getOffers())); + assertEquals(ImmutableSet.of(OFFER_B, offerA), ImmutableSet.copyOf(offerManager.getAll())); offerA = setMode(OFFER_A, NONE); HostOffer offerB = setMode(OFFER_B, DRAINING); offerManager.hostAttributesChanged(new HostAttributesChanged(offerA.getAttributes())); offerManager.hostAttributesChanged(new HostAttributesChanged(offerB.getAttributes())); - assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getOffers())); + assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), ImmutableSet.copyOf(offerManager.getAll())); } @Test @@ -201,9 +220,9 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); - offerManager.addOffer(OFFER_A); + offerManager.add(OFFER_A); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - offerManager.addOffer(OFFER_A); + offerManager.add(OFFER_A); assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @@ -211,74 +230,83 @@ public class OfferManagerImplTest extends EasyMockTest { public void testGetOffersReturnsAllOffers() { control.replay(); - offerManager.addOffer(OFFER_A); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); + offerManager.add(OFFER_A); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); - offerManager.cancelOffer(OFFER_A_ID); + offerManager.cancel(OFFER_A_ID); assertEquals(0, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); - assertTrue(Iterables.isEmpty(offerManager.getOffers())); + assertTrue(Iterables.isEmpty(offerManager.getAll())); assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @Test public void testOfferFilteringDueToStaticBan() { + expectFilterNone(); + control.replay(); // Static ban ignored when now offers. - offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); + offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY); assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); - offerManager.addOffer(OFFER_A); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); + offerManager.add(OFFER_A); + assertEquals(OFFER_A, + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); // Add static ban. - offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); + offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); - assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); } @Test public void testStaticBanExpiresAfterMaxHoldTime() throws InterruptedException { + expectFilterNone(); + control.replay(); - offerManager.addOffer(OFFER_A); - offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); - assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); + offerManager.add(OFFER_A); + offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); // Make sure the static ban expires after maximum amount of time an offer is held. FAKE_TICKER.advance(RETURN_DELAY); offerManager.cleanupStaticBans(); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); + assertEquals(OFFER_A, + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); } @Test public void testStaticBanIsClearedOnDriverDisconnect() { + expectFilterNone(); + control.replay(); - offerManager.addOffer(OFFER_A); - offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers())); - assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); + offerManager.add(OFFER_A); + offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY); + assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll())); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); // Make sure the static ban is cleared when driver is disconnected. offerManager.driverDisconnected(new DriverDisconnected()); assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); - offerManager.addOffer(OFFER_A); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); + offerManager.add(OFFER_A); + assertEquals(OFFER_A, + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); } @Test public void testGetOffer() { control.replay(); - offerManager.addOffer(OFFER_A); - assertEquals(Optional.of(OFFER_A), offerManager.getOffer(OFFER_A.getOffer().getAgentId())); + offerManager.add(OFFER_A); + assertEquals(Optional.of(OFFER_A), offerManager.get(OFFER_A.getOffer().getAgentId())); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); } @@ -289,7 +317,7 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); - offerManager.addOffer(OFFER_A); + offerManager.add(OFFER_A); offerManager.launchTask(OFFER_A_ID, TASK_INFO); } @@ -308,8 +336,8 @@ public class OfferManagerImplTest extends EasyMockTest { public void testFlushOffers() { control.replay(); - offerManager.addOffer(OFFER_A); - offerManager.addOffer(OFFER_B); + offerManager.add(OFFER_A); + offerManager.add(OFFER_B); assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS)); offerManager.driverDisconnected(new DriverDisconnected()); assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS)); @@ -319,26 +347,29 @@ public class OfferManagerImplTest extends EasyMockTest { public void testCancelFailure() { control.replay(); - offerManager.cancelOffer(OFFER_A.getOffer().getId()); + offerManager.cancel(OFFER_A.getOffer().getId()); assertEquals(1, statsProvider.getLongValue(OFFER_CANCEL_FAILURES)); } @Test public void testBanAndUnbanOffer() { + expectFilterNone(); + control.replay(); // After adding a banned offer, user can see it is in OUTSTANDING_OFFERS but cannot retrieve it. - offerManager.banOffer(OFFER_A_ID); - offerManager.addOffer(OFFER_A); + offerManager.ban(OFFER_A_ID); + offerManager.add(OFFER_A); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); - assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY))); + assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); - offerManager.cancelOffer(OFFER_A_ID); - offerManager.addOffer(OFFER_A); + offerManager.cancel(OFFER_A_ID); + offerManager.add(OFFER_A); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); - assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY))); + assertEquals(OFFER_A, + Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); } private static HostOffer setUnavailability(HostOffer offer, long startMs) { @@ -363,7 +394,7 @@ public class OfferManagerImplTest extends EasyMockTest { RETURN_DELAY, Long.MAX_VALUE, FAKE_TICKER); - return new OfferManagerImpl(driver, settings, statsProvider, new Noop()); + return new OfferManagerImpl(driver, settings, statsProvider, new Noop(), schedulingFilter); } @Test @@ -377,25 +408,25 @@ public class OfferManagerImplTest extends EasyMockTest { mesosScalar(CPUS, 24.0, true), mesosScalar(RAM_MB, 1024)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer medium = setMode(new HostOffer( offer("host2", mesosScalar(CPUS, 5.0), mesosScalar(RAM_MB, 1024)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer large = setMode(new HostOffer( offer("host3", mesosScalar(CPUS, 10.0), mesosScalar(RAM_MB, 1024)), HOST_ATTRIBUTES_A), DRAINING); + expectFilterNone(); + control.replay(); - cpuManager.addOffer(medium); - cpuManager.addOffer(large); - cpuManager.addOffer(small); + cpuManager.add(medium); + cpuManager.add(large); + cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers())); + ImmutableList.copyOf(cpuManager.getAll())); } @Test @@ -410,7 +441,6 @@ public class OfferManagerImplTest extends EasyMockTest { mesosScalar(CPUS, 23.0, true), mesosScalar(RAM_MB, 1024)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer medium = setMode(new HostOffer( offer( "host1", @@ -418,21 +448,22 @@ public class OfferManagerImplTest extends EasyMockTest { mesosScalar(CPUS, 24.0, true), mesosScalar(RAM_MB, 1024)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer large = setMode(new HostOffer( offer("host3", mesosScalar(CPUS, 1.0), mesosScalar(RAM_MB, 1024)), HOST_ATTRIBUTES_A), DRAINING); + expectFilterNone(); + control.replay(); - cpuManager.addOffer(medium); - cpuManager.addOffer(large); - cpuManager.addOffer(small); + cpuManager.add(medium); + cpuManager.add(large); + cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, true))); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers())); + ImmutableList.copyOf(cpuManager.getAll())); } @Test @@ -442,25 +473,25 @@ public class OfferManagerImplTest extends EasyMockTest { HostOffer small = setMode(new HostOffer( offer("host1", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 1.0)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer medium = setMode(new HostOffer( offer("host2", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 5.0)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer large = setMode(new HostOffer( offer("host3", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), mesosScalar(DISK_MB, 10.0)), HOST_ATTRIBUTES_A), DRAINING); + expectFilterNone(); + control.replay(); - cpuManager.addOffer(medium); - cpuManager.addOffer(large); - cpuManager.addOffer(small); + cpuManager.add(medium); + cpuManager.add(large); + cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers())); + ImmutableList.copyOf(cpuManager.getAll())); } @Test @@ -470,25 +501,25 @@ public class OfferManagerImplTest extends EasyMockTest { HostOffer small = setMode(new HostOffer( offer("host1", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 1.0)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer medium = setMode(new HostOffer( offer("host2", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 5.0)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer large = setMode(new HostOffer( offer("host3", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 10.0)), HOST_ATTRIBUTES_A), DRAINING); + expectFilterNone(); + control.replay(); - cpuManager.addOffer(medium); - cpuManager.addOffer(large); - cpuManager.addOffer(small); + cpuManager.add(medium); + cpuManager.add(large); + cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers())); + ImmutableList.copyOf(cpuManager.getAll())); } @Test @@ -502,14 +533,12 @@ public class OfferManagerImplTest extends EasyMockTest { mesosScalar(RAM_MB, 2.0), mesosScalar(DISK_MB, 3.0)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer medium = setMode(new HostOffer( offer("host2", mesosScalar(CPUS, 1.0), mesosScalar(RAM_MB, 3.0), mesosScalar(DISK_MB, 2.0)), HOST_ATTRIBUTES_A), DRAINING); - HostOffer large = setMode(new HostOffer( offer("host3", mesosScalar(CPUS, 10.0), @@ -518,16 +547,18 @@ public class OfferManagerImplTest extends EasyMockTest { mesosScalar(DISK_MB, 1.0)), HOST_ATTRIBUTES_A), DRAINING); + expectFilterNone(); + control.replay(); - cpuManager.addOffer(large); - cpuManager.addOffer(medium); - cpuManager.addOffer(small); + cpuManager.add(large); + cpuManager.add(medium); + cpuManager.add(small); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY))); + ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); assertEquals(ImmutableList.of(small, medium, large), - ImmutableList.copyOf(cpuManager.getOffers())); + ImmutableList.copyOf(cpuManager.getAll())); } @Test @@ -545,13 +576,14 @@ public class OfferManagerImplTest extends EasyMockTest { driver, settings, statsProvider, - new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock)); + new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock), + schedulingFilter); driver.declineOffer(OFFER_A_ID, OFFER_FILTER); control.replay(); - offerManager.addOffer(OFFER_A); + offerManager.add(OFFER_A); assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS)); clock.advance(RETURN_DELAY); @@ -575,11 +607,154 @@ public class OfferManagerImplTest extends EasyMockTest { control.replay(); - offerManager.banOffer(OFFER_A_ID); - offerManager.addOffer(OFFER_A); - offerManager.addOffer(sameAgent); - offerManager.cancelOffer(OFFER_A_ID); - offerManager.addOffer(sameAgent2); - assertEquals(ImmutableSet.of(sameAgent2), offerManager.getOffers()); + offerManager.ban(OFFER_A_ID); + offerManager.add(OFFER_A); + offerManager.add(sameAgent); + offerManager.cancel(OFFER_A_ID); + offerManager.add(sameAgent2); + assertEquals(ImmutableSet.of(sameAgent2), offerManager.getAll()); + } + + private void expectFilterNone() { + // Most tests will use a permissive scheduling filter + expect(schedulingFilter.filter(anyObject(), anyObject())) + .andReturn(ImmutableSet.of()) + .anyTimes(); + } + + @Test + public void testGetMatchingSingleAgent() { + expectFilterNone(); + + control.replay(); + offerManager.add(OFFER_A); + assertEquals(Optional.of(OFFER_A), + offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false)); + } + + @Test + public void testGetMatchingNoGloballyBanned() { + expectFilterNone(); + + control.replay(); + offerManager.add(OFFER_A); + assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); + offerManager.ban(OFFER_A_ID); + assertEquals(Optional.absent(), + offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false)); + assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); + } + + @Test + public void testGetMatchingNoVetoed() { + // Calling getMatching when a veto is present should return an empty option. Additionally, + // it should not statically ban the offer if it is vetoed. + expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), EMPTY_REQUEST)) + .andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch())); + + control.replay(); + offerManager.add(OFFER_A); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(Optional.absent(), + offerManager.getMatching(OFFER_A.getOffer().getAgentId(), EMPTY_REQUEST, false)); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + } + + @Test + public void testGetAllMatching() { + expectFilterNone(); + + control.replay(); + offerManager.add(OFFER_A); + offerManager.add(OFFER_B); + offerManager.add(OFFER_C); + assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C), + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + } + + @Test + public void testGetAllMatchingNoGloballyBanned() { + expectFilterNone(); + + control.replay(); + offerManager.add(OFFER_A); + offerManager.add(OFFER_B); + offerManager.add(OFFER_C); + assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); + offerManager.ban(OFFER_B.getOffer().getId()); + assertEquals(ImmutableSet.of(OFFER_A, OFFER_C), + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); + } + + @Test + public void testGetAllMatchingNoStaticallyBanned() { + expectFilterNone(); + + control.replay(); + offerManager.add(OFFER_A); + offerManager.add(OFFER_B); + offerManager.add(OFFER_C); + assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY); + assertEquals(ImmutableSet.of(OFFER_A, OFFER_C), + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), GROUP_KEY)), + offerManager.getStaticBans()); + } + + @Test + public void testGetAllMatchingIgnoreNoCpusAndMem() { + expectFilterNone(); + + HostOffer empty = setMode(new HostOffer( + offer("host1", + mesosScalar(CPUS, 0), + mesosScalar(RAM_MB, 0), + mesosScalar(DISK_MB, 3.0)), + HOST_ATTRIBUTES_A), NONE); + + control.replay(); + offerManager.add(empty); + offerManager.add(OFFER_A); + assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS)); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(ImmutableSet.of(OFFER_A), + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(ImmutableSet.of(empty, OFFER_A), + ImmutableSet.copyOf(offerManager.getAll())); + } + + @Test + public void testGetAllMatchingNoVetoed() { + // Calling getAllMatching should statically ban the offer as well if it is statically vetoed + expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), EMPTY_REQUEST)) + .andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch())); + expect(schedulingFilter.filter(new UnusedResource(OFFER_B, false), EMPTY_REQUEST)) + .andReturn(ImmutableSet.of()); + expect(schedulingFilter.filter(new UnusedResource(OFFER_C, false), EMPTY_REQUEST)) + .andReturn(ImmutableSet.of(SchedulingFilter.Veto.unsatisfiedLimit("test_limit"))); + + control.replay(); + offerManager.add(OFFER_A); + offerManager.add(OFFER_B); + offerManager.add(OFFER_C); + assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(ImmutableSet.of(OFFER_B), + ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, EMPTY_REQUEST, false))); + assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS)); + assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS)); + assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), GROUP_KEY)), + offerManager.getStaticBans()); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java index c76b3e3..a346e44 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java @@ -300,7 +300,7 @@ public class PendingTaskProcessorTest extends EasyMockTest { } private void expectOffers(HostOffer... offers) { - expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers)); + expect(offerManager.getAll()).andReturn(ImmutableSet.copyOf(offers)); } private void expectGetClusterState(IScheduledTask... returnedTasks) { http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java index f14cba1..1061583 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java @@ -82,7 +82,7 @@ public class PreemptorImplTest extends EasyMockTest { slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { }); statsProvider = new FakeStatsProvider(); OfferManager offerManager = createMock(OfferManager.class); - expect(offerManager.getOffer(anyObject(Protos.AgentID.class))) + expect(offerManager.get(anyObject(Protos.AgentID.class))) .andReturn(Optional.of(OFFER)) .anyTimes(); http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java index 97c238c..815a3ef 100644 --- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java @@ -25,8 +25,8 @@ import org.apache.aurora.gen.AssignedTask; import org.apache.aurora.scheduler.config.CliOptions; import org.apache.aurora.scheduler.filter.AttributeAggregate; import org.apache.aurora.scheduler.filter.SchedulingFilter; +import org.apache.aurora.scheduler.scheduling.TaskAssigner; import org.apache.aurora.scheduler.state.StateManager; -import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.entities.IAssignedTask; import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java new file mode 100644 index 0000000..627055c --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java @@ -0,0 +1,66 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import com.google.common.collect.ImmutableList; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; +import static org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class FirstFitOfferSelectorTest extends EasyMockTest { + + private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask(); + private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest( + TASK.getTask(), + ResourceBag.EMPTY, + empty()); + + private OfferSelector firstFitOfferSelector; + + @Before + public void setUp() { + firstFitOfferSelector = new FirstFitOfferSelector(); + } + + @Test + public void testNoOffers() { + Iterable<HostOffer> offers = ImmutableList.of(); + + control.replay(); + + assertFalse(firstFitOfferSelector.select(offers, EMPTY_REQUEST).isPresent()); + } + + @Test + public void testReturnFirstOffer() { + HostOffer offerA = createMock(HostOffer.class); + HostOffer offerB = createMock(HostOffer.class); + Iterable<HostOffer> offers = ImmutableList.of(offerA, offerB); + + control.replay(); + + assertEquals(offerA, firstFitOfferSelector.select(offers, EMPTY_REQUEST).get()); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java new file mode 100644 index 0000000..e094950 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java @@ -0,0 +1,374 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.scheduling; + +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.apache.aurora.gen.AssignedTask; +import org.apache.aurora.gen.Attribute; +import org.apache.aurora.gen.HostAttributes; +import org.apache.aurora.gen.JobKey; +import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.HostOffer; +import org.apache.aurora.scheduler.TierManager; +import org.apache.aurora.scheduler.base.InstanceKeys; +import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; +import org.apache.aurora.scheduler.filter.AttributeAggregate; +import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; +import org.apache.aurora.scheduler.mesos.MesosTaskFactory; +import org.apache.aurora.scheduler.offers.OfferManager; +import org.apache.aurora.scheduler.resources.ResourceBag; +import org.apache.aurora.scheduler.state.StateChangeResult; +import org.apache.aurora.scheduler.state.StateManager; +import org.apache.aurora.scheduler.storage.entities.IAssignedTask; +import org.apache.aurora.scheduler.storage.entities.IHostAttributes; +import org.apache.aurora.scheduler.storage.entities.IInstanceKey; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; +import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.apache.aurora.scheduler.updater.UpdateAgentReserver; +import org.apache.mesos.v1.Protos.AgentID; +import org.apache.mesos.v1.Protos.FrameworkID; +import org.apache.mesos.v1.Protos.OfferID; +import org.apache.mesos.v1.Protos.TaskID; +import org.apache.mesos.v1.Protos.TaskInfo; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.aurora.gen.ScheduleStatus.LOST; +import static org.apache.aurora.gen.ScheduleStatus.PENDING; +import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; +import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; +import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; +import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.ASSIGNER_LAUNCH_FAILURES; +import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.LAUNCH_FAILED_MSG; +import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; +import static org.apache.mesos.v1.Protos.Offer; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class TaskAssignerImplTest extends EasyMockTest { + + private static final int PORT = 1000; + private static final Offer MESOS_OFFER = + offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT)); + private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue(); + private static final HostOffer OFFER = + new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes() + .setHost(MESOS_OFFER.getHostname()) + .setAttributes(ImmutableSet.of( + new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname())))))); + private static final IScheduledTask TASK = makeTask("id", JOB); + private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); + private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() + .setName("taskName") + .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK))) + .setAgentId(MESOS_OFFER.getAgentId()) + .build(); + private static final IInstanceKey INSTANCE_KEY = + InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId()); + private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); + private static final HostOffer OFFER_2 = new HostOffer( + Offer.newBuilder() + .setId(OfferID.newBuilder().setValue("offerId0")) + .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) + .setAgentId(AgentID.newBuilder().setValue("slaveId0")) + .setHostname("hostName0") + .addResources(mesosRange(PORTS, PORT)) + .addResources(mesosScalar(CPUS, 1)) + .addResources(mesosScalar(RAM_MB, 1024)) + .build(), + IHostAttributes.build(new HostAttributes())); + + private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of(); + + private AttributeAggregate aggregate; + private ResourceRequest resourceRequest; + + private MutableStoreProvider storeProvider; + private StateManager stateManager; + private MesosTaskFactory taskFactory; + private OfferManager offerManager; + private TaskAssignerImpl assigner; + private TierManager tierManager; + private FakeStatsProvider statsProvider; + private UpdateAgentReserver updateAgentReserver; + + @Before + public void setUp() throws Exception { + storeProvider = createMock(MutableStoreProvider.class); + taskFactory = createMock(MesosTaskFactory.class); + stateManager = createMock(StateManager.class); + offerManager = createMock(OfferManager.class); + tierManager = createMock(TierManager.class); + updateAgentReserver = createMock(UpdateAgentReserver.class); + statsProvider = new FakeStatsProvider(); + // TODO(jly): FirstFitOfferSelector returns the first offer which is what we want for testing, + // but if its implementation becomes more complex we may need to replace it with a fake. + OfferSelector offerSelector = new FirstFitOfferSelector(); + assigner = new TaskAssignerImpl( + stateManager, + taskFactory, + offerManager, + tierManager, + updateAgentReserver, + statsProvider, + offerSelector); + aggregate = empty(); + resourceRequest = new ResourceRequest( + TASK.getAssignedTask().getTask(), + ResourceBag.EMPTY, + aggregate); + } + + @Test + public void testAssignNoTasks() throws Exception { + control.replay(); + + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null)); + } + + @Test + public void testAssignmentClearedOnError() throws Exception { + expectNoUpdateReservations(1); + expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false)) + .andReturn(ImmutableSet.of(OFFER, OFFER_2)); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expectLastCall().andThrow(new OfferManager.LaunchException("expected")); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expectAssignTask(MESOS_OFFER); + expect(stateManager.changeState( + storeProvider, + Tasks.id(TASK), + Optional.of(PENDING), + LOST, + LAUNCH_FAILED_MSG)) + .andReturn(StateChangeResult.SUCCESS); + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false)) + .andReturn(TASK_INFO); + + control.replay(); + + assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); + // Ensures scheduling loop terminates on the first launch failure. + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of( + TASK.getAssignedTask(), + makeTask("id2", JOB).getAssignedTask(), + makeTask("id3", JOB).getAssignedTask()), + NO_RESERVATION)); + assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); + } + + @Test + public void testAssignmentSkippedForReservedSlave() throws Exception { + expectNoUpdateReservations(0); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false)) + .andReturn(ImmutableSet.of(OFFER)); + + control.replay(); + + assertEquals( + NO_ASSIGNMENT, + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, TaskGroupKey.from( + ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))))))); + } + + @Test + public void testTaskWithReservedSlaveLandsElsewhere() throws Exception { + // Ensures slave/task reservation relationship is only enforced in slave->task direction + // and permissive in task->slave direction. In other words, a task with a slave reservation + // should still be tried against other unreserved slaves. + expectNoUpdateReservations(1); + expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false)) + .andReturn(ImmutableSet.of(OFFER_2, OFFER)); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expectAssignTask(OFFER_2.getOffer()); + expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false)) + .andReturn(TASK_INFO); + offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO); + + control.replay(); + + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + } + + @Test + public void testResourceMapperCallback() { + AssignedTask builder = TASK.newBuilder().getAssignedTask(); + builder.unsetAssignedPorts(); + + control.replay(); + + assertEquals( + TASK.getAssignedTask(), + assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder))); + } + + @Test + public void testAssignToReservedAgent() throws Exception { + expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); + expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); + updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); + expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false)) + .andReturn(Optional.of(OFFER)); + expectAssignTask(MESOS_OFFER); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false)) + .andReturn(TASK_INFO); + + control.replay(); + + assertEquals( + ImmutableSet.of(Tasks.id(TASK)), + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of( + TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertNotEquals(empty(), aggregate); + } + + @Test + public void testAssignReservedAgentWhenOfferNotReady() throws Exception { + expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); + expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); + expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false)) + .andReturn(Optional.absent()); + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + expectLastCall(); + + control.replay(); + + assertEquals( + ImmutableSet.of(), + assigner.maybeAssign( + storeProvider, + resourceRequest, + TaskGroupKey.from(TASK.getAssignedTask().getTask()), + ImmutableSet.of(TASK.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertEquals(empty(), aggregate); + } + + @Test + public void testAssignWithMixOfReservedAndNotReserved() throws Exception { + expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); + + expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); + expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); + updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); + expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false)) + .andReturn(Optional.of(OFFER)); + expectAssignTask(MESOS_OFFER); + offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); + expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false)) + .andReturn(TASK_INFO); + + // Normal scheduling loop for the remaining task... + IScheduledTask secondTask = makeTask("another-task", JOB, 9999); + TaskInfo secondTaskInfo = TaskInfo.newBuilder() + .setName("another-task") + .setTaskId(TaskID.newBuilder().setValue(Tasks.id(secondTask))) + .setAgentId(MESOS_OFFER.getAgentId()) + .build(); + expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent()); + ImmutableSet<HostOffer> matchingOffers = ImmutableSet.of(OFFER); + expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false)) + .andReturn(matchingOffers); + expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue())) + .andReturn(ImmutableSet.of()); + expectAssignTask(MESOS_OFFER, secondTask); + offerManager.launchTask(MESOS_OFFER.getId(), secondTaskInfo); + expect(taskFactory.createFrom(secondTask.getAssignedTask(), MESOS_OFFER, false)) + .andReturn(secondTaskInfo); + + control.replay(); + + assertEquals( + Tasks.ids(TASK, secondTask), + assigner.maybeAssign( + storeProvider, + resourceRequest, + GROUP_KEY, + ImmutableSet.of( + TASK.getAssignedTask(), + secondTask.getAssignedTask()), + ImmutableMap.of(SLAVE_ID, GROUP_KEY))); + assertNotEquals(empty(), aggregate); + } + + private void expectAssignTask(Offer offer) { + expectAssignTask(offer, TASK); + } + + private void expectAssignTask(Offer offer, IScheduledTask task) { + expect(stateManager.assignTask( + eq(storeProvider), + eq(Tasks.id(task)), + eq(offer.getHostname()), + eq(offer.getAgentId()), + anyObject())).andReturn(task.getAssignedTask()); + } + + private void expectNoUpdateReservations(int offers) { + expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false); + for (int i = 0; i < offers; i++) { + expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of()); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java index 7a4525a..260ff9d 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java @@ -46,9 +46,7 @@ import org.apache.aurora.scheduler.preemptor.BiCache; import org.apache.aurora.scheduler.preemptor.Preemptor; import org.apache.aurora.scheduler.resources.ResourceBag; import org.apache.aurora.scheduler.resources.ResourceManager; -import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl; import org.apache.aurora.scheduler.state.PubsubTestUtil; -import org.apache.aurora.scheduler.state.TaskAssigner; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.entities.IJobKey; http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java deleted file mode 100644 index 58f9de2..0000000 --- a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java +++ /dev/null @@ -1,539 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.state; - -import java.util.Map; -import java.util.Set; - -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.apache.aurora.gen.AssignedTask; -import org.apache.aurora.gen.Attribute; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobKey; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.HostOffer; -import org.apache.aurora.scheduler.TierManager; -import org.apache.aurora.scheduler.base.InstanceKeys; -import org.apache.aurora.scheduler.base.TaskGroupKey; -import org.apache.aurora.scheduler.base.Tasks; -import org.apache.aurora.scheduler.filter.AttributeAggregate; -import org.apache.aurora.scheduler.filter.SchedulingFilter; -import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest; -import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource; -import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto; -import org.apache.aurora.scheduler.mesos.MesosTaskFactory; -import org.apache.aurora.scheduler.offers.OfferManager; -import org.apache.aurora.scheduler.resources.ResourceBag; -import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner; -import org.apache.aurora.scheduler.storage.entities.IAssignedTask; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IInstanceKey; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.testing.FakeStatsProvider; -import org.apache.aurora.scheduler.updater.UpdateAgentReserver; -import org.apache.mesos.v1.Protos.AgentID; -import org.apache.mesos.v1.Protos.FrameworkID; -import org.apache.mesos.v1.Protos.OfferID; -import org.apache.mesos.v1.Protos.TaskID; -import org.apache.mesos.v1.Protos.TaskInfo; -import org.junit.Before; -import org.junit.Test; - -import static org.apache.aurora.gen.ScheduleStatus.LOST; -import static org.apache.aurora.gen.ScheduleStatus.PENDING; -import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER; -import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; -import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; -import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty; -import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources; -import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; -import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar; -import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; -import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; -import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; -import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; -import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_EVALUATED_OFFERS; -import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_LAUNCH_FAILURES; -import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.LAUNCH_FAILED_MSG; -import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import static org.apache.mesos.v1.Protos.Offer; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - -public class FirstFitTaskAssignerTest extends EasyMockTest { - - private static final int PORT = 1000; - private static final Offer MESOS_OFFER = - offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT)); - private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue(); - private static final HostOffer OFFER = - new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes() - .setHost(MESOS_OFFER.getHostname()) - .setAttributes(ImmutableSet.of( - new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname())))))); - private static final IScheduledTask TASK = makeTask("id", JOB); - private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); - private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() - .setName("taskName") - .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK))) - .setAgentId(MESOS_OFFER.getAgentId()) - .build(); - private static final IInstanceKey INSTANCE_KEY = - InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId()); - private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of(); - private static final UnusedResource UNUSED = new UnusedResource( - bagFromMesosResources(MESOS_OFFER.getResourcesList()), - OFFER.getAttributes()); - private static final HostOffer OFFER_2 = new HostOffer( - Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("offerId0")) - .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) - .setAgentId(AgentID.newBuilder().setValue("slaveId0")) - .setHostname("hostName0") - .addResources(mesosRange(PORTS, PORT)) - .addResources(mesosScalar(CPUS, 1)) - .addResources(mesosScalar(RAM_MB, 1024)) - .build(), - IHostAttributes.build(new HostAttributes())); - - private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of(); - - private ResourceRequest resourceRequest; - - private MutableStoreProvider storeProvider; - private StateManager stateManager; - private SchedulingFilter filter; - private MesosTaskFactory taskFactory; - private OfferManager offerManager; - private FirstFitTaskAssigner assigner; - private TierManager tierManager; - private FakeStatsProvider statsProvider; - private UpdateAgentReserver updateAgentReserver; - - @Before - public void setUp() throws Exception { - storeProvider = createMock(MutableStoreProvider.class); - filter = createMock(SchedulingFilter.class); - taskFactory = createMock(MesosTaskFactory.class); - stateManager = createMock(StateManager.class); - offerManager = createMock(OfferManager.class); - tierManager = createMock(TierManager.class); - updateAgentReserver = createMock(UpdateAgentReserver.class); - statsProvider = new FakeStatsProvider(); - assigner = new FirstFitTaskAssigner( - stateManager, - filter, - taskFactory, - offerManager, - tierManager, - updateAgentReserver, - statsProvider); - resourceRequest = new ResourceRequest( - TASK.getAssignedTask().getTask(), - ResourceBag.EMPTY, - empty()); - } - - @Test - public void testAssignNoTasks() throws Exception { - control.replay(); - - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null)); - } - - @Test - public void testAssignPartialNoVetoes() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false)) - .andReturn(TASK_INFO); - - control.replay(); - - AttributeAggregate aggregate = empty(); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask(), - makeTask("id2", JOB).getAssignedTask(), - makeTask("id3", JOB).getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertNotEquals(empty(), aggregate); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignVetoesWithStaticBan() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, resourceRequest)) - .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied"))); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - NO_RESERVATION)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignVetoesWithNoStaticBan() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, resourceRequest)) - .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit"))); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - NO_RESERVATION)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignmentClearedOnError() throws Exception { - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2)); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expectLastCall().andThrow(new OfferManager.LaunchException("expected")); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - expect(stateManager.changeState( - storeProvider, - Tasks.id(TASK), - Optional.of(PENDING), - LOST, - LAUNCH_FAILED_MSG)) - .andReturn(StateChangeResult.SUCCESS); - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false)) - .andReturn(TASK_INFO); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - // Ensures scheduling loop terminates on the first launch failure. - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask(), - makeTask("id2", JOB).getAssignedTask(), - makeTask("id3", JOB).getAssignedTask()), - NO_RESERVATION)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES)); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignmentSkippedForReservedSlave() throws Exception { - expectNoUpdateReservations(0); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, TaskGroupKey.from( - ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n"))))))); - assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testTaskWithReservedSlaveLandsElsewhere() throws Exception { - // Ensures slave/task reservation relationship is only enforced in slave->task direction - // and permissive in task->slave direction. In other words, a task with a slave reservation - // should still be tried against other unreserved slaves. - expectNoUpdateReservations(1); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER)); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter( - new UnusedResource( - bagFromMesosResources(OFFER_2.getOffer().getResourcesList()), - OFFER_2.getAttributes()), - resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(OFFER_2.getOffer()); - expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false)) - .andReturn(TASK_INFO); - offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception { - // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified. - HostOffer mismatched = new HostOffer( - Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("offerId0")) - .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId")) - .setAgentId(AgentID.newBuilder().setValue("slaveId0")) - .setHostname("hostName0") - .addResources(mesosRange(PORTS, PORT)) - .addResources(mesosScalar(CPUS, 1)) - .addResources(mesosScalar(RAM_MB, 1024)) - .build(), - IHostAttributes.build(new HostAttributes())); - - expectNoUpdateReservations(2); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER)); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - expect(filter.filter( - new UnusedResource( - bagFromMesosResources(mismatched.getOffer().getResourcesList()), - mismatched.getAttributes()), - resourceRequest)) - .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch"))); - offerManager.banOfferForTaskGroup(mismatched.getOffer().getId(), GROUP_KEY); - expect(filter.filter( - new UnusedResource( - bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()), - resourceRequest)) - .andReturn(ImmutableSet.of()); - - expectAssignTask(MESOS_OFFER); - expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer(), false)) - .andReturn(TASK_INFO); - offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testResourceMapperCallback() { - AssignedTask builder = TASK.newBuilder().getAssignedTask(); - builder.unsetAssignedPorts(); - - control.replay(); - - assertEquals( - TASK.getAssignedTask(), - assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder))); - } - - @Test - public void testAssignToReservedAgent() throws Exception { - expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); - expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); - updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); - expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); - expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false)) - .andReturn(TASK_INFO); - - control.replay(); - - AttributeAggregate aggregate = empty(); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertNotEquals(empty(), aggregate); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignReservedAgentWhenOfferNotReady() throws Exception { - expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); - expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); - expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); - expect(filter.filter(UNUSED, resourceRequest)) - .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1))); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY); - expectLastCall(); - - control.replay(); - - AttributeAggregate aggregate = empty(); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(), - assigner.maybeAssign( - storeProvider, - new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate), - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertEquals(empty(), aggregate); - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testAssignWithMixOfReservedAndNotReserved() throws Exception { - AttributeAggregate aggregate = empty(); - ResourceRequest resources = new ResourceRequest( - TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate); - expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true); - expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID)); - updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY); - expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER)); - expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of()); - expectAssignTask(MESOS_OFFER); - offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - - expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false)) - .andReturn(TASK_INFO); - - // Normal scheduling loop for the remaining task... - expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent()); - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER)); - expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue())) - .andReturn(ImmutableSet.of()); - expect(filter.filter(UNUSED, resources)) - .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol"))); - offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY); - - control.replay(); - - assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - assertEquals( - ImmutableSet.of(Tasks.id(TASK)), - assigner.maybeAssign( - storeProvider, - resources, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of( - TASK.getAssignedTask(), - makeTask("another-task", JOB, 9999).getAssignedTask()), - ImmutableMap.of(SLAVE_ID, GROUP_KEY))); - assertNotEquals(empty(), aggregate); - assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - @Test - public void testSkipsOffersWithNoMemAndNoCpu() { - expectNoUpdateReservations(0); - expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER); - - // Offer lacks CPU. - Offer mesosOffer = offer(mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT)); - HostOffer offer = new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes() - .setHost(mesosOffer.getHostname()) - .setAttributes(ImmutableSet.of( - new Attribute("host", ImmutableSet.of(mesosOffer.getHostname())))))); - - expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer)); - - control.replay(); - - assertEquals( - NO_ASSIGNMENT, - assigner.maybeAssign( - storeProvider, - resourceRequest, - TaskGroupKey.from(TASK.getAssignedTask().getTask()), - ImmutableSet.of(TASK.getAssignedTask()), - NO_RESERVATION)); - assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS)); - } - - private void expectAssignTask(Offer offer) { - expect(stateManager.assignTask( - eq(storeProvider), - eq(Tasks.id(TASK)), - eq(offer.getHostname()), - eq(offer.getAgentId()), - anyObject())).andReturn(TASK.getAssignedTask()); - } - - private void expectNoUpdateReservations(int offers) { - expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false); - for (int i = 0; i < offers; i++) { - expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of()); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java index f8e8023..dfcbb4a 100644 --- a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java @@ -45,7 +45,7 @@ public class AsyncStatsModuleTest extends EasyMockTest { @Test public void testOfferAdapter() { OfferManager offerManager = createMock(OfferManager.class); - expect(offerManager.getOffers()).andReturn(ImmutableList.of( + expect(offerManager.getAll()).andReturn(ImmutableList.of( new HostOffer(Protos.Offer.newBuilder() .setId(Protos.OfferID.newBuilder().setValue("offerId")) .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("frameworkId"))