[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173065772

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java ##
@@ -17,15 +17,20 @@
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
+import javax.annotation.concurrent.ThreadSafe;
 import java.util.Set;
 /**
+ * (WARNING) Implementations of this interface must be thread-safe.
+ *
  * Defines the policy by which {@link BatchSingleJobScheduler} assigns task groups to executors.
  */
 @DriverSide
+@ThreadSafe
 @DefaultImplementation(RoundRobinSchedulingPolicy.class)

Review comment:
Let's change the default policy to the source location aware policy.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173043365

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java ##
@@ -70,26 +69,19 @@
    */
   private final Map> executorIdByContainerType;
-  /**
-   * A copy of {@link ContainerManager#executorRepresenterMap}.
-   * This cached copy is updated when an executor is added or removed.
-   */
-  private final Map executorRepresenterMap;
-
   /**
    * The index of the next executor to be assigned for each container type.
    * This map allows the executor index computation of the RR scheduling.
    */
   private final Map nextExecutorIndexByContainerType;
   @Inject
-  public RoundRobinSchedulingPolicy(final ContainerManager containerManager,
+  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry,

Review comment:
Can't we make this private or add a `@VisibleForTesting` annotation?
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173046029

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java ##
@@ -70,26 +69,19 @@
    */
   private final Map> executorIdByContainerType;
-  /**
-   * A copy of {@link ContainerManager#executorRepresenterMap}.
-   * This cached copy is updated when an executor is added or removed.
-   */
-  private final Map executorRepresenterMap;
-
   /**
    * The index of the next executor to be assigned for each container type.
    * This map allows the executor index computation of the RR scheduling.
    */
   private final Map nextExecutorIndexByContainerType;
   @Inject
-  public RoundRobinSchedulingPolicy(final ContainerManager containerManager,
+  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry,
                                     @Parameter(JobConf.SchedulerTimeoutMs.class) final int scheduleTimeoutMs) {
-    this.containerManager = containerManager;
     this.scheduleTimeoutMs = scheduleTimeoutMs;
+    this.executorRegistry = executorRegistry;
     this.lock = new ReentrantLock();
     this.executorIdByContainerType = new HashMap<>();
-    this.executorRepresenterMap = new HashMap<>();
     this.conditionByContainerType = new HashMap<>();

Review comment:
Can't we have a single condition for all container types, as in `SourceLocationAwareSchedulingPolicy`? It doesn't seem good to keep waiting on a specific container type even when a container of another type becomes available.
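A minimal sketch of the single-condition idea raised in this comment, assuming a simplified model in which each container type just has a count of free slots. The class `SingleConditionSlots` and its methods are hypothetical and are not part of the Nemo code base; only `java.util.concurrent.locks` is used. Every waiter blocks on the same condition and rechecks its own predicate whenever any slot is added.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical model: free slots per container type, guarded by one lock and one condition. */
final class SingleConditionSlots {
  private final Lock lock = new ReentrantLock();
  // One condition shared by all container types (instead of one condition per type).
  private final Condition moreSlotsAvailable = lock.newCondition();
  private final Map<String, Integer> freeSlotsByContainerType = new HashMap<>();

  /** Registers one free slot and wakes every waiter so each can recheck its own predicate. */
  void addSlot(final String containerType) {
    lock.lock();
    try {
      freeSlotsByContainerType.merge(containerType, 1, Integer::sum);
      moreSlotsAvailable.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Waits up to timeoutMs for a free slot of the given type; returns false on timeout or interrupt. */
  boolean tryAcquire(final String containerType, final long timeoutMs) {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    lock.lock();
    try {
      // Loop guards against spurious wakeups and wakeups caused by other container types.
      while (freeSlotsByContainerType.getOrDefault(containerType, 0) == 0) {
        if (remainingNanos <= 0) {
          return false;
        }
        remainingNanos = moreSlotsAvailable.awaitNanos(remainingNanos);
      }
      freeSlotsByContainerType.merge(containerType, -1, Integer::sum);
      return true;
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }
}
```

The trade-off is that `signalAll()` wakes waiters whose container type did not change, so they recheck and go back to waiting. In exchange, a waiter that is able to accept more than one type cannot miss a wakeup because it was parked on the wrong per-type condition.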
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173042451

## File path: tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java ##
@@ -15,123 +15,94 @@
  */
 package edu.snu.nemo.tests.runtime;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 /**
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  private static ExecutorService completionEventThreadPool;
-  private static BlockingDeque eventRunnableQueue;
-  private static boolean testComplete;
-
-  public static void initialize() {
-    testComplete = false;
-    completionEventThreadPool = Executors.newFixedThreadPool(5);
-
-    eventRunnableQueue = new LinkedBlockingDeque<>();
-
-    for (int i = 0; i < 5; i++) {
-      completionEventThreadPool.execute(() -> {
-        while (!testComplete || !eventRunnableQueue.isEmpty()) {
-          try {
-            final Runnable event = eventRunnableQueue.takeFirst();
-            event.run();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-      });
-    }
-    completionEventThreadPool.shutdown();
-  }
-
-  public static void cleanup() {
-    testComplete = true;
-  }
-
   /**
    * Sends a stage's completion event to scheduler, with all its task groups marked as complete as well.
    * This replaces executor's task group completion messages for testing purposes.
    * @param jobStateManager for the submitted job.
    * @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides executor representers
    * @param physicalStage for which the states should be marked as complete.
    */
   public static void sendStageCompletionEventToScheduler(final JobStateManager jobStateManager,
                                                          final Scheduler scheduler,
-                                                         final ContainerManager containerManager,
+                                                         final ExecutorRegistry executorRegistry,
                                                          final PhysicalStage physicalStage,
                                                          final int attemptIdx) {
-    eventRunnableQueue.add(new Runnable() {
-      @Override
-      public void run() {
-        while (jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
-            == StageState.State.EXECUTING) {
-          physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-            if (jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
-                == TaskGroupState.State.EXECUTING) {
-              sendTaskGroupStateEventToScheduler(scheduler, containerManager, taskGroupId,
-                  TaskGroupState.State.COMPLETE, attemptIdx, null);
-            }
-          });
-        }
+    // Loop until the stage completes.

Review comment:
It seems that the behavior of this method has changed. Please update the method description and name.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173042154

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java ##
@@ -46,7 +50,14 @@
   private final ExecutorRegistry executorRegistry;
   private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
   private final long scheduleTimeoutMs;
+  private final Lock lock = new ReentrantLock();
+  private final Condition moreExecutorsAvailableCondition = lock.newCondition();

Review comment:
Don't we have to share this lock and condition with `RoundRobinSchedulingPolicy`? Because this policy falls back to the RRPolicy when it fails to find any source vertex or its location, some `ScheduledTaskGroup`s can end up waiting on the condition in the RRPolicy. In that case, signaling this condition will not wake those `ScheduledTaskGroup`s.
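One way to address the concern above is to put the lock and condition into a small object that both policies receive, so that a signal issued through either policy reaches waiters parked in the other. The sketch below is an illustration only: `ExecutorAvailabilitySignal`, `FallbackPolicySketch`, and `PrimaryPolicySketch` are hypothetical names, and the version counter is an assumption added so that a waiter cannot miss a signal sent just before it starts waiting.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical shared lock/condition pair, handed to both the primary and the fallback policy. */
final class ExecutorAvailabilitySignal {
  private final Lock lock = new ReentrantLock();
  private final Condition moreExecutorsAvailable = lock.newCondition();
  private long version = 0; // incremented on every signal

  long currentVersion() {
    lock.lock();
    try {
      return version;
    } finally {
      lock.unlock();
    }
  }

  void signalMoreExecutors() {
    lock.lock();
    try {
      version++;
      moreExecutorsAvailable.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Returns true once a signal newer than seenVersion exists, false on timeout or interrupt. */
  boolean awaitNewerThan(final long seenVersion, final long timeoutMs) {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    lock.lock();
    try {
      while (version == seenVersion) {
        if (remainingNanos <= 0) {
          return false;
        }
        remainingNanos = moreExecutorsAvailable.awaitNanos(remainingNanos);
      }
      return true;
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }
}

/** Hypothetical stand-in for the round-robin fallback: waits on the shared signal. */
final class FallbackPolicySketch {
  private final ExecutorAvailabilitySignal signal;

  FallbackPolicySketch(final ExecutorAvailabilitySignal signal) {
    this.signal = signal;
  }

  boolean waitForExecutor(final long seenVersion, final long timeoutMs) {
    return signal.awaitNewerThan(seenVersion, timeoutMs);
  }
}

/** Hypothetical stand-in for the source-location-aware policy: signals the same shared object. */
final class PrimaryPolicySketch {
  private final ExecutorAvailabilitySignal signal;
  private final FallbackPolicySketch fallback;

  PrimaryPolicySketch(final ExecutorAvailabilitySignal signal, final FallbackPolicySketch fallback) {
    this.signal = signal;
    this.fallback = fallback;
  }

  void onExecutorAdded() {
    signal.signalMoreExecutors();
  }

  boolean scheduleViaFallback(final long seenVersion, final long timeoutMs) {
    return fallback.waitForExecutor(seenVersion, timeoutMs);
  }
}
```

With two separate `Condition` objects, the equivalent of `onExecutorAdded()` on the primary policy would leave a task group that is waiting inside the fallback policy asleep until its timeout expires.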
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173045410 ## File path: tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java ## @@ -0,0 +1,418 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.tests.runtime.master.scheduler; + +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy; +import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; +import edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import 
org.powermock.modules.junit4.PowerMockRunner; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +/** + * Test cases for + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({JobStateManager.class, ExecutorRepresenter.class, RoundRobinSchedulingPolicy.class, +ScheduledTaskGroup.class, Readable.class}) +public final class SourceLocationAwareSchedulingPolicyTest { + private static final String SITE_0 = "SEOUL"; + private static final String SITE_1 = "JINJU"; + private static final String SITE_2 = "BUSAN"; + private static final int SCHEDULER_TIMEOUT_MS = 500; + + private SourceLocationAwareSchedulingPolicy sourceLocationAware; + private SpiedSchedulingPolicyWrapper roundRobin; + private MockJobStateManagerWrapper jobStateManager; + + @Before + public void setup() throws InjectionException { +final Injector injector = Tang.Factory.getTang().newInjector(); +jobStateManager = new MockJobStateManagerWrapper(); + +final ExecutorRegistry executorRegistry = new ExecutorRegistry(); +final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy = +new RoundRobinSchedulingPolicy(executorRegistry, SCHEDULER_TIMEOUT_MS); +roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, jobStateManager.get()); + +injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, roundRobin.get()); +injector.bindVolatileInstance(JobStateManager.class, jobStateManager.get()); +injector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry); +sourceLocationAware = injector.getInstance(SourceLocationAwareSchedulingPolicy.class); + } + + @After + public void teardown() { +// All expectations should be resolved at this time. 
+roundRobin.ensureNoUnresolvedExpectation(); + } + + /** + * {@link SourceLocationAwareSchedulingPolicy} should delegate scheduling decision when the + * {@link ScheduledTaskGroup} does not have any source tasks. + */ + @Test + public void testRoundRobinSchedulerFallback() { +// Prepare test scenario +final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE); +final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2, +ExecutorPlacementProperty.NONE); +final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWhichThrowException(5, +ExecutorPlacementProperty.NONE); +addExecutor(new MockExecutorRepresenterWrapper(SITE_0, ExecutorPlacementProperty.NONE, 1)); +addExecutor(new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1)); + +// Trying
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173044113 ## File path: tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java ## @@ -0,0 +1,418 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.tests.runtime.master.scheduler; + +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy; +import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; +import edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import 
org.powermock.modules.junit4.PowerMockRunner; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +/** + * Test cases for + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({JobStateManager.class, ExecutorRepresenter.class, RoundRobinSchedulingPolicy.class, +ScheduledTaskGroup.class, Readable.class}) +public final class SourceLocationAwareSchedulingPolicyTest { + private static final String SITE_0 = "SEOUL"; + private static final String SITE_1 = "JINJU"; + private static final String SITE_2 = "BUSAN"; + private static final int SCHEDULER_TIMEOUT_MS = 500; + + private SourceLocationAwareSchedulingPolicy sourceLocationAware; + private SpiedSchedulingPolicyWrapper roundRobin; + private MockJobStateManagerWrapper jobStateManager; + + @Before + public void setup() throws InjectionException { +final Injector injector = Tang.Factory.getTang().newInjector(); +jobStateManager = new MockJobStateManagerWrapper(); + +final ExecutorRegistry executorRegistry = new ExecutorRegistry(); +final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy = +new RoundRobinSchedulingPolicy(executorRegistry, SCHEDULER_TIMEOUT_MS); +roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, jobStateManager.get()); + +injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, roundRobin.get()); +injector.bindVolatileInstance(JobStateManager.class, jobStateManager.get()); +injector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry); +sourceLocationAware = injector.getInstance(SourceLocationAwareSchedulingPolicy.class); + } + + @After + public void teardown() { +// All expectations should be resolved at this time. 
+roundRobin.ensureNoUnresolvedExpectation(); + } + + /** + * {@link SourceLocationAwareSchedulingPolicy} should delegate scheduling decision when the + * {@link ScheduledTaskGroup} does not have any source tasks. + */ + @Test + public void testRoundRobinSchedulerFallback() { +// Prepare test scenario +final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE); +final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2, +ExecutorPlacementProperty.NONE); +final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWhichThrowException(5, +ExecutorPlacementProperty.NONE); +addExecutor(new MockExecutorRepresenterWrapper(SITE_0, ExecutorPlacementProperty.NONE, 1)); +addExecutor(new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1)); + +// Trying
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173046431 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java ## @@ -0,0 +1,236 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.common.exception.SchedulingException; +import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.annotations.audience.DriverSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This policy is same as {@link 
RoundRobinSchedulingPolicy}, however for TaskGroups + * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors + * where the corresponding data resides. + */ +@ThreadSafe +@DriverSide +public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy { + private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class); + + private final ExecutorRegistry executorRegistry; + private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy; + private final long scheduleTimeoutMs; + private final Lock lock = new ReentrantLock(); + private final Condition moreExecutorsAvailableCondition = lock.newCondition(); + + /** + * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}. + * @param executorRegistry provides catalog of available executors + * @param roundRobinSchedulingPolicy provides fallback for TaskGroups with no input source information + */ + @Inject + private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry executorRegistry, + final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy) { +this.executorRegistry = executorRegistry; +this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy; +this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs(); + } + + @Override + public long getScheduleTimeoutMs() { +return scheduleTimeoutMs; + } + + /** + * Try to schedule a TaskGroup. + * If the TaskGroup has one or more source tasks, this method schedules the task group to one of the physical nodes, + * chosen from union of set of locations where splits of each source task resides. + * If the TaskGroup has no source tasks, falls back to {@link RoundRobinSchedulingPolicy}. + * @param scheduledTaskGroup to schedule. + * @param jobStateManager jobStateManager which the TaskGroup belongs to. + * @return true if the task group is successfully scheduled, false otherwise. 
+ */ + @Override + public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup, + final JobStateManager jobStateManager) { +lock.lock(); +try { + Set sourceLocations = Collections.emptySet(); + try { +sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values()); + } catch (final UnsupportedOperationException e) { +// do nothing + } catch (final Exception e) { +LOG.warn(String.format("Exception while trying to get source location for %s", +scheduledTaskGroup.getTaskGroupId()), e); + } + if (sourceLocations.size() == 0) { +// No source location information found, fall back to the RoundRobinSchedulingPolicy +return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, jobStateManager); + } + + if (scheduleToLocalNode(scheduledTaskGr
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172726862

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java ##
@@ -321,7 +192,13 @@ public synchronized void onExecutorRemoved(final String executorId) {
     }
   }
-  private synchronized void scheduleRootStages() {
+  @Override
+  public void terminate() {
+    this.schedulerRunner.terminate();
+    this.pendingTaskGroupQueue.close();
+  }
+
+  private void scheduleRootStages() {

Review comment:
Please add a comment.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172725454

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java ##
@@ -274,16 +271,12 @@ private void handleControlMessage(final ControlMessage.Message message) {
           convertFailureCause(taskGroupStateChangedMsg.getFailureCause()));
       break;
     case ExecutorFailed:
+      // Executor failed due to user code.
       final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
       final String failedExecutorId = executorFailedMsg.getExecutorId();
       final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
       LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
-      containerManager.onExecutorRemoved(failedExecutorId);
       throw new RuntimeException(exception);

Review comment:
Is it enough to kill only the `runtimeMasterThread`?
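For context on this question, the sketch below shows generic `java.util.concurrent` behavior rather than Nemo's actual control flow. It assumes the handler runs as a task passed to `execute()` on a single-thread executor, which the diff does not confirm. Under that assumption, an uncaught `RuntimeException` ends only the current worker thread: the executor starts a replacement and continues with queued tasks, and the rest of the JVM keeps running. The class `SingleThreadFailureDemo` is hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo of what an uncaught exception does inside a single-thread executor. */
final class SingleThreadFailureDemo {
  /** Returns true if a task queued after a failing task still gets executed. */
  static boolean laterTaskStillRuns() {
    final ExecutorService runtimeMasterThread = Executors.newSingleThreadExecutor();
    final AtomicBoolean laterTaskRan = new AtomicBoolean(false);

    // The uncaught exception terminates the worker thread that ran this task
    // (its stack trace goes to stderr through the default uncaught-exception handler).
    runtimeMasterThread.execute(() -> {
      throw new RuntimeException("executor failed");
    });
    // The executor replaces the dead worker and runs this task anyway.
    runtimeMasterThread.execute(() -> laterTaskRan.set(true));

    runtimeMasterThread.shutdown(); // queued tasks still run after shutdown()
    try {
      runtimeMasterThread.awaitTermination(5, TimeUnit.SECONDS);
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return laterTaskRan.get();
  }
}
```

If the intent is to fail the whole job, throwing from the task is therefore unlikely to be sufficient on its own; some explicit notification or shutdown path would be needed in addition.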
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172735237

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java ##
@@ -53,7 +55,7 @@ public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
     this.isTerminated = false;
   }
-  public synchronized void scheduleJob(final JobStateManager jobStateManager) {

Review comment:
Please add comments about these methods.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172723450

## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java ##
@@ -91,21 +87,17 @@
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
-                       final SchedulerRunner schedulerRunner,
-                       final PendingTaskGroupQueue pendingTaskGroupQueue,
                        final ContainerManager containerManager,
                        final BlockManagerMaster blockManagerMaster,
                        final MetricMessageHandler metricMessageHandler,
                        final MessageEnvironment masterMessageEnvironment,
                        @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
-    // We would like to keep the master event thread pool single threaded
+    // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
     // compared to the job completion times of executed jobs
     // and keeping it single threaded removes the complexity of multi-thread synchronization.
-    this.masterControlEventExecutor = Executors.newSingleThreadExecutor();
+    this.runtimeMasterThread = Executors.newSingleThreadExecutor();

Review comment:
Why don't we name this thread? (Please check [here](https://stackoverflow.com/questions/6113746/naming-threads-and-thread-pools-of-executorservice))
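A minimal sketch of the thread-naming suggestion, using only the standard `Executors.newSingleThreadExecutor(ThreadFactory)` overload. The class `NamedSingleThreadExecutor` and the thread name used below are illustrative assumptions, not what the PR ended up using.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that creates a single-thread executor whose worker has a fixed name. */
final class NamedSingleThreadExecutor {
  /** The ThreadFactory lambda assigns the name to every worker the executor creates. */
  static ExecutorService create(final String threadName) {
    return Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, threadName));
  }

  /** Runs one task on a named executor and reports the thread name that the task observed. */
  static String nameSeenByTask(final String threadName) {
    final ExecutorService executor = create(threadName);
    try {
      return executor.submit(() -> Thread.currentThread().getName()).get(5, TimeUnit.SECONDS);
    } catch (final InterruptedException | ExecutionException | TimeoutException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }
}
```

For example, `NamedSingleThreadExecutor.create("RuntimeMaster thread")` would make the thread easy to recognize in log lines and thread dumps, where the default name would be of the form `pool-N-thread-M`.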
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172728122 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java ## @@ -526,8 +403,133 @@ private PhysicalStage getStageById(final String stageId) { throw new RuntimeException(new Throwable("This taskGroupId does not exist in the plan")); } - @Override - public void terminate() { -// nothing to do yet. + /** + * Action after task group execution has been completed, not after it has been put on hold. + * + * @param executorId the ID of the executor. + * @param taskGroupId the ID pf the task group completed. + */ + private void onTaskGroupExecutionComplete(final String executorId, +final String taskGroupId) { +onTaskGroupExecutionComplete(executorId, taskGroupId, false); + } + + /** + * Action after task group execution has been completed. + * @param executorId id of the executor. + * @param taskGroupId the ID of the task group completed. + * @param isOnHoldToComplete whether or not if it is switched to complete after it has been on hold. + */ + private void onTaskGroupExecutionComplete(final String executorId, +final String taskGroupId, +final Boolean isOnHoldToComplete) { +LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId}); +if (!isOnHoldToComplete) { + schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId); +} + +final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId); +if (jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) { + // if the stage this task group belongs to is complete, + if (!jobStateManager.checkJobTermination()) { // and if the job is not yet complete or failed, +scheduleNextStage(stageIdForTaskGroupUponCompletion); + } +} + } + + /** + * Action for after task group execution is put on hold. 
+ * @param executorId the ID of the executor. + * @param taskGroupIdthe ID of the task group. + * @param taskPutOnHold the ID of task that is put on hold. + */ + private void onTaskGroupExecutionOnHold(final String executorId, + final String taskGroupId, + final String taskPutOnHold) { +LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId}); +schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId); +final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId); + +final boolean stageComplete = + jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion); + +if (stageComplete) { + // get optimization vertex from the task. + final MetricCollectionBarrierVertex metricCollectionBarrierVertex = + getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks list + .filter(task -> task.getId().equals(taskPutOnHold)) // find it + .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex + .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex) + .distinct() + .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types + .findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it + + " called with failed task ids by some other task than " + + MetricCollectionBarrierTask.class.getSimpleName())); + // and we will use this vertex to perform metric collection and dynamic optimization. + + pubSubEventHandlerWrapper.getPubSubEventHandler().onNext( + new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId))); +} else { + onTaskGroupExecutionComplete(executorId, taskGroupId, true); +} + } + + private void onTaskGroupExecutionFailedRecoverable(final String executorId, final String taskGroupId, Review comment: Please add a comment about this method. This is an automated message from the Apache Git Service. 
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172721727 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java ## @@ -435,4 +435,44 @@ void registerRequest(final long requestId, return locationFuture; } } + + + public static BlockState.State convertBlockState(final ControlMessage.BlockStateFromExecutor state) { Review comment: Please add a comment describing this method.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172725494 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java ## @@ -311,7 +304,7 @@ private void handleControlMessage(final ControlMessage.Message message) { * @param srcVertexId the ID of the source vertex. * @param blockId the ID of the block. */ - public void accumulateBarrierMetric(final List blockSizeInfo, + private void accumulateBarrierMetric(final List blockSizeInfo, final String srcVertexId, Review comment: Please check the indentation of the continuation lines.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172721743 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java ## @@ -435,4 +435,44 @@ void registerRequest(final long requestId, return locationFuture; } } + + + public static BlockState.State convertBlockState(final ControlMessage.BlockStateFromExecutor state) { +switch (state) { + case BLOCK_READY: +return BlockState.State.READY; + case SCHEDULED: +return BlockState.State.SCHEDULED; + case COMMITTED: +return BlockState.State.COMMITTED; + case LOST_BEFORE_COMMIT: +return BlockState.State.LOST_BEFORE_COMMIT; + case LOST: +return BlockState.State.LOST; + case REMOVED: +return BlockState.State.REMOVED; + default: +throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state)); +} + } + + public static ControlMessage.BlockStateFromExecutor convertBlockState(final BlockState.State state) { Review comment: Please add a comment describing this method.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172748732 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java ## @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.common.exception.SchedulingException; +import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.annotations.audience.DriverSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This policy is same as {@link RoundRobinSchedulingPolicy}, however for TaskGroups + * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors + * where the corresponding data resides. 
+ */ +@ThreadSafe +@DriverSide +public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy { + private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class); + + private final ExecutorRegistry executorRegistry; + private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy; + private final long scheduleTimeoutMs; + + @Inject + private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry executorRegistry, + final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy) { +this.executorRegistry = executorRegistry; +this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy; +this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs(); + } + + @Override + public long getScheduleTimeoutMs() { +return scheduleTimeoutMs; + } + + /** + * Try to schedule a TaskGroup. + * If the TaskGroup has one or more source tasks, this method schedules the task group to one of the physical nodes, + * chosen from union of set of locations where splits of each source task resides. + * If the TaskGroup has no source tasks, falls back to {@link RoundRobinSchedulingPolicy}. + * @param scheduledTaskGroup to schedule. + * @param jobStateManager jobStateManager which the TaskGroup belongs to. + * @return true if the task group is successfully scheduled, false otherwise. 
+ */ + @Override + public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup, +final JobStateManager jobStateManager) { +Set sourceLocations = Collections.emptySet(); +try { + sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values()); +} catch (final Exception e) { + LOG.warn(String.format("Exception while trying to get source location for %s", + scheduledTaskGroup.getTaskGroupId()), e); +} +if (sourceLocations.size() == 0) { + // No source location information found, fall back to the RoundRobinSchedulingPolicy + return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, jobStateManager); +} + +if (scheduleToLocalNode(scheduledTaskGroup, jobStateManager, sourceLocations)) { + return true; +} else { + try { +Thread.sleep(scheduleTimeoutMs); Review comment: Can we wake this thread up early, as `RoundRobinSchedulingPolicy#signalPossiblyWaitingScheduler` does, instead of always sleeping for the full timeout?
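For context on the wake-up question above: `Thread.sleep` inside a `synchronized` method keeps the monitor held and cannot be cut short by a state change. One common alternative is a timed wait on a `java.util.concurrent.locks.Condition` that another thread signals when capacity frees up. The following is only a minimal sketch of that idea. The class `SchedulerWakeup` and its methods are hypothetical names, and nothing here is taken from how `RoundRobinSchedulingPolicy#signalPossiblyWaitingScheduler` is actually implemented.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper (not part of Nemo): a timed wait that another thread can end early. */
final class SchedulerWakeup {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition capacityChanged = lock.newCondition();
  private boolean signalled = false;

  /**
   * Waits up to timeoutMs for a signal.
   * @return true if a signal arrived, false on timeout or interruption.
   */
  boolean awaitCapacity(final long timeoutMs) {
    lock.lock();
    try {
      long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      // Loop to guard against spurious wakeups.
      while (!signalled) {
        if (remainingNanos <= 0) {
          return false;
        }
        // awaitNanos releases the lock while waiting and returns the time left.
        remainingNanos = capacityChanged.awaitNanos(remainingNanos);
      }
      signalled = false; // consume the signal
      return true;
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    } finally {
      lock.unlock();
    }
  }

  /** Called, for example, when an executor is added or a task group completes. */
  void signalCapacity() {
    lock.lock();
    try {
      signalled = true;
      capacityChanged.signalAll();
    } finally {
      lock.unlock();
    }
  }
}
```

In such a design the scheduling method would call `awaitCapacity(scheduleTimeoutMs)` where it currently sleeps, and the executor-added and task-group-complete callbacks would call `signalCapacity()`. Whether that fits the locking already used in the policy is a question for the PR author.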
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172726613 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java ## @@ -46,12 +51,14 @@ private final MessageSender messageSender; private final ActiveContext activeContext; private final ExecutorService serializationExecutorService; + private final String nodeName; public ExecutorRepresenter(final String executorId, Review comment: Please add comments for this constructor and the other methods.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172736279 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java ## @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.common.exception.SchedulingException; +import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.annotations.audience.DriverSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This policy is same as {@link RoundRobinSchedulingPolicy}, however for TaskGroups + * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors + * where the corresponding data resides. 
+ */ +@ThreadSafe +@DriverSide +public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy { + private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class); + + private final ExecutorRegistry executorRegistry; + private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy; + private final long scheduleTimeoutMs; + + @Inject + private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry executorRegistry, Review comment: Please add a comment describing this constructor.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172720282 ## File path: compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java ## @@ -94,5 +98,18 @@ public String propertiesToJSON() { } return elements; } + +@Override +public List getLocations() throws Exception { + if (boundedSource instanceof HadoopInputFormatIO.HadoopInputFormatBoundedSource) { +final Field inputSplitField = boundedSource.getClass().getDeclaredField("inputSplit"); +inputSplitField.setAccessible(true); +final InputSplit inputSplit = ((HadoopInputFormatIO.SerializableSplit) inputSplitField +.get(boundedSource)).getSplit(); +return Arrays.asList(inputSplit.getLocations()); + } else { +throw new UnsupportedOperationException(); Review comment: How about `return Collections.emptyList();` here instead? (That is what the default method returns.)
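To illustrate the suggestion above: if a source without location information returns an empty list rather than throwing, the caller can choose the fallback with a plain emptiness check and needs no `try`/`catch`. The sketch below uses hypothetical stand-in types (`SketchSource`, `SketchScheduler`). They are not the Beam or Nemo classes, and the policy names are returned as strings only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a bounded source; not the Beam or Nemo type. */
final class SketchSource {
  // null models a source that has no location information.
  private final String[] splitHosts;

  SketchSource(final String[] splitHosts) {
    this.splitHosts = splitHosts;
  }

  /** Returns the hosts holding the split, or an empty list when they are unknown. */
  List<String> getLocations() {
    if (splitHosts == null) {
      return Collections.emptyList(); // "not supported" is expressed as "no locations"
    }
    return Arrays.asList(splitHosts);
  }
}

/** Hypothetical caller that picks a policy based on the result. */
final class SketchScheduler {
  static String pickPolicy(final SketchSource source) {
    // No exception handling needed: an empty list simply selects the fallback.
    return source.getLocations().isEmpty() ? "round-robin" : "source-location-aware";
  }
}
```

A side effect of this convention is that "unsupported" and "supported but currently no known locations" become indistinguishable to the caller, which seems acceptable here because both cases fall back to round-robin scheduling.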
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172721711 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java ## @@ -435,4 +435,44 @@ void registerRequest(final long requestId, return locationFuture; } } + + Review comment: Please remove this duplicated newline.
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172720045 ## File path: common/src/main/java/edu/snu/nemo/common/ir/Readable.java ## @@ -28,5 +30,15 @@ * @throws Exception exception while reading data. */ Iterable read() throws Exception; + + /** + * Returns the list of locations where this readable resides. + * Each location has a complete copy of the readable. + * @return List of locations where this readable resides, or an empty list if this operation is not supported + * @throws Exception exceptions on the way + */ + default List getLocations() throws Exception { Review comment: Why do we have to define this as a default method? Can't we just add an abstract method and describe its expected behavior well? If this method is default, someone can forget to override it while implementing another `Readable`, even though that implementation could support this function.
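The trade-off raised above can be made concrete. With an abstract method, every implementation must state its answer explicitly, and a class that omits `getLocations()` fails to compile. With a `default` method, the omission goes unnoticed. Below is a minimal sketch of the abstract variant, using a hypothetical interface `SketchReadable` and a hypothetical implementation `InMemoryReadable`. The generic parameters are assumptions, since the quoted diff does not show them.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical interface mirroring the shape discussed above; not the Nemo Readable. */
interface SketchReadable<O> {
  /** Reads the data. */
  Iterable<O> read() throws Exception;

  /**
   * Returns the locations where this readable resides.
   * Implementations that cannot provide locations must return an empty list.
   * Declared abstract on purpose, so each implementation has to decide.
   */
  List<String> getLocations() throws Exception;
}

/** Hypothetical implementation with no meaningful location. */
final class InMemoryReadable implements SketchReadable<Integer> {
  private final List<Integer> data;

  InMemoryReadable(final List<Integer> data) {
    this.data = data;
  }

  @Override
  public Iterable<Integer> read() {
    return data;
  }

  @Override
  public List<String> getLocations() {
    // Explicit decision: in-memory data has no preferred node.
    return Collections.emptyList();
  }
}
```

The cost of the abstract variant is that every existing `Readable` implementation has to be touched once. That one-time cost is exactly what forces the per-implementation decision the reviewer is asking for.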
[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy
sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172736526 ## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java ## @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.common.exception.SchedulingException; +import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.annotations.audience.DriverSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This policy is same as {@link RoundRobinSchedulingPolicy}, however for TaskGroups + * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors + * where the corresponding data resides. 
+ */ +@ThreadSafe +@DriverSide +public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy { + private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class); + + private final ExecutorRegistry executorRegistry; + private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy; + private final long scheduleTimeoutMs; + + @Inject + private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry executorRegistry, + final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy) { +this.executorRegistry = executorRegistry; +this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy; +this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs(); + } + + @Override + public long getScheduleTimeoutMs() { +return scheduleTimeoutMs; + } + + /** + * Try to schedule a TaskGroup. + * If the TaskGroup has one or more source tasks, this method schedules the task group to one of the physical nodes, + * chosen from union of set of locations where splits of each source task resides. + * If the TaskGroup has no source tasks, falls back to {@link RoundRobinSchedulingPolicy}. + * @param scheduledTaskGroup to schedule. + * @param jobStateManager jobStateManager which the TaskGroup belongs to. + * @return true if the task group is successfully scheduled, false otherwise. + */ + @Override + public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup, +final JobStateManager jobStateManager) { +Set sourceLocations = Collections.emptySet(); +try { + sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values()); +} catch (final Exception e) { + LOG.warn(String.format("Exception while trying to get source location for %s", Review comment: As I mentioned above, let's just return empty set if the source location is not supported. This is an automated message from the Apache Git Service. 