[GitHub] seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172762612
 
 

 ##
 File path: 
compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
 ##
 @@ -15,41 +15,62 @@
  */
 package edu.snu.nemo.compiler.frontend.spark.transform;
 
-import edu.snu.nemo.common.ir.OutputCollector;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import edu.snu.nemo.common.ir.Pipe;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
 import org.apache.spark.api.java.function.Function2;
 
 import javax.annotation.Nullable;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.util.Iterator;
 
+
 /**
  * Reduce Transform for Spark.
+ *
  * @param <T> element type.
  */
 public final class ReduceTransform implements Transform {
   private final Function2 func;
-  private OutputCollector oc;
+  private Pipe pipe;
+  private T result;
+  private String filename;
 
   /**
* Constructor.
* @param func function to run for the reduce transform.
*/
   public ReduceTransform(final Function2 func) {
 this.func = func;
+this.result = null;
+this.filename = filename + JavaRDD.getResultId();
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector 
outputCollector) {
-this.oc = outputCollector;
+  public void prepare(final Context context, final Pipe p) {
+this.pipe = p;
   }
 
   @Override
-  public void onData(final Iterator elements, final String srcVertexId) {
-final T res = reduceIterator(elements, func);
-if (res == null) { // nothing to be done.
+  public void onData(final Object element) {
 
 Review comment:
   I suggest `public void onData(final T element)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172762043
 
 

 ##
 File path: 
common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/Transform.java
 ##
 @@ -32,16 +30,15 @@
   /**
* Prepare the transform.
* @param context of the transform.
-   * @param outputCollector that collects outputs.
+   * @param pipe that collects outputs.
*/
-  void prepare(Context context, OutputCollector outputCollector);
+  void prepare(Context context, Pipe pipe);
 
   /**
* On data received.
-   * @param elements data received.
-   * @param srcVertexId sender of the data.
+   * @param element data received.
*/
-  void onData(Iterator elements, String srcVertexId);
+  void onData(Object element);
 
 Review comment:
   We declared the input type as `<I>`.
   ```java
   void onData(I element);
   ```
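
To illustrate the suggestion, here is a minimal, self-contained sketch of a transform interface whose `onData` takes the declared input type, so implementations need no cast. The names `SketchPipe`, `SketchTransform`, `LengthTransform`, and `ListPipe` are hypothetical stand-ins, not the actual Nemo types, and the `Context` argument of `prepare` is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the Pipe type in the diff.
interface SketchPipe<O> {
  void emit(O output);
}

// Simplified stand-in for Transform, parameterized on input I and output O.
interface SketchTransform<I, O> {
  void prepare(SketchPipe<O> pipe);

  // Typed parameter: the compiler checks the element type for each implementation.
  void onData(I element);
}

// Example implementation: emits the length of each incoming string.
final class LengthTransform implements SketchTransform<String, Integer> {
  private SketchPipe<Integer> pipe;

  @Override
  public void prepare(final SketchPipe<Integer> p) {
    this.pipe = p;
  }

  @Override
  public void onData(final String element) {
    // No cast from Object is needed here.
    pipe.emit(element.length());
  }
}

// Collects emitted values in a list so that the result can be inspected.
final class ListPipe<O> implements SketchPipe<O> {
  final List<O> collected = new ArrayList<>();

  @Override
  public void emit(final O output) {
    collected.add(output);
  }
}
```

With `Object` as the parameter type, each implementation would have to cast inside `onData`, and a mismatched element would only surface at run time as a `ClassCastException`.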




[GitHub] seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172762337
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/FlattenTransform.java
 ##
 @@ -37,18 +36,18 @@ public FlattenTransform() {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector oc) {
-this.outputCollector = oc;
+  public void prepare(final Context context, final Pipe p) {
+this.pipe = p;
   }
 
   @Override
-  public void onData(final Iterator elements, final String srcVertexId) {
-elements.forEachRemaining(collectedElements::add);
+  public void onData(final Object element) {
 
 Review comment:
   Same here. `public void onData(final T element)`?




[GitHub] seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172762740
 
 

 ##
 File path: conf/src/main/java/edu/snu/nemo/conf/JobConf.java
 ##
 @@ -103,7 +103,7 @@
* Path to the JSON file that specifies resource layout.
*/
   @NamedParameter(doc = "Path to the JSON file that specifies resources for 
executors", short_name = "executor_json",
-  default_value = "examples/resources/sample_executor_resources.json")
+  default_value = "../resources/sample_executor_resources.json")
 
 Review comment:
   This change may conflict with `master`. Please follow `master`.




[GitHub] seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172762472
 
 

 ##
 File path: 
compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
 ##
 @@ -38,22 +38,23 @@ public GroupByKeyTransform() {
   }
 
   @Override
-  public void prepare(final Transform.Context context, final 
OutputCollector> outputCollector) {
-this.oc = outputCollector;
+  public void prepare(final Transform.Context context, final Pipe> p) {
+this.pipe = p;
   }
 
   @Override
-  public void onData(final Iterator> elements, final String 
srcVertexId) {
-elements.forEachRemaining(element -> {
-  keyToValues.putIfAbsent(element._1, new ArrayList<>());
-  keyToValues.get(element._1).add(element._2);
-});
+  public void onData(final Object element) {
+K key = ((Tuple2) element)._1;
 
 Review comment:
   Add `final` modifier?
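
As a rough sketch of how this could look once `onData` takes a typed element: both locals can be `final`, and the cast disappears. This is an illustration only; it uses `java.util.Map.Entry` in place of Scala's `Tuple2` to stay self-contained, and `GroupByKeySketch` and `groups()` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical element-wise group-by-key, loosely modeled on the diff above.
final class GroupByKeySketch<K, V> {
  private final Map<K, List<V>> keyToValues = new HashMap<>();

  // Called once per element instead of once per iterator of elements.
  public void onData(final Map.Entry<K, V> element) {
    final K key = element.getKey();     // final local, no cast required
    final V value = element.getValue();
    // Create the value list on first sight of the key, then append.
    keyToValues.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  // Exposes the grouped values; a real transform would emit them downstream.
  public Map<K, List<V>> groups() {
    return keyToValues;
  }
}
```

`computeIfAbsent` replaces the `putIfAbsent` followed by `get` pair from the removed lines with a single map lookup.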




[GitHub] seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172762523
 
 

 ##
 File path: 
compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
 ##
 @@ -42,24 +46,29 @@ public ReduceByKeyTransform(final Function2 func) 
{
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector> outputCollector) {
-this.oc = outputCollector;
+  public void prepare(final Context context, final Pipe> p) {
+this.pipe = p;
   }
 
   @Override
-  public void onData(final Iterator> elements, final String 
srcVertexId) {
-elements.forEachRemaining(element -> {
-  keyToValues.putIfAbsent(element._1, new ArrayList<>());
-  keyToValues.get(element._1).add(element._2);
-});
+  public void onData(final Object element) {
+K key = ((Tuple2) element)._1;
 
 Review comment:
   Add `final` modifier?




[GitHub] seojangho commented on issue #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
seojangho commented on issue #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#issuecomment-371051613
 
 
   @sanha Thanks for the review. Please check the updates.




[GitHub] seojangho commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172759613
 
 

 ##
 File path: tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
 ##
 @@ -15,123 +15,100 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  private static ExecutorService completionEventThreadPool;
-  private static BlockingDeque eventRunnableQueue;
-  private static boolean testComplete;
-
-  public static void initialize() {
-testComplete = false;
-completionEventThreadPool = Executors.newFixedThreadPool(5);
-
-eventRunnableQueue = new LinkedBlockingDeque<>();
-
-for (int i = 0; i < 5; i++) {
-  completionEventThreadPool.execute(() -> {
-while (!testComplete || !eventRunnableQueue.isEmpty()) {
-  try {
-final Runnable event = eventRunnableQueue.takeFirst();
-event.run();
-  } catch (InterruptedException e) {
-e.printStackTrace();
-  }
-}
-  });
-}
-completionEventThreadPool.shutdown();
-  }
-
-  public static void cleanup() {
-testComplete = true;
-  }
-
   /**
* Sends a stage's completion event to scheduler, with all its task groups 
marked as complete as well.
* This replaces executor's task group completion messages for testing 
purposes.
* @param jobStateManager for the submitted job.
* @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides executor representers
* @param physicalStage for which the states should be marked as complete.
*/
   public static void sendStageCompletionEventToScheduler(final JobStateManager 
jobStateManager,
  final Scheduler 
scheduler,
- final 
ContainerManager containerManager,
+ final 
ExecutorRegistry executorRegistry,
  final PhysicalStage 
physicalStage,
  final int attemptIdx) 
{
-eventRunnableQueue.add(new Runnable() {
-  @Override
-  public void run() {
-while 
(jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
-== StageState.State.EXECUTING) {
-  physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-if 
(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
-== TaskGroupState.State.EXECUTING) {
-  sendTaskGroupStateEventToScheduler(scheduler, containerManager, 
taskGroupId,
-  TaskGroupState.State.COMPLETE, attemptIdx, null);
-}
-  });
-}
+// Loop until the stage completes.
+while (true) {
+  try {
+Thread.sleep(100);
 
 Review comment:
   No worries.




[GitHub] seojangho commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172759560
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
+  final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
+this.executorRegistry = executorRegistry;
+this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
+   * chosen from union of set of locations where splits of each source task 
resides.
+   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup 
scheduledTaskGroup,
+final JobStateManager 
jobStateManager) {
+Set sourceLocations = Collections.emptySet();
+try {
+  sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+} catch (final Exception e) {
+  LOG.warn(String.format("Exception while trying to get source location 
for %s",
+  scheduledTaskGroup.getTaskGroupId()), e);
+}
+if (sourceLocations.size() == 0) {
+  // No source location information found, fall back to the 
RoundRobinSchedulingPolicy
+  return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, 
jobStateManager);
+}
+
+if (scheduleToLocalNode(scheduledTaskGroup, jobStateManager, 
sourceLocations)) {
+  return true;
+} else {
+  try {
+Thread.sleep(scheduleTimeoutMs);
 
 Review comment:
   Yup. Thanks.




[GitHub] seojangho commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172758202
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
+  final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
+this.executorRegistry = executorRegistry;
+this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
+   * chosen from union of set of locations where splits of each source task 
resides.
+   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup 
scheduledTaskGroup,
+final JobStateManager 
jobStateManager) {
+Set sourceLocations = Collections.emptySet();
+try {
+  sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+} catch (final Exception e) {
+  LOG.warn(String.format("Exception while trying to get source location 
for %s",
 
 Review comment:
   Perhaps ensuring a hard failure is desirable here?
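
For comparison, the two behaviours could be sketched as follows. This is a hypothetical, self-contained illustration: `SourceLocationLookupSketch`, `lenient`, and `strict` are made-up names, and `IllegalStateException` is only a placeholder for whatever exception type the project would actually throw.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;

// Hypothetical helper contrasting a soft fallback with a hard failure.
final class SourceLocationLookupSketch {
  private SourceLocationLookupSketch() {
  }

  // Soft failure: swallow the exception and report "no known locations",
  // which lets the caller fall back to another scheduling policy.
  static Set<String> lenient(final Callable<Set<String>> lookup) {
    try {
      return lookup.call();
    } catch (final Exception e) {
      return Collections.emptySet();
    }
  }

  // Hard failure: surface the problem immediately, keeping the cause attached.
  static Set<String> strict(final Callable<Set<String>> lookup) {
    try {
      return lookup.call();
    } catch (final Exception e) {
      throw new IllegalStateException("Failed to get source locations", e);
    }
  }
}
```

The lenient variant keeps the job running but can hide a misconfigured source behind a logged warning; the strict variant stops scheduling at the first lookup error.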




[GitHub] seojangho commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172755632
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
 ##
 @@ -274,16 +271,12 @@ private void handleControlMessage(final 
ControlMessage.Message message) {
   convertFailureCause(taskGroupStateChangedMsg.getFailureCause()));
   break;
 case ExecutorFailed:
+  // Executor failed due to user code.
   final ControlMessage.ExecutorFailedMsg executorFailedMsg = 
message.getExecutorFailedMsg();
   final String failedExecutorId = executorFailedMsg.getExecutorId();
   final Exception exception = 
SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
   LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
-  containerManager.onExecutorRemoved(failedExecutorId);
   throw new RuntimeException(exception);
 
 Review comment:
   Friendly cc to @johnyangk :D




[GitHub] seojangho commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172754621
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
 ##
 @@ -94,5 +98,18 @@ public String propertiesToJSON() {
   }
   return elements;
 }
+
+@Override
+public List getLocations() throws Exception {
+  if (boundedSource instanceof 
HadoopInputFormatIO.HadoopInputFormatBoundedSource) {
+final Field inputSplitField = 
boundedSource.getClass().getDeclaredField("inputSplit");
+inputSplitField.setAccessible(true);
+final InputSplit inputSplit = ((HadoopInputFormatIO.SerializableSplit) 
inputSplitField
+.get(boundedSource)).getSplit();
+return Arrays.asList(inputSplit.getLocations());
+  } else {
+throw new UnsupportedOperationException();
 
 Review comment:
   Modified the interface-level method description to state that
`UnsupportedOperationException` is thrown in this case.
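
A minimal sketch of that contract, assuming the interface default is the one that throws: `ReadableSketch`, `LocatedReadable`, `PlainReadable`, and `LocationQuery` are hypothetical names used only for illustration, and the node names are invented.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the Readable interface discussed above.
interface ReadableSketch {
  /**
   * @return locations where this readable resides.
   * @throws UnsupportedOperationException when location information is unavailable.
   * @throws Exception any other failure while resolving locations.
   */
  default List<String> getLocations() throws Exception {
    throw new UnsupportedOperationException();
  }
}

// A readable that knows where its data lives.
final class LocatedReadable implements ReadableSketch {
  @Override
  public List<String> getLocations() {
    return Arrays.asList("node-1", "node-2");
  }
}

// A readable that relies on the default and therefore reports "unsupported".
final class PlainReadable implements ReadableSketch {
}

final class LocationQuery {
  private LocationQuery() {
  }

  // Treats "unsupported" as "no location preference"; other errors still propagate.
  static List<String> locationsOrEmpty(final ReadableSketch readable) {
    try {
      return readable.getLocations();
    } catch (final UnsupportedOperationException e) {
      return Collections.emptyList();
    } catch (final Exception e) {
      throw new IllegalStateException("Failed to resolve locations", e);
    }
  }
}
```

Catching `UnsupportedOperationException` separately lets a caller distinguish "this source cannot report locations" from a genuine lookup failure.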




[GitHub] seojangho commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
seojangho commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172753133
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/ir/Readable.java
 ##
 @@ -28,5 +30,15 @@
* @throws Exception exception while reading data.
*/
   Iterable read() throws Exception;
+
+  /**
+   * Returns the list of locations where this readable resides.
+   * Each location has a complete copy of the readable.
+   * @return List of locations where this readable resides, or an empty list 
if this operation is not supported
+   * @throws Exception exceptions on the way
+   */
+  default List getLocations() throws Exception {
 
 Review comment:
   This will introduce some redundant lines in our current codebase, but
taking the long-term view, you have a point.




[GitHub] johnyangk commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172750797
 
 

 ##
 File path: tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
 ##
 @@ -15,123 +15,100 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  private static ExecutorService completionEventThreadPool;
-  private static BlockingDeque eventRunnableQueue;
-  private static boolean testComplete;
-
-  public static void initialize() {
-testComplete = false;
-completionEventThreadPool = Executors.newFixedThreadPool(5);
-
-eventRunnableQueue = new LinkedBlockingDeque<>();
-
-for (int i = 0; i < 5; i++) {
-  completionEventThreadPool.execute(() -> {
-while (!testComplete || !eventRunnableQueue.isEmpty()) {
-  try {
-final Runnable event = eventRunnableQueue.takeFirst();
-event.run();
-  } catch (InterruptedException e) {
-e.printStackTrace();
-  }
-}
-  });
-}
-completionEventThreadPool.shutdown();
-  }
-
-  public static void cleanup() {
-testComplete = true;
-  }
-
   /**
* Sends a stage's completion event to scheduler, with all its task groups 
marked as complete as well.
* This replaces executor's task group completion messages for testing 
purposes.
* @param jobStateManager for the submitted job.
* @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides executor representers
* @param physicalStage for which the states should be marked as complete.
*/
   public static void sendStageCompletionEventToScheduler(final JobStateManager 
jobStateManager,
  final Scheduler 
scheduler,
- final 
ContainerManager containerManager,
+ final 
ExecutorRegistry executorRegistry,
  final PhysicalStage 
physicalStage,
  final int attemptIdx) 
{
-eventRunnableQueue.add(new Runnable() {
-  @Override
-  public void run() {
-while 
(jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
-== StageState.State.EXECUTING) {
-  physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-if 
(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
-== TaskGroupState.State.EXECUTING) {
-  sendTaskGroupStateEventToScheduler(scheduler, containerManager, 
taskGroupId,
-  TaskGroupState.State.COMPLETE, attemptIdx, null);
-}
-  });
-}
+// Loop until the stage completes.
+while (true) {
+  try {
+Thread.sleep(100);
 
 Review comment:
   @seojangho Please remove L52-57. I wrote them to test something and forgot 
to remove them. Sorry about this.




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172728122
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 ##
 @@ -526,8 +403,133 @@ private PhysicalStage getStageById(final String stageId) 
{
 throw new RuntimeException(new Throwable("This taskGroupId does not exist 
in the plan"));
   }
 
-  @Override
-  public void terminate() {
-// nothing to do yet.
+  /**
+   * Action after task group execution has been completed, not after it has 
been put on hold.
+   *
+   * @param executorId  the ID of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+final String taskGroupId) {
+onTaskGroupExecutionComplete(executorId, taskGroupId, false);
+  }
+
+  /**
+   * Action after task group execution has been completed.
+   * @param executorId id of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   * @param isOnHoldToComplete whether it is switched to complete after it has been on hold.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+final String taskGroupId,
+final Boolean isOnHoldToComplete) {
+LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
+if (!isOnHoldToComplete) {
+  schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+}
+
+final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+if (jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) {
+  // if the stage this task group belongs to is complete,
+  if (!jobStateManager.checkJobTermination()) { // and if the job is not yet complete or failed,
+scheduleNextStage(stageIdForTaskGroupUponCompletion);
+  }
+}
+  }
+
+  /**
+   * Action after task group execution is put on hold.
+   * @param executorId the ID of the executor.
+   * @param taskGroupId the ID of the task group.
+   * @param taskPutOnHold  the ID of task that is put on hold.
+   */
+  private void onTaskGroupExecutionOnHold(final String executorId,
+  final String taskGroupId,
+  final String taskPutOnHold) {
+LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
+schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+
+final boolean stageComplete =
+jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion);
+
+if (stageComplete) {
+  // get optimization vertex from the task.
+  final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
+  getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks list
+  .filter(task -> task.getId().equals(taskPutOnHold)) // find it
+  .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex
+  .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
+  .distinct()
+  .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
+  .findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it
+  + " called with failed task ids by some other task than "
+  + MetricCollectionBarrierTask.class.getSimpleName()));
+  // and we will use this vertex to perform metric collection and dynamic optimization.
+
+  pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
+  new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId)));
+} else {
+  onTaskGroupExecutionComplete(executorId, taskGroupId, true);
+}
+  }
+
+  private void onTaskGroupExecutionFailedRecoverable(final String executorId, 
final String taskGroupId,
 
 Review comment:
   Please add a comment about this method.




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172735237
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 ##
 @@ -53,7 +55,7 @@ public SchedulerRunner(final SchedulingPolicy 
schedulingPolicy,
 this.isTerminated = false;
   }
 
-  public synchronized void scheduleJob(final JobStateManager jobStateManager) {
 
 Review comment:
   Please add comments about these methods




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172723450
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
 ##
 @@ -91,21 +87,17 @@
 
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
-   final SchedulerRunner schedulerRunner,
-   final PendingTaskGroupQueue pendingTaskGroupQueue,
final ContainerManager containerManager,
final BlockManagerMaster blockManagerMaster,
final MetricMessageHandler metricMessageHandler,
final MessageEnvironment masterMessageEnvironment,
@Parameter(JobConf.DAGDirectory.class) final String 
dagDirectory) {
-// We would like to keep the master event thread pool single threaded
+// We would like to use a single thread for runtime master operations
 // since the processing logic in master takes a very short amount of time
 // compared to the job completion times of executed jobs
 // and keeping it single threaded removes the complexity of multi-thread 
synchronization.
-this.masterControlEventExecutor = Executors.newSingleThreadExecutor();
+this.runtimeMasterThread = Executors.newSingleThreadExecutor();
 
 Review comment:
   Why don't we name this thread? (Please check 
[here](https://stackoverflow.com/questions/6113746/naming-threads-and-thread-pools-of-executorservice))
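
A minimal sketch of what the naming suggestion could look like, using only the standard `java.util.concurrent` API. The helper class `NamedExecutors` and the thread name used in the usage note are illustrative assumptions, not names from the Nemo code base.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical helper (not part of Nemo) that builds executors with named threads. */
final class NamedExecutors {
  private NamedExecutors() {
  }

  /**
   * Creates a single-threaded executor whose worker thread carries the given name,
   * so that it is easy to spot in thread dumps and log output.
   */
  static ExecutorService newNamedSingleThreadExecutor(final String threadName) {
    // The factory is invoked whenever the executor needs a (replacement) worker thread.
    final ThreadFactory factory = runnable -> new Thread(runnable, threadName);
    return Executors.newSingleThreadExecutor(factory);
  }
}
```

With such a helper, the assignment in the diff could read `this.runtimeMasterThread = NamedExecutors.newNamedSingleThreadExecutor("RuntimeMaster");`, where the name string is again only an example.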




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172748732
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
+  final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
+this.executorRegistry = executorRegistry;
+this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
+   * chosen from union of set of locations where splits of each source task 
resides.
+   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup 
scheduledTaskGroup,
+final JobStateManager 
jobStateManager) {
+Set sourceLocations = Collections.emptySet();
+try {
+  sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+} catch (final Exception e) {
+  LOG.warn(String.format("Exception while trying to get source location 
for %s",
+  scheduledTaskGroup.getTaskGroupId()), e);
+}
+if (sourceLocations.size() == 0) {
+  // No source location information found, fall back to the 
RoundRobinSchedulingPolicy
+  return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, 
jobStateManager);
+}
+
+if (scheduleToLocalNode(scheduledTaskGroup, jobStateManager, 
sourceLocations)) {
+  return true;
+} else {
+  try {
+Thread.sleep(scheduleTimeoutMs);
 
 Review comment:
   Can we wake up this thread like 
`RoundRobinSchedulingPolicy#signalPossiblyWaitingScheduler`?
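
One way to make the wait interruptible by a signal is a `Condition` with a timed await. `Thread.sleep` inside a `synchronized` method keeps the monitor held for the whole timeout, whereas `Condition#awaitNanos` releases its lock while waiting. The sketch below is an assumption about how this could be done; the class `SchedulerWaiter` and its method names are hypothetical and are not taken from `RoundRobinSchedulingPolicy`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper: a timed wait that another thread can cut short. */
final class SchedulerWaiter {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition wakeUp = lock.newCondition();
  private boolean signalled = false; // guarded by lock

  /**
   * Waits until signalled or until the timeout elapses.
   * @return true if woken by a signal, false if the timeout elapsed first.
   */
  boolean awaitSignal(final long timeoutMs) throws InterruptedException {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    lock.lock();
    try {
      // Loop to tolerate spurious wakeups; awaitNanos returns the time still left.
      while (!signalled) {
        if (remainingNanos <= 0) {
          return false;
        }
        remainingNanos = wakeUp.awaitNanos(remainingNanos);
      }
      signalled = false; // consume the signal
      return true;
    } finally {
      lock.unlock();
    }
  }

  /** Wakes up a waiting scheduler, e.g. when an executor slot becomes free. */
  void signalWaiter() {
    lock.lock();
    try {
      signalled = true;
      wakeUp.signalAll();
    } finally {
      lock.unlock();
    }
  }
}
```

In the policy under review, `awaitSignal(scheduleTimeoutMs)` would take the place of `Thread.sleep(scheduleTimeoutMs)`, and `signalWaiter()` would be called from whichever callbacks free up capacity.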




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172721727
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 ##
 @@ -435,4 +435,44 @@ void registerRequest(final long requestId,
   return locationFuture;
 }
   }
+
+
+  public static BlockState.State convertBlockState(final 
ControlMessage.BlockStateFromExecutor state) {
 
 Review comment:
   Please add a comment about this method.




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172725494
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
 ##
 @@ -311,7 +304,7 @@ private void handleControlMessage(final 
ControlMessage.Message message) {
* @param srcVertexId   the ID of the source vertex.
* @param blockId   the ID of the block.
*/
-  public void accumulateBarrierMetric(final List blockSizeInfo,
+  private void accumulateBarrierMetric(final List blockSizeInfo,
   final String srcVertexId,
 
 Review comment:
   Please check the indentation.




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172726613
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
 ##
 @@ -46,12 +51,14 @@
   private final MessageSender messageSender;
   private final ActiveContext activeContext;
   private final ExecutorService serializationExecutorService;
+  private final String nodeName;
 
   public ExecutorRepresenter(final String executorId,
 
 Review comment:
   Please add comments about this constructor and the other methods.




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172736279
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
 
 Review comment:
   Please add a comment about this constructor.




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172720282
 
 

 ##
 File path: 
compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
 ##
 @@ -94,5 +98,18 @@ public String propertiesToJSON() {
   }
   return elements;
 }
+
+@Override
+public List getLocations() throws Exception {
+  if (boundedSource instanceof 
HadoopInputFormatIO.HadoopInputFormatBoundedSource) {
+final Field inputSplitField = 
boundedSource.getClass().getDeclaredField("inputSplit");
+inputSplitField.setAccessible(true);
+final InputSplit inputSplit = ((HadoopInputFormatIO.SerializableSplit) 
inputSplitField
+.get(boundedSource)).getSplit();
+return Arrays.asList(inputSplit.getLocations());
+  } else {
+throw new UnsupportedOperationException();
 
 Review comment:
   `return Collections.emptyList();` ?
   (Because the default method returns it.)
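
A small sketch of the pattern being suggested: "location unknown" is expressed as an empty list, both in the interface default and in the override, rather than as an exception. The interface `LocatableSource` and the class `SplitBackedSource` are hypothetical stand-ins for illustration; they are not the actual Nemo or Beam types.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a readable source that may know where its data lives. */
interface LocatableSource {
  /** By default the location is unknown, which is expressed as an empty list. */
  default List<String> getLocations() throws Exception {
    return Collections.emptyList();
  }
}

/** Hypothetical source that reports host names only when its split exposes them. */
final class SplitBackedSource implements LocatableSource {
  private final String[] hosts; // null when the underlying split has no location info

  SplitBackedSource(final String[] hosts) {
    this.hosts = hosts;
  }

  @Override
  public List<String> getLocations() {
    if (hosts != null) {
      return Arrays.asList(hosts);
    }
    // Same answer as the interface default, instead of UnsupportedOperationException.
    return Collections.emptyList();
  }
}
```

Callers can then treat an empty result uniformly as "no locality preference" without catching an exception for unsupported sources.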




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172721711
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
 ##
 @@ -435,4 +435,44 @@ void registerRequest(final long requestId,
   return locationFuture;
 }
   }
+
+
 
 Review comment:
   Please remove this duplicated newline.




[GitHub] sanha commented on a change in pull request #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172736526
 
 

 ##
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
+  final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
+this.executorRegistry = executorRegistry;
+this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
+   * chosen from union of set of locations where splits of each source task 
resides.
+   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public synchronized boolean scheduleTaskGroup(final ScheduledTaskGroup 
scheduledTaskGroup,
+final JobStateManager 
jobStateManager) {
+Set sourceLocations = Collections.emptySet();
+try {
+  sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+} catch (final Exception e) {
+  LOG.warn(String.format("Exception while trying to get source location 
for %s",
 
 Review comment:
   As I mentioned above, let's just return an empty set if the source location is 
not supported.
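
If unsupported sources report an empty collection, the scheduling side no longer needs the `try`/`catch` shown in the diff. A rough sketch of that shape follows, with hypothetical names (`SourceLocationUnion`, `unionOf`, `shouldFallBackToRoundRobin`) and with plain strings standing in for node names:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper illustrating the "empty set means unknown" convention. */
final class SourceLocationUnion {
  private SourceLocationUnion() {
  }

  /** Unions the locations reported by every source; unsupported sources contribute nothing. */
  static Set<String> unionOf(final Collection<List<String>> locationsPerSource) {
    final Set<String> union = new HashSet<>();
    for (final List<String> locations : locationsPerSource) {
      union.addAll(locations);
    }
    return union;
  }

  /** With no location information at all, the caller falls back to round-robin. */
  static boolean shouldFallBackToRoundRobin(final Set<String> sourceLocations) {
    return sourceLocations.isEmpty();
  }
}
```

The fallback decision then depends only on whether the union is empty, which matches the `sourceLocations.size() == 0` branch already present in the method under review.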




[GitHub] sanha commented on issue #1: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy

2018-03-06 Thread GitBox
sanha commented on issue #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#issuecomment-370998667
 
 
   Thanks for the work! I'm reviewing this change now.
   By the way, could you update the JIRA issue description for the "refactoring 
container management and scheduling" part?




Podling Report Reminder - March 2018

2018-03-06 Thread johndament
Dear podling,

This email was sent by an automated system on behalf of the Apache
Incubator PMC. It is an initial reminder to give you plenty of time to
prepare your quarterly board report.

The board meeting is scheduled for Wed, 21 March 2018, 10:30 am PDT.
The report for your podling will form a part of the Incubator PMC
report. The Incubator PMC requires your report to be submitted 2 weeks
before the board meeting, to allow sufficient time for review and
submission (Wed, March 07).

Please submit your report with sufficient time to allow the Incubator
PMC, and subsequently board members to review and digest. Again, the
very latest you should submit your report is 2 weeks prior to the board
meeting.

Thanks,

The Apache Incubator PMC

Submitting your Report

--

Your report should contain the following:

*   Your project name
*   A brief description of your project, which assumes no knowledge of
the project or necessarily of its field
*   A list of the three most important issues to address in the move
towards graduation.
*   Any issues that the Incubator PMC or ASF Board might wish/need to be
aware of
*   How has the community developed since the last report
*   How has the project developed since the last report.
*   How does the podling rate their own maturity.

This should be appended to the Incubator Wiki page at:

https://wiki.apache.org/incubator/March2018

Note: This is manually populated. You may need to wait a little before
this page is created from a template.

Mentors
---

Mentors should review reports for their project(s) and sign them off on
the Incubator wiki page. Signing off reports shows that you are
following the project - projects that are not signed may raise alarms
for the Incubator PMC.

Incubator PMC


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710456
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deal with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
 
 Review comment:
   Can you explain in the comment why this is marked as `volatile`? My 
understanding is that only a single thread instantiates and executes a 
`TaskGroupExecutor`. Let me know if I'm missing something.




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172709998
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deal with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
 
 Review comment:
   Can you do without this?
   (e.g., We assume a task is finished, when it consumes all of its input 
iterators)
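
A rough sketch of the idea in the parenthetical: instead of tracking finished task IDs in a separate set, completion is derived from the task's input iterators. The class and method names are hypothetical, and the sketch assumes that `hasNext()` is cheap and non-blocking, which may not hold for iterators backed by remote data.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper: a task counts as finished once every input iterator is drained. */
final class TaskInputs {
  private TaskInputs() {
  }

  /** Returns true only if none of the given input iterators has elements left. */
  static boolean allConsumed(final List<? extends Iterator<?>> inputIterators) {
    for (final Iterator<?> iterator : inputIterators) {
      if (iterator.hasNext()) {
        return false;
      }
    }
    return true;
  }
}
```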




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172709098
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deal with inter-TaskGroup data.
 
 Review comment:
   I'm not sure about the newly added data structures below for the following 
reasons
   - Some of them cost an extra per-element operation and memory overhead
   - Maybe we can do without some of them
   
   Alternatively, I'd think about the following options
   - Compose iterators like Spark 
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala#L38)
   - Reuse `taskGroupDag` to obtain dependency information
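
To illustrate the first option, here is a minimal Java sketch of iterator composition: each task wraps its parent's iterator and applies its function lazily, so elements flow through the chain one at a time without intermediate maps or queues. `MappedIterator` is a hypothetical class written for this note; it is not Spark's or Nemo's implementation.

```java
import java.util.Iterator;
import java.util.function.Function;

/** Hypothetical lazy adapter: applies a function to each element of an upstream iterator. */
final class MappedIterator<I, O> implements Iterator<O> {
  private final Iterator<I> upstream;
  private final Function<? super I, ? extends O> function;

  MappedIterator(final Iterator<I> upstream, final Function<? super I, ? extends O> function) {
    this.upstream = upstream;
    this.function = function;
  }

  @Override
  public boolean hasNext() {
    // No buffering: availability is delegated to the upstream iterator.
    return upstream.hasNext();
  }

  @Override
  public O next() {
    // The element is transformed only when the downstream consumer pulls it.
    return function.apply(upstream.next());
  }
}
```

Chaining two of these over a source iterator (for example, one that doubles integers followed by one that formats them) pipelines both steps per element, which is the property the per-task pipe maps are meant to provide.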




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710709
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
 
 Review comment:
   Can you refactor these variables into a separate class?


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710239
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
 
 Review comment:
   Remove this set, and instead prepare all transforms before consuming any data?
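
   A rough sketch of what preparing up front could look like, under stated 
assumptions: `SimpleTransform` is a hypothetical stand-in with simplified 
signatures (it is not Nemo's `Transform` interface), and `run` is an 
illustrative name. Because every transform is prepared in a first pass, the 
per-element loop no longer needs a set to track which transforms have already 
been prepared.

```java
import java.util.List;

final class PrepareFirstSketch {
  private PrepareFirstSketch() { }

  /** Hypothetical stand-in for a transform; not Nemo's actual Transform interface. */
  interface SimpleTransform {
    void prepare();

    void onData(Object element);
  }

  /** Prepares every transform once, then feeds each element to every transform. */
  static void run(final List<SimpleTransform> transforms, final List<?> elements) {
    // Phase 1: prepare all transforms before any data is consumed.
    for (final SimpleTransform transform : transforms) {
      transform.prepare();
    }
    // Phase 2: consume data; no "already prepared?" check is needed per element.
    for (final Object element : elements) {
      for (final SimpleTransform transform : transforms) {
        transform.onData(element);
      }
    }
  }
}
```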


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172704416
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172706819
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711081
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
 
 Review comment:
   Remove unused variable.


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711250
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
 
 Review comment:
   Ditto on else{}.


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172708316
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
 
 Review comment:
   Can this become a local variable?


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172707585
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711570
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
+
+  private volatile boolean isExecutionRequested;
+  private volatile String logicalTaskIdPutOnHold;
 
 Review comment:
   Ditto on volatile.


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172708330
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map 
physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one 
Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
 
 Review comment:
   Can this become a local variable?


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172707978
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710298
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
 
 Review comment:
   Please use a more descriptive name.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710971
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
+
+  private volatile boolean isExecutionRequested;
+  private volatile String logicalTaskIdPutOnHold;
+
+  private static final String ITERATORID_PREFIX = "ITERATOR_";
 
 Review comment:
   Can you do without iterator ids?
   (e.g., composing iterators, using pointers in a DAG)
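
A minimal sketch of what "doing without iterator ids" could look like, assuming the ids are only used as lookup keys for destination tasks: key the map on the iterator object itself. `IteratorRouting`, `register`, and `destinationsOf` are hypothetical names that do not appear in the PR, and tasks are represented as plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: route iterators to destination tasks without string ids. */
public final class IteratorRouting {
  // Keyed by object identity, so no generated "ITERATOR_<n>" id is needed.
  private final Map<Iterator<?>, List<String>> iteratorToDstTasks = new IdentityHashMap<>();

  /** Records that data read from the given iterator should go to dstTask. */
  public void register(final Iterator<?> iterator, final String dstTask) {
    iteratorToDstTasks.computeIfAbsent(iterator, k -> new ArrayList<>()).add(dstTask);
  }

  /** Returns the destination tasks of the iterator, or an empty list if unknown. */
  public List<String> destinationsOf(final Iterator<?> iterator) {
    final List<String> tasks = iteratorToDstTasks.get(iterator);
    return tasks == null ? new ArrayList<>() : tasks;
  }

  public static void main(final String[] args) {
    final IteratorRouting routing = new IteratorRouting();
    final Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
    routing.register(source, "Task-A");
    routing.register(source, "Task-B");
    System.out.println(routing.destinationsOf(source)); // prints [Task-A, Task-B]
  }
}
```

Whether this fits depends on how the ids are used elsewhere (for example in logs or metrics), which this excerpt does not show.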


[GitHub] sanha closed pull request #4: [NEMO-28] Javadoc and Travis

2018-03-06 Thread GitBox
sanha closed pull request #4: [NEMO-28] Javadoc and Travis
URL: https://github.com/apache/incubator-nemo/pull/4
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/.travis.yml b/.travis.yml
index 651caea5..1a342896 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -16,11 +16,12 @@
 # .travis.yml
 # For maven builds
 language: java
-after_script: "mvn verify -B"
+script: mvn verify -B -q -ff -Dsurefire.useFile=false -Dorg.slf4j.simpleLogger.defaultLogLevel=warn
 
 notifications:
-  slack: snuspl:owgt4d5uZwww5nGovPAMgvZB
+  slack:
+secure: 
iDNSliZcKu4PW55tOe8a2jxvr1OGfqaX2sCqAmHMkAH7rvOBFwln5UBaeUY8p8BIa0P12oHLCOAtqWzHiXqe2CGVz7xyXhMHD2FzAtUIQ6GocNRXkdU5JJBXMN4XOPGMX/r38GbHRhO1ThjHr3hJIhfISjnXaOMxUoIOE6aLr/Dk9LUm6iQ2eBUajmcz4vy7BuS8Wec1DOaUIH025SRDyxzkhezJjB2JgPdLsc91amWfV04tZg+NBLZmn3DhB6Jl3dRrvABbszhBqDM2tZfV1MXuI522fzKa2tMuT2dd1BxUIkIicF+IB5tLnlKmnbRXCT7gQR0KAP5bXzdZnRY8UZo+Bbd6AlHHdIHWcNItJ2b0k4LLOVJ0MfoaKmoOJoQgkAYNPIGaDHM+q1FrhJSGtPCRKN4oaGevwWQPBZot9RY8QCU0v07p+MG9wQ8sR1nbfjF0jLQiLODSVsSNVSv2c01t6HkwmOfHe+YbTy9WkKIxQq3wMLcPT7iNcHAzsY3QQ+MSJn+xWrWcvJmfsGocVUUy20DV946NGUzpfHlXnfAxLTSOkUIj4kTXmUthIolzsRDqueoTeliIN5yeHFhQr7aX+NMrrVPba48EXRLsdQUzq6okTF6XnTDoDiLSu/AxKzItqz2lAVmpc011L6F9YIN/RQi6kK44++CvHf5kaHw=
 
 cache:
   directories:
-  - $HOME/.m2
+  - "$HOME/.m2"
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java
index 2f1f6ae0..074f615c 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DuplicateEdgeGroupPropertyValue.java
@@ -32,6 +32,7 @@
 
   /**
* Constructor.
+   * @param groupId Group ID.
*/
   public DuplicateEdgeGroupPropertyValue(final String groupId) {
 this.isRepresentativeEdgeDecided = false;
@@ -44,7 +45,7 @@ public DuplicateEdgeGroupPropertyValue(final String groupId) {
* @param representativeEdgeId physical edge id of representative edge.
*/
   public void setRepresentativeEdgeId(final String representativeEdgeId) {
-if (isRepresentativeEdgeDecided) {
+if (isRepresentativeEdgeDecided && !this.representativeEdgeId.equals(representativeEdgeId)) {
   throw new RuntimeException("edge id is already decided");
 }
 this.isRepresentativeEdgeDecided = true;
diff --git a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
index 751325f6..7fc6e335 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
@@ -19,13 +19,20 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.StringJoiner;
-import java.util.stream.Collectors;
 
 /**
  * Argument builder.
  */
 public final class ArgBuilder {
-  private List args = new ArrayList<>();
+  private List args;
+
+  /**
+   * Constructor with default values.
+   */
+  public ArgBuilder() {
+this.args = new ArrayList<>();
+this.args.add(Arrays.asList("-executor_json", "../resources/sample_executor_resources.json"));
+  }
 
   /**
* @param jobId job id.
@@ -78,8 +85,6 @@ public ArgBuilder addDAGDirectory(final String directory) {
* @return the built arguments.
*/
   public String[] build() {
-// new String[0] is good for performance
-// see http://stackoverflow.com/questions/4042434/converting-arrayliststring-to-string-in-java
-return args.stream().flatMap(List::stream).collect(Collectors.toList()).toArray(new String[0]);
+return args.stream().flatMap(List::stream).toArray(String[]::new);
   }
 }
diff --git a/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java b/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
index eeec1a91..22c1127a 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/ExampleTestUtil.java
@@ -82,6 +82,11 @@ public static void ensureOutputValidity(final String resourcePath,
* This method test the output validity of AlternatingLeastSquareITCase.
* Due to the floating point math error, the output of the test can be different every time.
* Thus we cannot compare plain text output, but have to check its numeric error.
+   *
+   * @param resourcePath path to resources.
+   * @param outputFileName name of output file.
+   * @param testResourceFileName name of the file to compare the outputs to.
+   * @throws IOException exception.
*/
   public static void ensureALSOutputValidity(final String resourcePath,
  

[GitHub] jooykim commented on issue #4: [NEMO-28] Javadoc and Travis

2018-03-06 Thread GitBox
jooykim commented on issue #4: [NEMO-28] Javadoc and Travis
URL: https://github.com/apache/incubator-nemo/pull/4#issuecomment-370780405
 
 
   @sanha Please go ahead and merge this PR if you're fine with what @wonook 
said. Thanks!


[GitHub] wonook commented on issue #4: [NEMO-28] Javadoc and Travis

2018-03-06 Thread GitBox
wonook commented on issue #4: [NEMO-28] Javadoc and Travis
URL: https://github.com/apache/incubator-nemo/pull/4#issuecomment-370778158
 
 
   @jooykim My bad. We should set up the PR template on the repo to remove 
confusion. I'll add a ticket on JIRA for that.


[GitHub] jooykim commented on issue #4: [NEMO-28] Javadoc and Travis

2018-03-06 Thread GitBox
jooykim commented on issue #4: [NEMO-28] Javadoc and Travis
URL: https://github.com/apache/incubator-nemo/pull/4#issuecomment-370777667
 
 
   @wonook Can we remove the line "The contents below prerequisites and setup 
should soon move to either the website or the wiki." from "Other comments" in 
the PR description?


[GitHub] wonook commented on a change in pull request #4: [NEMO-28] Javadoc and Travis

2018-03-06 Thread GitBox
wonook commented on a change in pull request #4: [NEMO-28] Javadoc and Travis
URL: https://github.com/apache/incubator-nemo/pull/4#discussion_r172510877
 
 

 ##
 File path: common/src/main/java/edu/snu/nemo/common/test/ArgBuilder.java
 ##
 @@ -19,12 +19,14 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.StringJoiner;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Argument builder.
  */
 public final class ArgBuilder {
+  private static final List DEFAULT_ARGS = Arrays.asList(Arrays.asList("-executor_json",
 
 Review comment:
   I'd rather not fix this value, since doing so would prevent users from 
running their applications using the `bin/run.sh` script without explicitly 
setting the `executor_json` parameter. It is indeed the default value for 
running integration tests, but not for all use cases.
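
One way to keep a test-only default without forcing it on every caller, sketched under assumptions: apply the default only when the caller has not set the flag. `DefaultingArgBuilder` and its `add` method are hypothetical and are not the `ArgBuilder` API from this PR; only the `-executor_json` flag and the sample path are taken from the diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: apply a default argument only when the caller did not set it. */
public final class DefaultingArgBuilder {
  private static final String EXECUTOR_JSON_FLAG = "-executor_json";
  private final Map<String, String> args = new LinkedHashMap<>();
  private final String defaultExecutorJson;

  public DefaultingArgBuilder(final String defaultExecutorJson) {
    this.defaultExecutorJson = defaultExecutorJson;
  }

  /** Sets (or overrides) a flag; insertion order is preserved. */
  public DefaultingArgBuilder add(final String flag, final String value) {
    args.put(flag, value);
    return this;
  }

  /** Flattens flag/value pairs, appending the default only if it was not set. */
  public String[] build() {
    final Map<String, String> merged = new LinkedHashMap<>(args);
    merged.putIfAbsent(EXECUTOR_JSON_FLAG, defaultExecutorJson);
    final List<String> flat = new ArrayList<>();
    merged.forEach((flag, value) -> {
      flat.add(flag);
      flat.add(value);
    });
    return flat.toArray(new String[0]);
  }

  public static void main(final String[] unused) {
    final String[] built = new DefaultingArgBuilder("../resources/sample_executor_resources.json")
        .add("-job_id", "example")
        .build();
    // prints: -job_id example -executor_json ../resources/sample_executor_resources.json
    System.out.println(String.join(" ", built));
  }
}
```

An explicit `add("-executor_json", ...)` call replaces the default, so integration tests and other callers can share one builder.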


[GitHub] wonook opened a new pull request #4: [NEMO-28] Javadoc and Travis

2018-03-06 Thread GitBox
wonook opened a new pull request #4: [NEMO-28] Javadoc and Travis
URL: https://github.com/apache/incubator-nemo/pull/4
 
 
   JIRA: [NEMO-28: Javadoc and 
Travis](https://issues.apache.org/jira/browse/NEMO-28)
   
   **Major changes:**
   
   - Travis script fixed to catch errors  
   
   **Minor changes to note:**
   
   - Some trivial fixes have been made to the code so that Travis passes.
   - Integration test code has been reordered to look neat and consistent.
   
   **Tests for the changes:**
   
   - Not required
   
   **Other comments:**
   
   - The contents below prerequisites and setup should soon move to either the 
website or the wiki.

