[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-07 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173065772
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
 ##
 @@ -17,15 +17,20 @@
 
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
+import javax.annotation.concurrent.ThreadSafe;
 import java.util.Set;
 
 /**
+ * (WARNING) Implementations of this interface must be thread-safe.
+ *
  * Defines the policy by which {@link BatchSingleJobScheduler} assigns task 
groups to executors.
  */
 @DriverSide
+@ThreadSafe
 @DefaultImplementation(RoundRobinSchedulingPolicy.class)
 
 Review comment:
   Let's change the default policy to the source location aware policy.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-07 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173043365
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
 ##
 @@ -70,26 +69,19 @@
*/
   private final Map executorIdByContainerType;
 
-  /**
-   * A copy of {@link ContainerManager#executorRepresenterMap}.
-   * This cached copy is updated when an executor is added or removed.
-   */
-  private final Map executorRepresenterMap;
-
   /**
* The index of the next executor to be assigned for each container type.
* This map allows the executor index computation of the RR scheduling.
*/
   private final Map nextExecutorIndexByContainerType;
 
   @Inject
-  public RoundRobinSchedulingPolicy(final ContainerManager containerManager,
+  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry,
 
 Review comment:
   Can't we make this constructor private, or add the `@VisibleForTesting` annotation?
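
A minimal sketch of the second option, under stated assumptions: the `VisibleForTesting` annotation below is a locally declared stand-in (Guava ships a real annotation of that name; this one is hypothetical and uses runtime retention only so the sketch can check itself), and `Policy` is a hypothetical placeholder, not the real `RoundRobinSchedulingPolicy`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class VisibleForTestingSketch {
  /** Hypothetical stand-in marker; not the Guava annotation. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.CONSTRUCTOR, ElementType.METHOD})
  @interface VisibleForTesting { }

  /** Hypothetical policy class whose constructor is wider than private only for tests. */
  static final class Policy {
    private final int scheduleTimeoutMs;

    // Not private because unit tests construct the policy directly.
    @VisibleForTesting
    Policy(final int scheduleTimeoutMs) {
      this.scheduleTimeoutMs = scheduleTimeoutMs;
    }

    int getScheduleTimeoutMs() {
      return scheduleTimeoutMs;
    }
  }

  /** Returns true if the Policy(int) constructor carries the marker annotation. */
  static boolean constructorIsMarked() {
    try {
      return Policy.class.getDeclaredConstructor(int.class)
          .isAnnotationPresent(VisibleForTesting.class);
    } catch (final NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(final String[] args) {
    System.out.println(constructorIsMarked());
  }
}
```

The annotation changes nothing at runtime. It only documents that the wider visibility exists for tests.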



[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-07 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173042451
 
 

 ##
 File path: tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
 ##
 @@ -15,123 +15,94 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  private static ExecutorService completionEventThreadPool;
-  private static BlockingDeque eventRunnableQueue;
-  private static boolean testComplete;
-
-  public static void initialize() {
-testComplete = false;
-completionEventThreadPool = Executors.newFixedThreadPool(5);
-
-eventRunnableQueue = new LinkedBlockingDeque<>();
-
-for (int i = 0; i < 5; i++) {
-  completionEventThreadPool.execute(() -> {
-while (!testComplete || !eventRunnableQueue.isEmpty()) {
-  try {
-final Runnable event = eventRunnableQueue.takeFirst();
-event.run();
-  } catch (InterruptedException e) {
-e.printStackTrace();
-  }
-}
-  });
-}
-completionEventThreadPool.shutdown();
-  }
-
-  public static void cleanup() {
-testComplete = true;
-  }
-
   /**
* Sends a stage's completion event to scheduler, with all its task groups 
marked as complete as well.
* This replaces executor's task group completion messages for testing 
purposes.
* @param jobStateManager for the submitted job.
* @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides executor representers
* @param physicalStage for which the states should be marked as complete.
*/
   public static void sendStageCompletionEventToScheduler(final JobStateManager 
jobStateManager,
  final Scheduler 
scheduler,
- final 
ContainerManager containerManager,
+ final 
ExecutorRegistry executorRegistry,
  final PhysicalStage 
physicalStage,
  final int attemptIdx) 
{
-eventRunnableQueue.add(new Runnable() {
-  @Override
-  public void run() {
-while 
(jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
-== StageState.State.EXECUTING) {
-  physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-if 
(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
-== TaskGroupState.State.EXECUTING) {
-  sendTaskGroupStateEventToScheduler(scheduler, containerManager, 
taskGroupId,
-  TaskGroupState.State.COMPLETE, attemptIdx, null);
-}
-  });
-}
+// Loop until the stage completes.
 
 Review comment:
   It seems that the behavior of this method has changed. Please update the 
method description and name.



[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-07 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173044113
 
 

 ##
 File path: 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
 ##
 @@ -0,0 +1,418 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.master.scheduler;
+
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
+import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
+import 
edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test cases for
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JobStateManager.class, ExecutorRepresenter.class, 
RoundRobinSchedulingPolicy.class,
+ScheduledTaskGroup.class, Readable.class})
+public final class SourceLocationAwareSchedulingPolicyTest {
+  private static final String SITE_0 = "SEOUL";
+  private static final String SITE_1 = "JINJU";
+  private static final String SITE_2 = "BUSAN";
+  private static final int SCHEDULER_TIMEOUT_MS = 500;
+
+  private SourceLocationAwareSchedulingPolicy sourceLocationAware;
+  private SpiedSchedulingPolicyWrapper roundRobin;
+  private MockJobStateManagerWrapper jobStateManager;
+
+  @Before
+  public void setup() throws InjectionException {
+final Injector injector = Tang.Factory.getTang().newInjector();
+jobStateManager = new MockJobStateManagerWrapper();
+
+final ExecutorRegistry executorRegistry = new ExecutorRegistry();
+final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy =
+new RoundRobinSchedulingPolicy(executorRegistry, SCHEDULER_TIMEOUT_MS);
+roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, 
jobStateManager.get());
+
+injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, 
roundRobin.get());
+injector.bindVolatileInstance(JobStateManager.class, 
jobStateManager.get());
+injector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry);
+sourceLocationAware = 
injector.getInstance(SourceLocationAwareSchedulingPolicy.class);
+  }
+
+  @After
+  public void teardown() {
+// All expectations should be resolved at this time.
+roundRobin.ensureNoUnresolvedExpectation();
+  }
+
+  /**
+   * {@link SourceLocationAwareSchedulingPolicy} should delegate scheduling 
decision when the
+   * {@link ScheduledTaskGroup} does not have any source tasks.
+   */
+  @Test
+  public void testRoundRobinSchedulerFallback() {
+// Prepare test scenario
+final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE);
+final ScheduledTaskGroup tg1 = 
CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2,
+ExecutorPlacementProperty.NONE);
+final ScheduledTaskGroup tg2 = 
CreateScheduledTaskGroup.withReadablesWhichThrowException(5,
+ExecutorPlacementProperty.NONE);
+addExecutor(new MockExecutorRepresenterWrapper(SITE_0, 
ExecutorPlacementProperty.NONE, 1));
+addExecutor(new MockExecutorRepresenterWrapper(SITE_1, 
ExecutorPlacementProperty.NONE, 1));
+
+// Trying 

[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-07 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173046431
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,236 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+  private final Lock lock = new ReentrantLock();
+  private final Condition moreExecutorsAvailableCondition = 
lock.newCondition();
+
+  /**
+   * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}.
+   * @param executorRegistry provides catalog of available executors
+   * @param roundRobinSchedulingPolicy provides fallback for TaskGroups with 
no input source information
+   */
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
+  final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
+this.executorRegistry = executorRegistry;
+this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
+   * chosen from union of set of locations where splits of each source task 
resides.
+   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
+   final JobStateManager jobStateManager) {
+lock.lock();
+try {
+  Set sourceLocations = Collections.emptySet();
+  try {
+sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+  } catch (final UnsupportedOperationException e) {
+// do nothing
+  } catch (final Exception e) {
+LOG.warn(String.format("Exception while trying to get source location 
for %s",
+scheduledTaskGroup.getTaskGroupId()), e);
+  }
+  if (sourceLocations.size() == 0) {
+// No source location information found, fall back to the 
RoundRobinSchedulingPolicy
+return 
roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, 
jobStateManager);
+  }
+
+  if 

[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172728122
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 ##
 @@ -526,8 +403,133 @@ private PhysicalStage getStageById(final String stageId) 
{
 throw new RuntimeException(new Throwable("This taskGroupId does not exist 
in the plan"));
   }
 
-  @Override
-  public void terminate() {
-// nothing to do yet.
+  /**
+   * Action after task group execution has been completed, not after it has 
been put on hold.
+   *
+   * @param executorId  the ID of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+final String taskGroupId) {
+onTaskGroupExecutionComplete(executorId, taskGroupId, false);
+  }
+
+  /**
+   * Action after task group execution has been completed.
+   * @param executorId id of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   * @param isOnHoldToComplete whether or not if it is switched to complete 
after it has been on hold.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+final String taskGroupId,
+final Boolean isOnHoldToComplete) {
+LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
+if (!isOnHoldToComplete) {
+  schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+}
+
+final String stageIdForTaskGroupUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+if 
(jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) {
+  // if the stage this task group belongs to is complete,
+  if (!jobStateManager.checkJobTermination()) { // and if the job is not 
yet complete or failed,
+scheduleNextStage(stageIdForTaskGroupUponCompletion);
+  }
+}
+  }
+
+  /**
+   * Action for after task group execution is put on hold.
+   * @param executorId the ID of the executor.
+   * @param taskGroupId the ID of the task group.
+   * @param taskPutOnHold  the ID of task that is put on hold.
+   */
+  private void onTaskGroupExecutionOnHold(final String executorId,
+  final String taskGroupId,
+  final String taskPutOnHold) {
+LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
+schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+final String stageIdForTaskGroupUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+
+final boolean stageComplete =
+
jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion);
+
+if (stageComplete) {
+  // get optimization vertex from the task.
+  final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
+  getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks 
list
+  .filter(task -> task.getId().equals(taskPutOnHold)) // find it
+  .map(physicalPlan::getIRVertexOf) // get the corresponding 
IRVertex, the MetricCollectionBarrierVertex
+  .filter(irVertex -> irVertex instanceof 
MetricCollectionBarrierVertex)
+  .distinct()
+  .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // 
convert types
+  .findFirst().orElseThrow(() -> new 
RuntimeException(ON_HOLD.name() // get it
+  + " called with failed task ids by some other task than "
+  + MetricCollectionBarrierTask.class.getSimpleName()));
+  // and we will use this vertex to perform metric collection and dynamic 
optimization.
+
+  pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
+  new DynamicOptimizationEvent(physicalPlan, 
metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId)));
+} else {
+  onTaskGroupExecutionComplete(executorId, taskGroupId, true);
+}
+  }
+
+  private void onTaskGroupExecutionFailedRecoverable(final String executorId, 
final String taskGroupId,
 
 Review comment:
   Please add a comment about this method.



[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172735237
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 ##
 @@ -53,7 +55,7 @@ public SchedulerRunner(final SchedulingPolicy 
schedulingPolicy,
 this.isTerminated = false;
   }
 
-  public synchronized void scheduleJob(final JobStateManager jobStateManager) {
 
 Review comment:
   Please add comments about these methods



[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172723450
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
 ##
 @@ -91,21 +87,17 @@
 
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
-   final SchedulerRunner schedulerRunner,
-   final PendingTaskGroupQueue pendingTaskGroupQueue,
final ContainerManager containerManager,
final BlockManagerMaster blockManagerMaster,
final MetricMessageHandler metricMessageHandler,
final MessageEnvironment masterMessageEnvironment,
@Parameter(JobConf.DAGDirectory.class) final String 
dagDirectory) {
-// We would like to keep the master event thread pool single threaded
+// We would like to use a single thread for runtime master operations
 // since the processing logic in master takes a very short amount of time
 // compared to the job completion times of executed jobs
 // and keeping it single threaded removes the complexity of multi-thread 
synchronization.
-this.masterControlEventExecutor = Executors.newSingleThreadExecutor();
+this.runtimeMasterThread = Executors.newSingleThreadExecutor();
 
 Review comment:
   Why don't we name this thread? (Please check 
[here](https://stackoverflow.com/questions/6113746/naming-threads-and-thread-pools-of-executorservice))
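
A minimal sketch of one way to do this with the standard library, assuming nothing about the rest of `RuntimeMaster`: pass a `ThreadFactory` to `Executors.newSingleThreadExecutor` so the worker thread shows up under a readable name in thread dumps and logs. The class name, the helper methods and the thread name string are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class NamedSingleThreadSketch {
  // Hypothetical name, chosen only for illustration.
  static final String THREAD_NAME = "RuntimeMaster thread";

  /** Creates a single-thread executor whose worker thread carries the given name. */
  static ExecutorService newNamedSingleThreadExecutor(final String threadName) {
    final ThreadFactory factory = runnable -> new Thread(runnable, threadName);
    return Executors.newSingleThreadExecutor(factory);
  }

  /** Runs one task on the executor and reports the name of the thread that ran it. */
  static String nameOfWorkerThread() {
    final ExecutorService executor = newNamedSingleThreadExecutor(THREAD_NAME);
    try {
      final Callable<String> task = () -> Thread.currentThread().getName();
      return executor.submit(task).get();
    } catch (final InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(final String[] args) {
    System.out.println(nameOfWorkerThread());
  }
}
```

If the executor ever replaces its worker after a failure, the factory gives the replacement the same name.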



[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172748732
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
+  final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
+this.executorRegistry = executorRegistry;
+this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
+   * chosen from union of set of locations where splits of each source task 
resides.
+   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup 
scheduledTaskGroup,
+final JobStateManager 
jobStateManager) {
+Set sourceLocations = Collections.emptySet();
+try {
+  sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+} catch (final Exception e) {
+  LOG.warn(String.format("Exception while trying to get source location 
for %s",
+  scheduledTaskGroup.getTaskGroupId()), e);
+}
+if (sourceLocations.size() == 0) {
+  // No source location information found, fall back to the 
RoundRobinSchedulingPolicy
+  return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, 
jobStateManager);
+}
+
+if (scheduleToLocalNode(scheduledTaskGroup, jobStateManager, 
sourceLocations)) {
+  return true;
+} else {
+  try {
+Thread.sleep(scheduleTimeoutMs);
 
 Review comment:
   Can we wake this thread up the way 
`RoundRobinSchedulingPolicy#signalPossiblyWaitingScheduler` does, instead of sleeping for the full timeout?
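
A minimal sketch of the idea, not the actual Nemo implementation: wait on a `Condition` with a timeout instead of calling `Thread.sleep`, and signal that condition whenever capacity frees up. The later revision quoted in this thread declares a `Lock` and a `Condition` named `moreExecutorsAvailableCondition`. Everything else here (`freeSlots`, `tryAcquireSlot`, `onSlotFreed`, the class name) is hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class WakeableWaitSketch {
  private final Lock lock = new ReentrantLock();
  private final Condition moreExecutorsAvailableCondition = lock.newCondition();
  private int freeSlots = 0; // guarded by lock

  /** Waits at most timeoutMs for a free slot; returns true if one was taken. */
  boolean tryAcquireSlot(final long timeoutMs) {
    lock.lock();
    try {
      long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      // Loop to guard against spurious wakeups and against losing a race for the slot.
      while (freeSlots == 0) {
        if (remainingNanos <= 0) {
          return false; // timed out without capacity
        }
        remainingNanos = moreExecutorsAvailableCondition.awaitNanos(remainingNanos);
      }
      freeSlots--;
      return true;
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }

  /** Called when an executor is added or a task group completes. */
  void onSlotFreed() {
    lock.lock();
    try {
      freeSlots++;
      moreExecutorsAvailableCondition.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Returns true if a waiter with a 5 s timeout is woken early by a release after about 50 ms. */
  static boolean wokenBeforeTimeout() {
    final WakeableWaitSketch sketch = new WakeableWaitSketch();
    final Thread releaser = new Thread(() -> {
      try {
        Thread.sleep(50);
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      sketch.onSlotFreed();
    }, "releaser");
    releaser.start();
    final long start = System.nanoTime();
    final boolean acquired = sketch.tryAcquireSlot(5_000);
    final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    return acquired && elapsedMs < 5_000;
  }
}
```

Compared with a fixed sleep, the waiting scheduler resumes as soon as capacity appears, and the timeout remains as an upper bound.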



[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172721727
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 ##
 @@ -435,4 +435,44 @@ void registerRequest(final long requestId,
   return locationFuture;
 }
   }
+
+
+  public static BlockState.State convertBlockState(final 
ControlMessage.BlockStateFromExecutor state) {
 
 Review comment:
   Please add a comment about this method.



[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172725494
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
 ##
 @@ -311,7 +304,7 @@ private void handleControlMessage(final 
ControlMessage.Message message) {
* @param srcVertexId   the ID of the source vertex.
* @param blockId   the ID of the block.
*/
-  public void accumulateBarrierMetric(final List blockSizeInfo,
+  private void accumulateBarrierMetric(final List blockSizeInfo,
   final String srcVertexId,
 
 Review comment:
   Please check the indentation.


[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172726613
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
 ##
 @@ -46,12 +51,14 @@
   private final MessageSender messageSender;
   private final ActiveContext activeContext;
   private final ExecutorService serializationExecutorService;
+  private final String nodeName;
 
   public ExecutorRepresenter(final String executorId,
 
 Review comment:
   Please add comments describing this constructor and the other methods.


[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172736279
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
 
 Review comment:
   Please add a comment describing this constructor.


[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172720282
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
 ##
 @@ -94,5 +98,18 @@ public String propertiesToJSON() {
   }
   return elements;
 }
+
+@Override
+public List getLocations() throws Exception {
+  if (boundedSource instanceof 
HadoopInputFormatIO.HadoopInputFormatBoundedSource) {
+final Field inputSplitField = 
boundedSource.getClass().getDeclaredField("inputSplit");
+inputSplitField.setAccessible(true);
+final InputSplit inputSplit = ((HadoopInputFormatIO.SerializableSplit) 
inputSplitField
+.get(boundedSource)).getSplit();
+return Arrays.asList(inputSplit.getLocations());
+  } else {
+throw new UnsupportedOperationException();
 
 Review comment:
   `return Collections.emptyList();`?
   (Because the default method returns it.)


[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172721711
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 ##
 @@ -435,4 +435,44 @@ void registerRequest(final long requestId,
   return locationFuture;
 }
   }
+
+
 
 Review comment:
   Please remove this duplicate blank line.


[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172720045
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/ir/Readable.java
 ##
 @@ -28,5 +30,15 @@
* @throws Exception exception while reading data.
*/
   Iterable read() throws Exception;
+
+  /**
+   * Returns the list of locations where this readable resides.
+   * Each location has a complete copy of the readable.
+   * @return List of locations where this readable resides, or an empty list 
if this operation is not supported
+   * @throws Exception exceptions on the way
+   */
+  default List getLocations() throws Exception {
 
 Review comment:
   Why do we have to define this as a default method?
   Can't we just add an abstract method and describe its expected behavior well?
   If this method is default, someone implementing another `Readable` can 
forget to override it even though that implementation could support this function.
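
   A minimal sketch of the abstract-method variant, assuming a simplified stand-in for `Readable`. The names `LocatableReadable`, `HdfsLikeReadable`, and `InMemoryReadable` are hypothetical and exist only for this illustration. Because `getLocations()` has no default body, every implementation has to state explicitly whether it can report locations, and the Javadoc tells implementers to return an empty list when it cannot.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for Readable (illustration only).
interface LocatableReadable {
  /**
   * @return the locations that each hold a complete copy of this readable,
   *         or an empty list if location information is not available.
   * @throws Exception if the locations cannot be determined.
   */
  List<String> getLocations() throws Exception; // abstract: no silent default
}

// An implementation that knows where its data lives.
final class HdfsLikeReadable implements LocatableReadable {
  private final String[] hosts;

  HdfsLikeReadable(final String... hosts) {
    this.hosts = hosts;
  }

  @Override
  public List<String> getLocations() {
    return Arrays.asList(hosts);
  }
}

// An implementation without location information: it must say so explicitly.
final class InMemoryReadable implements LocatableReadable {
  @Override
  public List<String> getLocations() {
    return Collections.emptyList();
  }
}
```

   Omitting `getLocations()` from either class would be a compile error, which is the safeguard a default method does not provide.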


[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172736526
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
+  final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
+this.executorRegistry = executorRegistry;
+this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
+   * chosen from union of set of locations where splits of each source task 
resides.
+   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup 
scheduledTaskGroup,
+final JobStateManager 
jobStateManager) {
+Set sourceLocations = Collections.emptySet();
+try {
+  sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+} catch (final Exception e) {
+  LOG.warn(String.format("Exception while trying to get source location 
for %s",
 
 Review comment:
   As I mentioned above, let's just return an empty set if the source location 
is not supported.
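
   A minimal sketch of how the caller could treat an empty result, assuming locations are plain node names. `SourceLocationSketch`, `unionOfLocations`, and `chooseNode` are hypothetical names, not the PR's code, and picking the first candidate in sorted order is a simplification made here so the example is deterministic. With an empty set meaning "no location information", the fallback path needs no exception handling.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch (illustration only, not the PR's implementation).
final class SourceLocationSketch {
  private SourceLocationSketch() {
  }

  /** Union of the locations reported by each source; empty if no source reports any. */
  static Set<String> unionOfLocations(final Collection<List<String>> locationsPerSource) {
    final Set<String> union = new TreeSet<>();
    for (final List<String> locations : locationsPerSource) {
      union.addAll(locations);
    }
    return union;
  }

  /**
   * Picks a node that holds source data and is currently available.
   * Falls back to fallbackNode (e.g. a round-robin choice) when there is no such node.
   */
  static String chooseNode(final Collection<List<String>> locationsPerSource,
                           final Set<String> availableNodes,
                           final String fallbackNode) {
    final TreeSet<String> candidates = new TreeSet<>(unionOfLocations(locationsPerSource));
    candidates.retainAll(availableNodes); // keep only nodes that can actually run the task group
    return candidates.isEmpty() ? fallbackNode : candidates.first();
  }
}
```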

