[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710456
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
 
 Review comment:
   Can you explain in the comment why this is marked as `volatile`? My 
understanding is that only a single thread instantiates and executes a 
`TaskGroupExecutor`. Let me know if I'm missing something.
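
For context on the question, here is a minimal sketch of when `volatile` matters. It assumes a hypothetical `PipeRegistry` class (not the actual `TaskGroupExecutor`): a field touched by a single thread can simply be `final`, while `volatile` only helps when another thread reads a value the owner thread writes. Note also that `volatile` on a `Map` reference covers reassignment of the reference, not the thread safety of the map's contents.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not the actual TaskGroupExecutor.
final class PipeRegistry {
  // Touched by the owning thread only, so a plain final field is sufficient.
  private final Map<String, String> pipeIdToDstTask = new HashMap<>();

  // volatile is only warranted if a different thread reads this flag.
  private volatile boolean closed;

  void register(final String pipeId, final String dstTask) {
    pipeIdToDstTask.put(pipeId, dstTask);
  }

  String lookup(final String pipeId) {
    return pipeIdToDstTask.get(pipeId);
  }

  void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }

  public static void main(final String[] args) {
    final PipeRegistry registry = new PipeRegistry();
    registry.register("pipe-0", "task-1");
    System.out.println(registry.lookup("pipe-0"));
    registry.close();
    System.out.println(registry.isClosed());
  }
}
```

If the executor really is single-threaded, the `volatile` modifier could likely be dropped in favor of `final`; if not, a code comment naming the second thread would answer the question.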


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172709998
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
 
 Review comment:
   Can you do without this?
   (e.g., we could treat a task as finished once it has consumed all of its
   input iterators)
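
The suggestion could be sketched as follows, assuming each task can enumerate its input iterators. The class and method names are hypothetical; the point is only that "finished" can be derived from the iterators themselves rather than tracked in a separate set.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration; not part of the PR.
final class InputExhaustion {
  /** A task counts as finished once none of its input iterators has elements left. */
  static <T> boolean isFinished(final List<Iterator<T>> inputIterators) {
    for (final Iterator<T> iterator : inputIterators) {
      if (iterator.hasNext()) {
        return false;
      }
    }
    return true;
  }

  public static void main(final String[] args) {
    final Iterator<Integer> pending = Arrays.asList(1, 2).iterator();
    final Iterator<Integer> drained = Collections.<Integer>emptyIterator();
    final List<Iterator<Integer>> inputs = Arrays.asList(pending, drained);

    System.out.println(isFinished(inputs)); // one input still has data
    pending.next();
    pending.next();
    System.out.println(isFinished(inputs)); // every input is exhausted
  }
}
```

One caveat of this sketch: a task with no input iterators at all is reported as finished immediately, so source tasks would need separate handling.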




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172709098
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
 
 Review comment:
   I'm not sure about the newly added data structures below, for the following
   reasons:
   - Some of them add an extra per-element operation and memory overhead
   - Maybe we can do without some of them
   
   Alternatively, I'd consider the following options:
   - Compose iterators like Spark (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala#L38)
   - Reuse `taskGroupDag` to obtain dependency information
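
The iterator-composition option could look roughly like the sketch below. This is a Java illustration in the spirit of the linked Spark code, not a port of it, and `ComposedIterators.map` is a hypothetical helper: each task wraps its parent's iterator, so elements flow through the chain one at a time without intermediate queues or lookup maps.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical helper for illustration; not part of Nemo or Spark.
final class ComposedIterators {
  /** Lazily applies fn to each element of parent; nothing is buffered. */
  static <I, O> Iterator<O> map(final Iterator<I> parent, final Function<I, O> fn) {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        return parent.hasNext();
      }

      @Override
      public O next() {
        return fn.apply(parent.next());
      }
    };
  }

  public static void main(final String[] args) {
    final Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
    // Each stage pulls from the previous one only when asked for an element.
    final Iterator<Integer> scaled = map(source, x -> x * 10);
    final Iterator<String> labeled = map(scaled, x -> "v" + x);
    while (labeled.hasNext()) {
      System.out.println(labeled.next());
    }
  }
}
```

With this shape, the dependency order comes from how the iterators are nested, which could be derived once from `taskGroupDag` instead of being kept in per-element maps.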




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710709
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
 
 Review comment:
   Can you refactor these variables into a separate class?
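
One possible shape for such a class, assuming the comment targets the metric counters that follow the `// For metrics` marker in the PR (`serBlockSize`, `encodedBlockSize`, `accumulatedBlockedReadTime`). The class name `TaskGroupMetrics` and its methods are hypothetical.

```java
// Hypothetical holder for the executor's metric counters; not part of the PR.
final class TaskGroupMetrics {
  private long serBlockSize;
  private long encodedBlockSize;
  private long accumulatedBlockedReadTime;

  void addSerBlockSize(final long bytes) {
    serBlockSize += bytes;
  }

  void addEncodedBlockSize(final long bytes) {
    encodedBlockSize += bytes;
  }

  void addBlockedReadTime(final long millis) {
    accumulatedBlockedReadTime += millis;
  }

  long getSerBlockSize() {
    return serBlockSize;
  }

  long getEncodedBlockSize() {
    return encodedBlockSize;
  }

  long getAccumulatedBlockedReadTime() {
    return accumulatedBlockedReadTime;
  }

  public static void main(final String[] args) {
    final TaskGroupMetrics metrics = new TaskGroupMetrics();
    metrics.addSerBlockSize(128);
    metrics.addSerBlockSize(64);
    metrics.addBlockedReadTime(5);
    System.out.println(metrics.getSerBlockSize());
    System.out.println(metrics.getAccumulatedBlockedReadTime());
  }
}
```

The executor would then hold a single `TaskGroupMetrics` field instead of several loose counters.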




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710239
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
 
 Review comment:
   Remove, and instead prepare transforms before consuming any data?
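
A minimal sketch of the alternative, assuming a simplified, hypothetical `Transform` interface with `prepare()` and `onData()` (the real Nemo interface may differ). Preparing every transform in one pass before the data loop makes a set that tracks which transforms are already prepared unnecessary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not the actual Nemo Transform API.
final class EagerPrepare {
  interface Transform {
    void prepare();

    void onData(Object element);
  }

  /** Records calls so the ordering can be inspected. */
  static final class RecordingTransform implements Transform {
    private final String name;
    private final List<String> log;

    RecordingTransform(final String name, final List<String> log) {
      this.name = name;
      this.log = log;
    }

    @Override
    public void prepare() {
      log.add(name + ":prepare");
    }

    @Override
    public void onData(final Object element) {
      log.add(name + ":" + element);
    }
  }

  /** Prepares all transforms first, then feeds every element through them. */
  static void run(final List<Transform> transforms, final List<?> data) {
    for (final Transform transform : transforms) {
      transform.prepare();
    }
    for (final Object element : data) {
      for (final Transform transform : transforms) {
        transform.onData(element);
      }
    }
  }

  public static void main(final String[] args) {
    final List<String> log = new ArrayList<>();
    final List<Transform> transforms =
        Arrays.asList(new RecordingTransform("a", log), new RecordingTransform("b", log));
    run(transforms, Arrays.asList(1, 2));
    System.out.println(log);
  }
}
```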




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172704416
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172706819
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711081
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
 
 Review comment:
   Remove unused variable.




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711250
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge internalEdge) {
-    final InputReader inputReader = channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+    List inputPipes = new ArrayList<>();
+    List parentTasks = taskGroupDag.getParents(task.getId());
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+        inputPipes.add(parentOutputPipe);
+        LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+            getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+      });
+      taskToInputPipesMap.put(task, inputPipes);
+    }
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+    final LocalPipe outputPipe = new LocalPipe();
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+    final List outEdges = taskGroupDag.getOutgoingEdgesOf(task);
+
+    outEdges.forEach(outEdge -> {
+      if (outEdge.isSideInput()) {
 
 Review comment:
   Ditto on else{}.




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172708316
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
 
 Review comment:
   Can this become a local variable?




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711562
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
+
+  private volatile boolean isExecutionRequested;
 
 Review comment:
   Ditto on volatile.




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172709428
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
 
 Review comment:
   Remove? It does `getAndIncrement()`, but the returned value is not used.
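
For reference, a small sketch of what `getAndIncrement()` returns compared with `incrementAndGet()` on `java.util.concurrent.atomic.AtomicInteger`. The `CounterDemo` class is hypothetical. If neither the returned value nor a later `get()` is ever read, the counter has no observable effect and the field can likely go.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only AtomicInteger is a real library type here.
final class CounterDemo {
  /** Returns {value from getAndIncrement, value from incrementAndGet, final count}. */
  static int[] demo() {
    final AtomicInteger completedFutures = new AtomicInteger();
    final int previous = completedFutures.getAndIncrement(); // old value, counter is now 1
    final int updated = completedFutures.incrementAndGet();  // new value, counter is now 2
    return new int[] {previous, updated, completedFutures.get()};
  }

  public static void main(final String[] args) {
    final int[] values = demo();
    System.out.println(values[0] + " " + values[1] + " " + values[2]);
  }
}
```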




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172705428
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711209
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge internalEdge) {
-final InputReader inputReader = channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
 
 Review comment:
   Please add an `else {}` branch with a comment in it explaining why that 
case is not handled.
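
A minimal sketch of what this request could look like. The class, the use of string IDs in place of `Task` and `LocalPipe`, and the rationale written in the `else` comment are all assumptions made for illustration; they are not taken from the pull request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the pipe-wiring logic; all names are illustrative only.
final class InputPipeWiring {
  private final Map<String, List<String>> taskToInputPipes = new HashMap<>();

  /**
   * Registers the parents' output pipes as the input pipes of the given task.
   *
   * @return true if input pipes were registered for the task.
   */
  boolean addInputPipes(final String taskId, final List<String> parentOutputPipes) {
    if (parentOutputPipes != null) {
      taskToInputPipes.put(taskId, new ArrayList<>(parentOutputPipes));
      return true;
    } else {
      // Assumed rationale: a task without intra-group parents gets its data
      // from somewhere else (e.g. a source or an inter-group reader),
      // so there is nothing to wire up here.
      return false;
    }
  }

  boolean hasInputPipe(final String taskId) {
    return taskToInputPipes.containsKey(taskId);
  }
}
```

The point of the explicit branch is that a reader can see the unhandled case was considered, rather than forgotten.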




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172707585
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711570
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
+
+  private volatile boolean isExecutionRequested;
+  private volatile String logicalTaskIdPutOnHold;
 
 Review comment:
   Ditto on `volatile`: please explain in a comment why these fields need it.
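
For context on the question, a hedged sketch of the trade-off. If only one thread ever touches the flag, a plain field is enough. If several threads can call `execute()`, `volatile` gives visibility but a check-then-set on it is still racy, and an `AtomicBoolean` is one possible alternative. The class and method names below are hypothetical and not from the pull request.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard against executing the same task group twice.
final class ExecutionGuard {
  // compareAndSet makes "check and set" a single atomic step,
  // which a volatile boolean on its own does not provide.
  private final AtomicBoolean isExecutionRequested = new AtomicBoolean(false);

  void markExecutionRequested(final String taskGroupId) {
    if (!isExecutionRequested.compareAndSet(false, true)) {
      throw new IllegalStateException(
          "TaskGroup {" + taskGroupId + "} execution called again!");
    }
  }
}
```

Whichever option is chosen, a one-line comment stating which threads read and write the field would answer the review question directly.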




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172707266
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172708330
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
 
 Review comment:
   Can this become a local variable?
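
A small sketch of the kind of change being asked about, assuming the counter is only needed inside one method. The class, the method, and the counting rule are hypothetical; the actual use of `numIterators` is not visible in the quoted diff.

```java
import java.util.List;

// Hypothetical example: a counter that was a field becomes a local variable.
final class IteratorCounting {
  // Before (assumed shape): "private int numIterators;" mutated inside the method.
  // After: the count lives only for the duration of the call that needs it.
  static int countNonEmpty(final List<List<Integer>> sources) {
    int numIterators = 0;
    for (final List<Integer> source : sources) {
      if (source.iterator().hasNext()) {
        numIterators++;
      }
    }
    return numIterators;
  }
}
```

Keeping such state local shrinks the set of fields a reader has to reason about and removes any question of cross-thread visibility for it.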




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172707978
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -145,303 +201,504 @@ private void initializeDataTransfer() {
 .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge 
internalEdge) {
-final InputReader inputReader = 
channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+List inputPipes = new ArrayList<>();
+List parentTasks = taskGroupDag.getParents(task.getId());
+final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-  private void createLocalWriter(final Task task, final RuntimeEdge 
internalEdge) {
-final OutputWriter outputWriter = channelFactory.createLocalWriter(task, 
taskGroupIdx, internalEdge);
-addOutputWriter(task, outputWriter);
+if (parentTasks != null) {
+  parentTasks.forEach(parent -> {
+final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+inputPipes.add(parentOutputPipe);
+LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+  });
+  taskToInputPipesMap.put(task, inputPipes);
+}
   }
 
-  // Helper functions to add the initialized reader/writer to the maintained 
map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as 
side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+final LocalPipe outputPipe = new LocalPipe();
 final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+final List outEdges = 
taskGroupDag.getOutgoingEdgesOf(task);
+
+outEdges.forEach(outEdge -> {
+  if (outEdge.isSideInput()) {
+outputPipe.setSideInputRuntimeEdge(outEdge);
+outputPipe.setAsSideInput(physicalTaskId);
+LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+taskGroupId, physicalTaskId, outEdge.getId());
+  }
+});
 
-  private void addOutputWriter(final Task task, final OutputWriter 
outputWriter) {
-final String physicalTaskId = getPhysicalTaskId(task.getId());
-physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList 
-> new ArrayList<>());
-physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+taskToOutputPipeMap.put(task, outputPipe);
+LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
 
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-LOG.info("{} Execution Started!", taskGroupId);
-if (isExecutionRequested) {
-  throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution 
called again!");
-} else {
-  isExecutionRequested = true;
-}
+  private boolean hasInputPipe(final Task task) {
+return taskToInputPipesMap.containsKey(task);
+  }
 
-taskGroupStateManager.onTaskGroupStateChanged(
-TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+return taskToOutputWritersMap.containsKey(task);
+  }
 
-taskGroupDag.topologicalDo(task -> {
-  final String physicalTaskId = getPhysicalTaskId(task.getId());
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.EXECUTING, Optional.empty());
-  try {
-if (task instanceof BoundedSourceTask) {
-  launchBoundedSourceTask((BoundedSourceTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof OperatorTask) {
-  launchOperatorTask((OperatorTask) task);
-  taskGroupStateManager.onTaskStateChanged(physicalTaskId, 
TaskState.State.COMPLETE, Optional.empty());
-  LOG.info("{} Execution Complete!", taskGroupId);
-} else if (task instanceof 

[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710298
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
 
 Review comment:
   Please use a more descriptive name.




[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining

2018-03-06 Thread GitBox
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup 
pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710971
 
 

 ##
 File path: 
runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 ##
 @@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap;  // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue> iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
+
+  private volatile boolean isExecutionRequested;
+  private volatile String logicalTaskIdPutOnHold;
+
+  private static final String ITERATORID_PREFIX = "ITERATOR_";
 
 Review comment:
   Can you do without iterator ids?
   (e.g., composing iterators, using pointers in a DAG)
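
One way the "pointers" idea could be sketched, under the assumption that iterator ids exist only to look up destination tasks: key the lookup by the iterator object itself. The class name, the use of string task ids, and the routing API are hypothetical and not from the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical routing table from iterators to the tasks that consume them.
final class IteratorRouting {
  // Keyed by reference identity, so no "ITERATOR_<n>" string ids are needed.
  private final Map<Iterator<?>, List<String>> iteratorToDstTasks = new IdentityHashMap<>();

  void register(final Iterator<?> iterator, final List<String> dstTaskIds) {
    iteratorToDstTasks.put(iterator, new ArrayList<>(dstTaskIds));
  }

  List<String> destinationsOf(final Iterator<?> iterator) {
    return iteratorToDstTasks.getOrDefault(iterator, Collections.emptyList());
  }
}
```

`IdentityHashMap` compares keys with `==`, so two distinct iterators over equal data stay separate entries, which is the property the generated string ids were presumably providing.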

