Repository: aurora Updated Branches: refs/heads/master 843289478 -> d3c5ca7cc
switching from launchTask to acceptOffers Bugs closed: AURORA-1776 Reviewed at https://reviews.apache.org/r/52074/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d3c5ca7c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d3c5ca7c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d3c5ca7c Branch: refs/heads/master Commit: d3c5ca7cc5633f264a5c55d16db15b6b785a882d Parents: 8432894 Author: Dmitriy Shirchenko <cald...@gmail.com> Authored: Wed Sep 21 12:42:00 2016 -0700 Committer: Maxim Khutornenko <ma...@apache.org> Committed: Wed Sep 21 12:42:00 2016 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/fakes/FakeDriver.java | 3 +- .../apache/aurora/scheduler/mesos/Driver.java | 8 ++--- .../scheduler/mesos/SchedulerDriverService.java | 9 +++-- .../aurora/scheduler/offers/OfferManager.java | 8 ++++- .../scheduler/offers/OfferManagerImplTest.java | 37 +++++++++++++++----- 5 files changed, 48 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/d3c5ca7c/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java index 1bfd8db..9674c76 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java @@ -27,7 +27,8 @@ public class FakeDriver extends AbstractIdleService implements Driver { } @Override - public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task, Protos.Filters filter) { + public void acceptOffers(Protos.OfferID offerId, Collection<Protos.Offer.Operation> operations, + Protos.Filters filter) { // no-op } http://git-wip-us.apache.org/repos/asf/aurora/blob/d3c5ca7c/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java index 1448146..bb208ea 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java @@ -18,8 +18,8 @@ import java.util.Collection; import com.google.common.util.concurrent.Service; import org.apache.mesos.Protos.Filters; +import org.apache.mesos.Protos.Offer.Operation; import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.TaskStatus; /** @@ -33,13 +33,13 @@ import org.apache.mesos.Protos.TaskStatus; public interface Driver extends Service { /** - * Launches a task. + * Performs operations eg launching a task or reserving an offer. * * @param offerId ID of the resource offer to accept with the task. - * @param task Task to launch. + * @param operations Operations to perform on the offer eg reserve offer and launch a task. * @param filter offer filter applied to unused resources in this offer. */ - void launchTask(OfferID offerId, TaskInfo task, Filters filter); + void acceptOffers(OfferID offerId, Collection<Operation> operations, Filters filter); /** * Declines a resource offer. http://git-wip-us.apache.org/repos/asf/aurora/blob/d3c5ca7c/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java index 41b9aab..c89be79 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java @@ -30,6 +30,7 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.mesos.Protos; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.Protos.Offer.Operation; import org.apache.mesos.Scheduler; import org.apache.mesos.SchedulerDriver; import org.slf4j.Logger; @@ -116,10 +117,14 @@ class SchedulerDriverService extends AbstractIdleService implements Driver { } @Override - public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task, Protos.Filters filter) { + public void acceptOffers( + Protos.OfferID offerId, + Collection<Operation> operations, + Protos.Filters filter) { ensureRunning(); + Futures.getUnchecked(driverFuture) - .launchTasks(ImmutableList.of(offerId), ImmutableList.of(task), filter); + .acceptOffers(ImmutableList.of(offerId), operations, filter); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/d3c5ca7c/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java index 925c025..3b56921 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/OfferManager.java @@ -26,6 +26,7 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.FluentIterable; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -47,6 +48,7 @@ import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.mesos.Protos; +import org.apache.mesos.Protos.Offer.Operation; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.SlaveID; import org.slf4j.Logger; @@ -362,7 +364,11 @@ public interface OfferManager extends EventSubscriber { // which is a feature of ConcurrentSkipListSet. if (hostOffers.remove(offerId)) { try { - driver.launchTask(offerId, task, getOfferFilter()); + Operation launch = Operation.newBuilder() + .setType(Operation.Type.LAUNCH) + .setLaunch(Operation.Launch.newBuilder().addTaskInfos(task)) + .build(); + driver.acceptOffers(offerId, ImmutableList.of(launch), getOfferFilter()); } catch (IllegalStateException e) { // TODO(William Farner): Catch only the checked exception produced by Driver // once it changes from throwing IllegalStateException when the driver is not yet http://git-wip-us.apache.org/repos/asf/aurora/blob/d3c5ca7c/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java index a739bce..5e570b6 100644 --- a/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/offers/OfferManagerImplTest.java @@ -13,7 +13,10 @@ */ package org.apache.aurora.scheduler.offers; +import java.util.List; + import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -22,28 +25,33 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.JobKey; import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.gen.TaskConfig; import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.TaskGroupKey; +import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected; import org.apache.aurora.scheduler.events.PubsubEvent.HostAttributesChanged; import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.offers.OfferManager.OfferManagerImpl; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; +import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.testing.FakeScheduledExecutor; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.apache.mesos.Protos; import org.apache.mesos.Protos.Filters; +import org.apache.mesos.Protos.Offer.Operation; import org.apache.mesos.Protos.TaskInfo; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.MaintenanceMode.DRAINING; import static org.apache.aurora.gen.MaintenanceMode.NONE; +import static org.apache.aurora.scheduler.base.TaskTestUtil.JOB; +import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.mesosRange; +import static org.apache.aurora.scheduler.resources.ResourceTestUtil.offer; +import static org.apache.aurora.scheduler.resources.ResourceType.PORTS; import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -66,9 +74,20 @@ public class OfferManagerImplTest extends EasyMockTest { private static final HostOffer OFFER_C = new HostOffer( Offers.makeOffer("OFFER_C", HOST_C), IHostAttributes.build(new HostAttributes().setMode(NONE))); - private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from( - ITaskConfig.build(new TaskConfig().setJob(new JobKey("role", "env", "name")))); - private static final TaskInfo TASK_INFO = TaskInfo.getDefaultInstance(); + private static final int PORT = 1000; + private static final Protos.Offer MESOS_OFFER = offer(mesosRange(PORTS, PORT)); + private static final IScheduledTask TASK = makeTask("id", JOB); + private static final TaskGroupKey GROUP_KEY = TaskGroupKey.from(TASK.getAssignedTask().getTask()); + private static final TaskInfo TASK_INFO = TaskInfo.newBuilder() + .setName("taskName") + .setTaskId(Protos.TaskID.newBuilder().setValue(Tasks.id(TASK))) + .setSlaveId(MESOS_OFFER.getSlaveId()) + .build(); + private static Operation launch = Operation.newBuilder() + .setType(Operation.Type.LAUNCH) + .setLaunch(Operation.Launch.newBuilder().addTaskInfos(TASK_INFO)) + .build(); + private static final List<Operation> OPERATIONS = ImmutableList.of(launch); private static final long OFFER_FILTER_SECONDS = 0L; private static final Filters OFFER_FILTER = Filters.newBuilder() .setRefuseSeconds(OFFER_FILTER_SECONDS) @@ -98,7 +117,7 @@ public class OfferManagerImplTest extends EasyMockTest { HostOffer offerA = setMode(OFFER_A, DRAINING); HostOffer offerC = setMode(OFFER_C, DRAINING); - driver.launchTask(OFFER_B.getOffer().getId(), TASK_INFO, OFFER_FILTER); + driver.acceptOffers(OFFER_B.getOffer().getId(), OPERATIONS, OFFER_FILTER); driver.declineOffer(OFFER_A_ID, OFFER_FILTER); driver.declineOffer(offerC.getOffer().getId(), OFFER_FILTER); @@ -239,8 +258,8 @@ public class OfferManagerImplTest extends EasyMockTest { } @Test(expected = OfferManager.LaunchException.class) - public void testLaunchTaskDriverThrows() throws OfferManager.LaunchException { - driver.launchTask(OFFER_A_ID, TASK_INFO, OFFER_FILTER); + public void testAcceptOffersDriverThrows() throws OfferManager.LaunchException { + driver.acceptOffers(OFFER_A_ID, OPERATIONS, OFFER_FILTER); expectLastCall().andThrow(new IllegalStateException()); control.replay();