Repository: aurora
Updated Branches:
  refs/heads/master 21af250c9 -> 80139da46


http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
index 6b18296..ff80baa 100644
--- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import org.apache.aurora.common.collections.Pair;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
@@ -32,9 +33,12 @@ import org.apache.aurora.scheduler.base.TaskGroupKey;
 import org.apache.aurora.scheduler.base.Tasks;
 import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
 import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged;
+import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
 import org.apache.aurora.scheduler.mesos.Driver;
 import org.apache.aurora.scheduler.offers.Deferment.Noop;
-import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl;
+import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
@@ -54,11 +58,13 @@ import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
 import static org.apache.aurora.gen.MaintenanceMode.NONE;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
 import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static 
org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
-import static 
org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_ACCEPT_RACES;
-import static 
org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OFFER_CANCEL_FAILURES;
-import static 
org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.OUTSTANDING_OFFERS;
-import static 
org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl.STATICALLY_BANNED_OFFERS;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static 
org.apache.aurora.scheduler.offers.OfferManagerImpl.GLOBALLY_BANNED_OFFERS;
+import static 
org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_ACCEPT_RACES;
+import static 
org.apache.aurora.scheduler.offers.OfferManagerImpl.OFFER_CANCEL_FAILURES;
+import static 
org.apache.aurora.scheduler.offers.OfferManagerImpl.OUTSTANDING_OFFERS;
+import static 
org.apache.aurora.scheduler.offers.OfferManagerImpl.STATICALLY_BANNED_OFFERS;
+import static 
org.apache.aurora.scheduler.offers.OfferManagerImpl.VETO_EVALUATED_OFFERS;
 import static 
org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
 import static 
org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
 import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
@@ -66,6 +72,8 @@ import static 
org.apache.aurora.scheduler.resources.ResourceType.CPUS;
 import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB;
 import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
 import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -89,21 +97,25 @@ public class OfferManagerImplTest extends EasyMockTest {
   private static final String HOST_C = "HOST_C";
   private static final HostOffer OFFER_C = new HostOffer(
       Offers.makeOffer("OFFER_C", HOST_C),
-      IHostAttributes.build(new HostAttributes().setMode(NONE)));
+      IHostAttributes.build(new 
HostAttributes().setMode(NONE).setHost(HOST_C)));
   private static final int PORT = 1000;
   private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, 
PORT));
   private static final IScheduledTask TASK = makeTask("id", JOB);
+  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
+      TASK.getAssignedTask().getTask(),
+      ResourceBag.EMPTY,
+      empty());
   private static final TaskGroupKey GROUP_KEY = 
TaskGroupKey.from(TASK.getAssignedTask().getTask());
   private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
       .setName("taskName")
       .setTaskId(Protos.TaskID.newBuilder().setValue(Tasks.id(TASK)))
       .setAgentId(MESOS_OFFER.getAgentId())
       .build();
-  private static Operation launch = Operation.newBuilder()
+  private static final Operation LAUNCH = Operation.newBuilder()
       .setType(Operation.Type.LAUNCH)
       .setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO))
       .build();
-  private static final List<Operation> OPERATIONS = ImmutableList.of(launch);
+  private static final List<Operation> OPERATIONS = ImmutableList.of(LAUNCH);
   private static final long OFFER_FILTER_SECONDS = 0;
   private static final Filters OFFER_FILTER = Filters.newBuilder()
       .setRefuseSeconds(OFFER_FILTER_SECONDS)
@@ -113,6 +125,7 @@ public class OfferManagerImplTest extends EasyMockTest {
   private Driver driver;
   private OfferManagerImpl offerManager;
   private FakeStatsProvider statsProvider;
+  private SchedulingFilter schedulingFilter;
 
   @Before
   public void setUp() {
@@ -125,7 +138,13 @@ public class OfferManagerImplTest extends EasyMockTest {
         FAKE_TICKER
     );
     statsProvider = new FakeStatsProvider();
-    offerManager = new OfferManagerImpl(driver, offerSettings, statsProvider, 
new Noop());
+    schedulingFilter = createMock(SchedulingFilter.class);
+
+    offerManager = new OfferManagerImpl(driver,
+        offerSettings,
+        statsProvider,
+        new Noop(),
+        schedulingFilter);
   }
 
   @Test
@@ -136,11 +155,11 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(hostOfferB);
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(hostOfferC);
+    offerManager.add(hostOfferB);
+    offerManager.add(OFFER_A);
+    offerManager.add(hostOfferC);
 
-    List<HostOffer> actual = ImmutableList.copyOf(offerManager.getOffers());
+    List<HostOffer> actual = ImmutableList.copyOf(offerManager.getAll());
 
     assertEquals(
         // hostOfferC has a further away start time, so it should be preferred.
@@ -160,15 +179,15 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(offerA);
+    offerManager.add(offerA);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    offerManager.addOffer(OFFER_B);
+    offerManager.add(OFFER_B);
     assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    offerManager.addOffer(offerC);
+    offerManager.add(offerC);
     assertEquals(3, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(
         ImmutableSet.of(OFFER_B, offerA, offerC),
-        ImmutableSet.copyOf(offerManager.getOffers()));
+        ImmutableSet.copyOf(offerManager.getAll()));
     offerManager.launchTask(OFFER_B.getOffer().getId(), TASK_INFO);
     assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
@@ -179,19 +198,19 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     offerManager.hostAttributesChanged(new 
HostAttributesChanged(HOST_ATTRIBUTES_A));
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
-    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), 
ImmutableSet.copyOf(offerManager.getOffers()));
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), 
ImmutableSet.copyOf(offerManager.getAll()));
 
     HostOffer offerA = setMode(OFFER_A, DRAINING);
     offerManager.hostAttributesChanged(new 
HostAttributesChanged(offerA.getAttributes()));
-    assertEquals(ImmutableSet.of(OFFER_B, offerA), 
ImmutableSet.copyOf(offerManager.getOffers()));
+    assertEquals(ImmutableSet.of(OFFER_B, offerA), 
ImmutableSet.copyOf(offerManager.getAll()));
 
     offerA = setMode(OFFER_A, NONE);
     HostOffer offerB = setMode(OFFER_B, DRAINING);
     offerManager.hostAttributesChanged(new 
HostAttributesChanged(offerA.getAttributes()));
     offerManager.hostAttributesChanged(new 
HostAttributesChanged(offerB.getAttributes()));
-    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), 
ImmutableSet.copyOf(offerManager.getOffers()));
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B), 
ImmutableSet.copyOf(offerManager.getAll()));
   }
 
   @Test
@@ -201,9 +220,9 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
@@ -211,74 +230,83 @@ public class OfferManagerImplTest extends EasyMockTest {
   public void testGetOffersReturnsAllOffers() {
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    offerManager.add(OFFER_A);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
 
-    offerManager.cancelOffer(OFFER_A_ID);
+    offerManager.cancel(OFFER_A_ID);
     assertEquals(0, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers()));
+    assertTrue(Iterables.isEmpty(offerManager.getAll()));
     assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
   @Test
   public void testOfferFilteringDueToStaticBan() {
+    expectFilterNone();
+
     control.replay();
 
     // Static ban ignored when no offers.
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-    offerManager.addOffer(OFFER_A);
-    assertEquals(OFFER_A, 
Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
+    offerManager.add(OFFER_A);
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
 
     // Add static ban.
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
   }
 
   @Test
   public void testStaticBanExpiresAfterMaxHoldTime() throws 
InterruptedException {
+    expectFilterNone();
+
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    offerManager.add(OFFER_A);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban expires after maximum amount of time an offer is held.
     FAKE_TICKER.advance(RETURN_DELAY);
     offerManager.cleanupStaticBans();
-    assertEquals(OFFER_A, 
Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
   }
 
   @Test
   public void testStaticBanIsClearedOnDriverDisconnect() {
+    expectFilterNone();
+
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.banOfferForTaskGroup(OFFER_A_ID, GROUP_KEY);
-    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getOffers()));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    offerManager.add(OFFER_A);
+    offerManager.banForTaskGroup(OFFER_A_ID, GROUP_KEY);
+    assertEquals(OFFER_A, Iterables.getOnlyElement(offerManager.getAll()));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
     assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
 
     // Make sure the static ban is cleared when driver is disconnected.
     offerManager.driverDisconnected(new DriverDisconnected());
     assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
-    offerManager.addOffer(OFFER_A);
-    assertEquals(OFFER_A, 
Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+    offerManager.add(OFFER_A);
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
   }
 
   @Test
   public void testGetOffer() {
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    assertEquals(Optional.of(OFFER_A), 
offerManager.getOffer(OFFER_A.getOffer().getAgentId()));
+    offerManager.add(OFFER_A);
+    assertEquals(Optional.of(OFFER_A), 
offerManager.get(OFFER_A.getOffer().getAgentId()));
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
   }
 
@@ -289,7 +317,7 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     offerManager.launchTask(OFFER_A_ID, TASK_INFO);
   }
 
@@ -308,8 +336,8 @@ public class OfferManagerImplTest extends EasyMockTest {
   public void testFlushOffers() {
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(OFFER_B);
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
     assertEquals(2, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     offerManager.driverDisconnected(new DriverDisconnected());
     assertEquals(0, statsProvider.getLongValue(OUTSTANDING_OFFERS));
@@ -319,26 +347,29 @@ public class OfferManagerImplTest extends EasyMockTest {
   public void testCancelFailure() {
     control.replay();
 
-    offerManager.cancelOffer(OFFER_A.getOffer().getId());
+    offerManager.cancel(OFFER_A.getOffer().getId());
     assertEquals(1, statsProvider.getLongValue(OFFER_CANCEL_FAILURES));
   }
 
   @Test
   public void testBanAndUnbanOffer() {
+    expectFilterNone();
+
     control.replay();
 
     // After adding a banned offer, user can see it is in OUTSTANDING_OFFERS but cannot retrieve it.
-    offerManager.banOffer(OFFER_A_ID);
-    offerManager.addOffer(OFFER_A);
+    offerManager.ban(OFFER_A_ID);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
-    assertTrue(Iterables.isEmpty(offerManager.getOffers(GROUP_KEY)));
+    assertTrue(Iterables.isEmpty(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
 
-    offerManager.cancelOffer(OFFER_A_ID);
-    offerManager.addOffer(OFFER_A);
+    offerManager.cancel(OFFER_A_ID);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
     assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
-    assertEquals(OFFER_A, 
Iterables.getOnlyElement(offerManager.getOffers(GROUP_KEY)));
+    assertEquals(OFFER_A,
+        Iterables.getOnlyElement(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
   }
 
   private static HostOffer setUnavailability(HostOffer offer, long startMs) {
@@ -363,7 +394,7 @@ public class OfferManagerImplTest extends EasyMockTest {
             RETURN_DELAY,
             Long.MAX_VALUE,
             FAKE_TICKER);
-    return new OfferManagerImpl(driver, settings, statsProvider, new Noop());
+    return new OfferManagerImpl(driver, settings, statsProvider, new Noop(), 
schedulingFilter);
   }
 
   @Test
@@ -377,25 +408,25 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(CPUS, 24.0, true),
             mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2", mesosScalar(CPUS, 5.0), mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 10.0), mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -410,7 +441,6 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(CPUS, 23.0, true),
             mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer(
             "host1",
@@ -418,21 +448,22 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(CPUS, 24.0, true),
             mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 1.0), mesosScalar(RAM_MB, 1024)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, true)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -442,25 +473,25 @@ public class OfferManagerImplTest extends EasyMockTest {
     HostOffer small = setMode(new HostOffer(
         offer("host1", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), 
mesosScalar(DISK_MB, 1.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), 
mesosScalar(DISK_MB, 5.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1), 
mesosScalar(DISK_MB, 10.0)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -470,25 +501,25 @@ public class OfferManagerImplTest extends EasyMockTest {
     HostOffer small = setMode(new HostOffer(
         offer("host1", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 1.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 5.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3", mesosScalar(CPUS, 10), mesosScalar(RAM_MB, 10.0)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(small);
+    cpuManager.add(medium);
+    cpuManager.add(large);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -502,14 +533,12 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(RAM_MB, 2.0),
             mesosScalar(DISK_MB, 3.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer medium = setMode(new HostOffer(
         offer("host2",
             mesosScalar(CPUS, 1.0),
             mesosScalar(RAM_MB, 3.0),
             mesosScalar(DISK_MB, 2.0)),
         HOST_ATTRIBUTES_A), DRAINING);
-
     HostOffer large = setMode(new HostOffer(
         offer("host3",
             mesosScalar(CPUS, 10.0),
@@ -518,16 +547,18 @@ public class OfferManagerImplTest extends EasyMockTest {
             mesosScalar(DISK_MB, 1.0)),
         HOST_ATTRIBUTES_A), DRAINING);
 
+    expectFilterNone();
+
     control.replay();
 
-    cpuManager.addOffer(large);
-    cpuManager.addOffer(medium);
-    cpuManager.addOffer(small);
+    cpuManager.add(large);
+    cpuManager.add(medium);
+    cpuManager.add(small);
 
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers(GROUP_KEY)));
+        ImmutableList.copyOf(cpuManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
     assertEquals(ImmutableList.of(small, medium, large),
-        ImmutableList.copyOf(cpuManager.getOffers()));
+        ImmutableList.copyOf(cpuManager.getAll()));
   }
 
   @Test
@@ -545,13 +576,14 @@ public class OfferManagerImplTest extends EasyMockTest {
         driver,
         settings,
         statsProvider,
-        new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock));
+        new Deferment.DelayedDeferment(() -> RETURN_DELAY, executorMock),
+        schedulingFilter);
 
     driver.declineOffer(OFFER_A_ID, OFFER_FILTER);
 
     control.replay();
 
-    offerManager.addOffer(OFFER_A);
+    offerManager.add(OFFER_A);
     assertEquals(1, statsProvider.getLongValue(OUTSTANDING_OFFERS));
 
     clock.advance(RETURN_DELAY);
@@ -575,11 +607,154 @@ public class OfferManagerImplTest extends EasyMockTest {
 
     control.replay();
 
-    offerManager.banOffer(OFFER_A_ID);
-    offerManager.addOffer(OFFER_A);
-    offerManager.addOffer(sameAgent);
-    offerManager.cancelOffer(OFFER_A_ID);
-    offerManager.addOffer(sameAgent2);
-    assertEquals(ImmutableSet.of(sameAgent2), offerManager.getOffers());
+    offerManager.ban(OFFER_A_ID);
+    offerManager.add(OFFER_A);
+    offerManager.add(sameAgent);
+    offerManager.cancel(OFFER_A_ID);
+    offerManager.add(sameAgent2);
+    assertEquals(ImmutableSet.of(sameAgent2), offerManager.getAll());
+  }
+
+  private void expectFilterNone() {
+    // Most tests will use a permissive scheduling filter
+    expect(schedulingFilter.filter(anyObject(), anyObject()))
+        .andReturn(ImmutableSet.of())
+        .anyTimes();
+  }
+
+  @Test
+  public void testGetMatchingSingleAgent() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    assertEquals(Optional.of(OFFER_A),
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), 
EMPTY_REQUEST, false));
+  }
+
+  @Test
+  public void testGetMatchingNoGloballyBanned() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    offerManager.ban(OFFER_A_ID);
+    assertEquals(Optional.absent(),
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), 
EMPTY_REQUEST, false));
+    assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+  }
+
+  @Test
+  public void testGetMatchingNoVetoed() {
+    // Calling getMatching when a veto is present should return an empty option. Additionally,
+    // it should not statically ban the offer if it is vetoed.
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), 
EMPTY_REQUEST))
+        
.andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch()));
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(Optional.absent(),
+        offerManager.getMatching(OFFER_A.getOffer().getAgentId(), 
EMPTY_REQUEST, false));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+  }
+
+  @Test
+  public void testGetAllMatching() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_B, OFFER_C),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
+    assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+  }
+
+  @Test
+  public void testGetAllMatchingNoGloballyBanned() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    offerManager.ban(OFFER_B.getOffer().getId());
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
+    assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+  }
+
+  @Test
+  public void testGetAllMatchingNoStaticallyBanned() {
+    expectFilterNone();
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    offerManager.banForTaskGroup(OFFER_B.getOffer().getId(), GROUP_KEY);
+    assertEquals(ImmutableSet.of(OFFER_A, OFFER_C),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
+    assertEquals(2, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(ImmutableSet.of(Pair.of(OFFER_B.getOffer().getId(), 
GROUP_KEY)),
+        offerManager.getStaticBans());
+  }
+
+  @Test
+  public void testGetAllMatchingIgnoreNoCpusAndMem() {
+    expectFilterNone();
+
+    HostOffer empty = setMode(new HostOffer(
+        offer("host1",
+            mesosScalar(CPUS, 0),
+            mesosScalar(RAM_MB, 0),
+            mesosScalar(DISK_MB, 3.0)),
+        HOST_ATTRIBUTES_A), NONE);
+
+    control.replay();
+    offerManager.add(empty);
+    offerManager.add(OFFER_A);
+    assertEquals(0, statsProvider.getLongValue(GLOBALLY_BANNED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(ImmutableSet.of(OFFER_A),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
+    assertEquals(1, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(ImmutableSet.of(empty, OFFER_A),
+        ImmutableSet.copyOf(offerManager.getAll()));
+  }
+
+  @Test
+  public void testGetAllMatchingNoVetoed() {
+    // Calling getAllMatching should statically ban the offer as well if it is statically vetoed
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_A, false), 
EMPTY_REQUEST))
+        
.andReturn(ImmutableSet.of(SchedulingFilter.Veto.dedicatedHostConstraintMismatch()));
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_B, false), 
EMPTY_REQUEST))
+        .andReturn(ImmutableSet.of());
+    expect(schedulingFilter.filter(new UnusedResource(OFFER_C, false), 
EMPTY_REQUEST))
+        
.andReturn(ImmutableSet.of(SchedulingFilter.Veto.unsatisfiedLimit("test_limit")));
+
+    control.replay();
+    offerManager.add(OFFER_A);
+    offerManager.add(OFFER_B);
+    offerManager.add(OFFER_C);
+    assertEquals(0, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(0, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(ImmutableSet.of(OFFER_B),
+        ImmutableSet.copyOf(offerManager.getAllMatching(GROUP_KEY, 
EMPTY_REQUEST, false)));
+    assertEquals(3, statsProvider.getLongValue(VETO_EVALUATED_OFFERS));
+    assertEquals(1, statsProvider.getLongValue(STATICALLY_BANNED_OFFERS));
+    assertEquals(ImmutableSet.of(Pair.of(OFFER_A.getOffer().getId(), 
GROUP_KEY)),
+        offerManager.getStaticBans());
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
index c76b3e3..a346e44 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PendingTaskProcessorTest.java
@@ -300,7 +300,7 @@ public class PendingTaskProcessorTest extends EasyMockTest {
   }
 
   private void expectOffers(HostOffer... offers) {
-    expect(offerManager.getOffers()).andReturn(ImmutableSet.copyOf(offers));
+    expect(offerManager.getAll()).andReturn(ImmutableSet.copyOf(offers));
   }
 
   private void expectGetClusterState(IScheduledTask... returnedTasks) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
index f14cba1..1061583 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorImplTest.java
@@ -82,7 +82,7 @@ public class PreemptorImplTest extends EasyMockTest {
     slotCache = createMock(new Clazz<BiCache<PreemptionProposal, TaskGroupKey>>() { });
     statsProvider = new FakeStatsProvider();
     OfferManager offerManager = createMock(OfferManager.class);
-    expect(offerManager.getOffer(anyObject(Protos.AgentID.class)))
+    expect(offerManager.get(anyObject(Protos.AgentID.class)))
         .andReturn(Optional.of(OFFER))
         .anyTimes();
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
index 97c238c..815a3ef 100644
--- a/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/preemptor/PreemptorModuleTest.java
@@ -25,8 +25,8 @@ import org.apache.aurora.gen.AssignedTask;
 import org.apache.aurora.scheduler.config.CliOptions;
 import org.apache.aurora.scheduler.filter.AttributeAggregate;
 import org.apache.aurora.scheduler.filter.SchedulingFilter;
+import org.apache.aurora.scheduler.scheduling.TaskAssigner;
 import org.apache.aurora.scheduler.state.StateManager;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
new file mode 100644
index 0000000..627055c
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/FirstFitOfferSelectorTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class FirstFitOfferSelectorTest extends EasyMockTest {
+
+  private static final IAssignedTask TASK = makeTask("id", JOB).getAssignedTask();
+  private static final ResourceRequest EMPTY_REQUEST = new ResourceRequest(
+      TASK.getTask(),
+      ResourceBag.EMPTY,
+      empty());
+
+  private OfferSelector firstFitOfferSelector;
+
+  @Before
+  public void setUp() {
+    firstFitOfferSelector = new FirstFitOfferSelector();
+  }
+
+  @Test
+  public void testNoOffers() {
+    Iterable<HostOffer> offers = ImmutableList.of();
+
+    control.replay();
+
+    assertFalse(firstFitOfferSelector.select(offers, EMPTY_REQUEST).isPresent());
+  }
+
+  @Test
+  public void testReturnFirstOffer() {
+    HostOffer offerA = createMock(HostOffer.class);
+    HostOffer offerB = createMock(HostOffer.class);
+    Iterable<HostOffer> offers = ImmutableList.of(offerA, offerB);
+
+    control.replay();
+
+    assertEquals(offerA, firstFitOfferSelector.select(offers, EMPTY_REQUEST).get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
new file mode 100644
index 0000000..e094950
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskAssignerImplTest.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.scheduling;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.gen.AssignedTask;
+import org.apache.aurora.gen.Attribute;
+import org.apache.aurora.gen.HostAttributes;
+import org.apache.aurora.gen.JobKey;
+import org.apache.aurora.gen.TaskConfig;
+import org.apache.aurora.scheduler.HostOffer;
+import org.apache.aurora.scheduler.TierManager;
+import org.apache.aurora.scheduler.base.InstanceKeys;
+import org.apache.aurora.scheduler.base.TaskGroupKey;
+import org.apache.aurora.scheduler.base.Tasks;
+import org.apache.aurora.scheduler.filter.AttributeAggregate;
+import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
+import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
+import org.apache.aurora.scheduler.offers.OfferManager;
+import org.apache.aurora.scheduler.resources.ResourceBag;
+import org.apache.aurora.scheduler.state.StateChangeResult;
+import org.apache.aurora.scheduler.state.StateManager;
+import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
+import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
+import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
+import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
+import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.aurora.gen.ScheduleStatus.LOST;
+import static org.apache.aurora.gen.ScheduleStatus.PENDING;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
+import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
+import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
+import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
+import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
+import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
+import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
+import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.ASSIGNER_LAUNCH_FAILURES;
+import static org.apache.aurora.scheduler.scheduling.TaskAssignerImpl.LAUNCH_FAILED_MSG;
+import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import static org.apache.mesos.v1.Protos.Offer;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TaskAssignerImplTest extends EasyMockTest {
+
+  private static final int PORT = 1000;
+  private static final Offer MESOS_OFFER =
+      offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
+  private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
+  private static final HostOffer OFFER =
+      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
+          .setHost(MESOS_OFFER.getHostname())
+          .setAttributes(ImmutableSet.of(
+              new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
+  private static final IScheduledTask TASK = makeTask("id", JOB);
+  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
+  private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
+      .setName("taskName")
+      .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
+      .setAgentId(MESOS_OFFER.getAgentId())
+      .build();
+  private static final IInstanceKey INSTANCE_KEY =
+      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
+  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
+  private static final HostOffer OFFER_2 = new HostOffer(
+      Offer.newBuilder()
+          .setId(OfferID.newBuilder().setValue("offerId0"))
+          .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
+          .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
+          .setHostname("hostName0")
+          .addResources(mesosRange(PORTS, PORT))
+          .addResources(mesosScalar(CPUS, 1))
+          .addResources(mesosScalar(RAM_MB, 1024))
+          .build(),
+      IHostAttributes.build(new HostAttributes()));
+
+  private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
+
+  private AttributeAggregate aggregate;
+  private ResourceRequest resourceRequest;
+
+  private MutableStoreProvider storeProvider;
+  private StateManager stateManager;
+  private MesosTaskFactory taskFactory;
+  private OfferManager offerManager;
+  private TaskAssignerImpl assigner;
+  private TierManager tierManager;
+  private FakeStatsProvider statsProvider;
+  private UpdateAgentReserver updateAgentReserver;
+
+  @Before
+  public void setUp() throws Exception {
+    storeProvider = createMock(MutableStoreProvider.class);
+    taskFactory = createMock(MesosTaskFactory.class);
+    stateManager = createMock(StateManager.class);
+    offerManager = createMock(OfferManager.class);
+    tierManager = createMock(TierManager.class);
+    updateAgentReserver = createMock(UpdateAgentReserver.class);
+    statsProvider = new FakeStatsProvider();
+    // TODO(jly): FirstFitOfferSelector returns the first offer which is what we want for testing,
+    // but if its implementation becomes more complex we may need to replace it with a fake.
+    OfferSelector offerSelector = new FirstFitOfferSelector();
+    assigner = new TaskAssignerImpl(
+        stateManager,
+        taskFactory,
+        offerManager,
+        tierManager,
+        updateAgentReserver,
+        statsProvider,
+        offerSelector);
+    aggregate = empty();
+    resourceRequest = new ResourceRequest(
+        TASK.getAssignedTask().getTask(),
+        ResourceBag.EMPTY,
+        aggregate);
+  }
+
+  @Test
+  public void testAssignNoTasks() throws Exception {
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
+  }
+
+  @Test
+  public void testAssignmentClearedOnError() throws Exception {
+    expectNoUpdateReservations(1);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER, OFFER_2));
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expectAssignTask(MESOS_OFFER);
+    expect(stateManager.changeState(
+        storeProvider,
+        Tasks.id(TASK),
+        Optional.of(PENDING),
+        LOST,
+        LAUNCH_FAILED_MSG))
+        .andReturn(StateChangeResult.SUCCESS);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+    // Ensures scheduling loop terminates on the first launch failure.
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                makeTask("id2", JOB).getAssignedTask(),
+                makeTask("id3", JOB).getAssignedTask()),
+            NO_RESERVATION));
+    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
+  }
+
+  @Test
+  public void testAssignmentSkippedForReservedSlave() throws Exception {
+    expectNoUpdateReservations(0);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER));
+
+    control.replay();
+
+    assertEquals(
+        NO_ASSIGNMENT,
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
+                ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
+  }
+
+  @Test
+  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
+    // Ensures slave/task reservation relationship is only enforced in slave->task direction
+    // and permissive in task->slave direction. In other words, a task with a slave reservation
+    // should still be tried against other unreserved slaves.
+    expectNoUpdateReservations(1);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(ImmutableSet.of(OFFER_2, OFFER));
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expectAssignTask(OFFER_2.getOffer());
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false))
+        .andReturn(TASK_INFO);
+    offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+  }
+
+  @Test
+  public void testResourceMapperCallback() {
+    AssignedTask builder = TASK.newBuilder().getAssignedTask();
+    builder.unsetAssignedPorts();
+
+    control.replay();
+
+    assertEquals(
+        TASK.getAssignedTask(),
+        assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
+  }
+
+  @Test
+  public void testAssignToReservedAgent() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.of(OFFER));
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(TASK_INFO);
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(Tasks.id(TASK)),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(
+                TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+  }
+
+  @Test
+  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.absent());
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+    expectLastCall();
+
+    control.replay();
+
+    assertEquals(
+        ImmutableSet.of(),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
+            ImmutableSet.of(TASK.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertEquals(empty(), aggregate);
+  }
+
+  @Test
+  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
+    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
+
+    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
+    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
+    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
+    expect(offerManager.getMatching(MESOS_OFFER.getAgentId(), resourceRequest, false))
+        .andReturn(Optional.of(OFFER));
+    expectAssignTask(MESOS_OFFER);
+    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
+    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(TASK_INFO);
+
+    // Normal scheduling loop for the remaining task...
+    IScheduledTask secondTask = makeTask("another-task", JOB, 9999);
+    TaskInfo secondTaskInfo = TaskInfo.newBuilder()
+        .setName("another-task")
+        .setTaskId(TaskID.newBuilder().setValue(Tasks.id(secondTask)))
+        .setAgentId(MESOS_OFFER.getAgentId())
+        .build();
+    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
+    ImmutableSet<HostOffer> matchingOffers = ImmutableSet.of(OFFER);
+    expect(offerManager.getAllMatching(GROUP_KEY, resourceRequest, false))
+        .andReturn(matchingOffers);
+    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
+        .andReturn(ImmutableSet.of());
+    expectAssignTask(MESOS_OFFER, secondTask);
+    offerManager.launchTask(MESOS_OFFER.getId(), secondTaskInfo);
+    expect(taskFactory.createFrom(secondTask.getAssignedTask(), MESOS_OFFER, false))
+        .andReturn(secondTaskInfo);
+
+    control.replay();
+
+    assertEquals(
+        Tasks.ids(TASK, secondTask),
+        assigner.maybeAssign(
+            storeProvider,
+            resourceRequest,
+            GROUP_KEY,
+            ImmutableSet.of(
+                TASK.getAssignedTask(),
+                secondTask.getAssignedTask()),
+            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
+    assertNotEquals(empty(), aggregate);
+  }
+
+  private void expectAssignTask(Offer offer) {
+    expectAssignTask(offer, TASK);
+  }
+
+  private void expectAssignTask(Offer offer, IScheduledTask task) {
+    expect(stateManager.assignTask(
+        eq(storeProvider),
+        eq(Tasks.id(task)),
+        eq(offer.getHostname()),
+        eq(offer.getAgentId()),
+        anyObject())).andReturn(task.getAssignedTask());
+  }
+
+  private void expectNoUpdateReservations(int offers) {
+    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
+    for (int i = 0; i < offers; i++) {
+      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
index 7a4525a..260ff9d 100644
--- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java
@@ -46,9 +46,7 @@ import org.apache.aurora.scheduler.preemptor.BiCache;
 import org.apache.aurora.scheduler.preemptor.Preemptor;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.resources.ResourceManager;
-import org.apache.aurora.scheduler.scheduling.TaskScheduler.TaskSchedulerImpl;
 import org.apache.aurora.scheduler.state.PubsubTestUtil;
-import org.apache.aurora.scheduler.state.TaskAssigner;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java b/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
deleted file mode 100644
index 58f9de2..0000000
--- a/src/test/java/org/apache/aurora/scheduler/state/FirstFitTaskAssignerTest.java
+++ /dev/null
@@ -1,539 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.state;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobKey;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.InstanceKeys;
-import org.apache.aurora.scheduler.base.TaskGroupKey;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.filter.AttributeAggregate;
-import org.apache.aurora.scheduler.filter.SchedulingFilter;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.ResourceRequest;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.UnusedResource;
-import org.apache.aurora.scheduler.filter.SchedulingFilter.Veto;
-import org.apache.aurora.scheduler.mesos.MesosTaskFactory;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.resources.ResourceBag;
-import org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner;
-import org.apache.aurora.scheduler.storage.entities.IAssignedTask;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IInstanceKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.apache.aurora.scheduler.updater.UpdateAgentReserver;
-import org.apache.mesos.v1.Protos.AgentID;
-import org.apache.mesos.v1.Protos.FrameworkID;
-import org.apache.mesos.v1.Protos.OfferID;
-import org.apache.mesos.v1.Protos.TaskID;
-import org.apache.mesos.v1.Protos.TaskInfo;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.ScheduleStatus.LOST;
-import static org.apache.aurora.gen.ScheduleStatus.PENDING;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.DEV_TIER;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.apache.aurora.scheduler.filter.AttributeAggregate.empty;
-import static org.apache.aurora.scheduler.resources.ResourceManager.bagFromMesosResources;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosScalar;
-import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer;
-import static org.apache.aurora.scheduler.resources.ResourceType.CPUS;
-import static org.apache.aurora.scheduler.resources.ResourceType.PORTS;
-import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_EVALUATED_OFFERS;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.ASSIGNER_LAUNCH_FAILURES;
-import static org.apache.aurora.scheduler.state.TaskAssigner.FirstFitTaskAssigner.LAUNCH_FAILED_MSG;
-import static org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import static org.apache.mesos.v1.Protos.Offer;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-public class FirstFitTaskAssignerTest extends EasyMockTest {
-
-  private static final int PORT = 1000;
-  private static final Offer MESOS_OFFER =
-      offer(mesosScalar(CPUS, 1), mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
-  private static final String SLAVE_ID = MESOS_OFFER.getAgentId().getValue();
-  private static final HostOffer OFFER =
-      new HostOffer(MESOS_OFFER, IHostAttributes.build(new HostAttributes()
-          .setHost(MESOS_OFFER.getHostname())
-          .setAttributes(ImmutableSet.of(
-              new Attribute("host", ImmutableSet.of(MESOS_OFFER.getHostname()))))));
-  private static final IScheduledTask TASK = makeTask("id", JOB);
-  private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask());
-  private static final TaskInfo TASK_INFO = TaskInfo.newBuilder()
-      .setName("taskName")
-      .setTaskId(TaskID.newBuilder().setValue(Tasks.id(TASK)))
-      .setAgentId(MESOS_OFFER.getAgentId())
-      .build();
-  private static final IInstanceKey INSTANCE_KEY =
-      InstanceKeys.from(JOB, TASK.getAssignedTask().getInstanceId());
-  private static final Map<String, TaskGroupKey> NO_RESERVATION = ImmutableMap.of();
-  private static final UnusedResource UNUSED = new UnusedResource(
-      bagFromMesosResources(MESOS_OFFER.getResourcesList()),
-      OFFER.getAttributes());
-  private static final HostOffer OFFER_2 = new HostOffer(
-      Offer.newBuilder()
-          .setId(OfferID.newBuilder().setValue("offerId0"))
-          .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-          .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
-          .setHostname("hostName0")
-          .addResources(mesosRange(PORTS, PORT))
-          .addResources(mesosScalar(CPUS, 1))
-          .addResources(mesosScalar(RAM_MB, 1024))
-          .build(),
-      IHostAttributes.build(new HostAttributes()));
-
-  private static final Set<String> NO_ASSIGNMENT = ImmutableSet.of();
-
-  private ResourceRequest resourceRequest;
-
-  private MutableStoreProvider storeProvider;
-  private StateManager stateManager;
-  private SchedulingFilter filter;
-  private MesosTaskFactory taskFactory;
-  private OfferManager offerManager;
-  private FirstFitTaskAssigner assigner;
-  private TierManager tierManager;
-  private FakeStatsProvider statsProvider;
-  private UpdateAgentReserver updateAgentReserver;
-
-  @Before
-  public void setUp() throws Exception {
-    storeProvider = createMock(MutableStoreProvider.class);
-    filter = createMock(SchedulingFilter.class);
-    taskFactory = createMock(MesosTaskFactory.class);
-    stateManager = createMock(StateManager.class);
-    offerManager = createMock(OfferManager.class);
-    tierManager = createMock(TierManager.class);
-    updateAgentReserver = createMock(UpdateAgentReserver.class);
-    statsProvider = new FakeStatsProvider();
-    assigner = new FirstFitTaskAssigner(
-        stateManager,
-        filter,
-        taskFactory,
-        offerManager,
-        tierManager,
-        updateAgentReserver,
-        statsProvider);
-    resourceRequest = new ResourceRequest(
-        TASK.getAssignedTask().getTask(),
-        ResourceBag.EMPTY,
-        empty());
-  }
-
-  @Test
-  public void testAssignNoTasks() throws Exception {
-    control.replay();
-
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(storeProvider, null, null, ImmutableSet.of(), null));
-  }
-
-  @Test
-  public void testAssignPartialNoVetoes() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("id2", JOB).getAssignedTask(),
-                makeTask("id3", JOB).getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignVetoesWithStaticBan() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("denied")));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignVetoesWithNoStaticBan() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.unsatisfiedLimit("limit")));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignmentClearedOnError() throws Exception {
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER, OFFER_2));
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expectLastCall().andThrow(new OfferManager.LaunchException("expected"));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    expect(stateManager.changeState(
-        storeProvider,
-        Tasks.id(TASK),
-        Optional.of(PENDING),
-        LOST,
-        LAUNCH_FAILED_MSG))
-        .andReturn(StateChangeResult.SUCCESS);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    // Ensures scheduling loop terminates on the first launch failure.
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("id2", JOB).getAssignedTask(),
-                makeTask("id3", JOB).getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_LAUNCH_FAILURES));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignmentSkippedForReservedSlave() throws Exception {
-    expectNoUpdateReservations(0);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, TaskGroupKey.from(
-                ITaskConfig.build(new TaskConfig().setJob(new JobKey("other", "e", "n")))))));
-    assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testTaskWithReservedSlaveLandsElsewhere() throws Exception {
-    // Ensures slave/task reservation relationship is only enforced in slave->task direction
-    // and permissive in task->slave direction. In other words, a task with a slave reservation
-    // should still be tried against other unreserved slaves.
-    expectNoUpdateReservations(1);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER_2, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(OFFER_2.getOffer().getResourcesList()),
-            OFFER_2.getAttributes()),
-        resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(OFFER_2.getOffer());
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER_2.getOffer(), false))
-        .andReturn(TASK_INFO);
-    offerManager.launchTask(OFFER_2.getOffer().getId(), TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignerDoesNotReturnOnFirstMismatch() throws Exception {
-    // Ensures scheduling loop does not terminate prematurely when the first mismatch is identified.
-    HostOffer mismatched = new HostOffer(
-        Offer.newBuilder()
-            .setId(OfferID.newBuilder().setValue("offerId0"))
-            .setFrameworkId(FrameworkID.newBuilder().setValue("frameworkId"))
-            .setAgentId(AgentID.newBuilder().setValue("slaveId0"))
-            .setHostname("hostName0")
-            .addResources(mesosRange(PORTS, PORT))
-            .addResources(mesosScalar(CPUS, 1))
-            .addResources(mesosScalar(RAM_MB, 1024))
-            .build(),
-        IHostAttributes.build(new HostAttributes()));
-
-    expectNoUpdateReservations(2);
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(mismatched, OFFER));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(mismatched.getOffer().getResourcesList()),
-            mismatched.getAttributes()),
-        resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("constraint mismatch")));
-    offerManager.banOfferForTaskGroup(mismatched.getOffer().getId(), GROUP_KEY);
-    expect(filter.filter(
-        new UnusedResource(
-            bagFromMesosResources(MESOS_OFFER.getResourcesList()), OFFER.getAttributes()),
-        resourceRequest))
-        .andReturn(ImmutableSet.of());
-
-    expectAssignTask(MESOS_OFFER);
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), OFFER.getOffer(), false))
-        .andReturn(TASK_INFO);
-    offerManager.launchTask(OFFER.getOffer().getId(), TASK_INFO);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(2L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testResourceMapperCallback() {
-    AssignedTask builder = TASK.newBuilder().getAssignedTask();
-    builder.unsetAssignedPorts();
-
-    control.replay();
-
-    assertEquals(
-        TASK.getAssignedTask(),
-        assigner.mapAndAssignResources(MESOS_OFFER, IAssignedTask.build(builder)));
-  }
-
-  @Test
-  public void testAssignToReservedAgent() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignReservedAgentWhenOfferNotReady() throws Exception {
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest))
-        .andReturn(ImmutableSet.of(Veto.insufficientResources("cpu", 1)));
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-    offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
-    expectLastCall();
-
-    control.replay();
-
-    AttributeAggregate aggregate = empty();
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(),
-        assigner.maybeAssign(
-            storeProvider,
-            new ResourceRequest(TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate),
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertEquals(empty(), aggregate);
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testAssignWithMixOfReservedAndNotReserved() throws Exception {
-    AttributeAggregate aggregate = empty();
-    ResourceRequest resources = new ResourceRequest(
-        TASK.getAssignedTask().getTask(), ResourceBag.EMPTY, aggregate);
-    expect(updateAgentReserver.hasReservations(GROUP_KEY)).andReturn(true);
-    expect(updateAgentReserver.getAgent(INSTANCE_KEY)).andReturn(Optional.of(SLAVE_ID));
-    updateAgentReserver.release(SLAVE_ID, INSTANCE_KEY);
-    expect(offerManager.getOffer(MESOS_OFFER.getAgentId())).andReturn(Optional.of(OFFER));
-    expect(filter.filter(UNUSED, resourceRequest)).andReturn(ImmutableSet.of());
-    expectAssignTask(MESOS_OFFER);
-    offerManager.launchTask(MESOS_OFFER.getId(), TASK_INFO);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    expect(taskFactory.createFrom(TASK.getAssignedTask(), MESOS_OFFER, false))
-        .andReturn(TASK_INFO);
-
-    // Normal scheduling loop for the remaining task...
-    expect(updateAgentReserver.getAgent(InstanceKeys.from(JOB, 9999))).andReturn(Optional.absent());
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(OFFER));
-    expect(updateAgentReserver.getReservations(OFFER.getOffer().getAgentId().getValue()))
-        .andReturn(ImmutableSet.of());
-    expect(filter.filter(UNUSED, resources))
-        .andReturn(ImmutableSet.of(Veto.constraintMismatch("lol")));
-    offerManager.banOfferForTaskGroup(MESOS_OFFER.getId(), GROUP_KEY);
-
-    control.replay();
-
-    assertEquals(0L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-    assertEquals(
-        ImmutableSet.of(Tasks.id(TASK)),
-        assigner.maybeAssign(
-            storeProvider,
-            resources,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(
-                TASK.getAssignedTask(),
-                makeTask("another-task", JOB, 9999).getAssignedTask()),
-            ImmutableMap.of(SLAVE_ID, GROUP_KEY)));
-    assertNotEquals(empty(), aggregate);
-    assertEquals(1L, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  @Test
-  public void testSkipsOffersWithNoMemAndNoCpu() {
-    expectNoUpdateReservations(0);
-    expect(tierManager.getTier(TASK.getAssignedTask().getTask())).andReturn(DEV_TIER);
-
-    // Offer lacks CPU.
-    Offer mesosOffer = offer(mesosScalar(RAM_MB, 1024), mesosRange(PORTS, PORT));
-    HostOffer offer = new HostOffer(mesosOffer, IHostAttributes.build(new HostAttributes()
-        .setHost(mesosOffer.getHostname())
-        .setAttributes(ImmutableSet.of(
-            new Attribute("host", ImmutableSet.of(mesosOffer.getHostname()))))));
-
-    expect(offerManager.getOffers(GROUP_KEY)).andReturn(ImmutableSet.of(offer));
-
-    control.replay();
-
-    assertEquals(
-        NO_ASSIGNMENT,
-        assigner.maybeAssign(
-            storeProvider,
-            resourceRequest,
-            TaskGroupKey.from(TASK.getAssignedTask().getTask()),
-            ImmutableSet.of(TASK.getAssignedTask()),
-            NO_RESERVATION));
-    assertEquals(0, statsProvider.getLongValue(ASSIGNER_EVALUATED_OFFERS));
-  }
-
-  private void expectAssignTask(Offer offer) {
-    expect(stateManager.assignTask(
-        eq(storeProvider),
-        eq(Tasks.id(TASK)),
-        eq(offer.getHostname()),
-        eq(offer.getAgentId()),
-        anyObject())).andReturn(TASK.getAssignedTask());
-  }
-
-  private void expectNoUpdateReservations(int offers) {
-    expect(updateAgentReserver.hasReservations(anyObject())).andReturn(false);
-    for (int i = 0; i < offers; i++) {
-      expect(updateAgentReserver.getReservations(anyString())).andReturn(ImmutableSet.of());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/80139da4/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
index f8e8023..dfcbb4a 100644
--- a/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/stats/AsyncStatsModuleTest.java
@@ -45,7 +45,7 @@ public class AsyncStatsModuleTest extends EasyMockTest {
   @Test
   public void testOfferAdapter() {
     OfferManager offerManager = createMock(OfferManager.class);
-    expect(offerManager.getOffers()).andReturn(ImmutableList.of(
+    expect(offerManager.getAll()).andReturn(ImmutableList.of(
         new HostOffer(Protos.Offer.newBuilder()
             .setId(Protos.OfferID.newBuilder().setValue("offerId"))
             .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("frameworkId"))

Reply via email to