[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710456

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;

Review comment:
Can you explain in the comment why this is marked as `volatile`? My understanding is that only a single thread instantiates and executes a `TaskGroupExecutor`. Let me know if I'm missing something.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
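Background for the `volatile` question: the keyword only buys something when one thread writes a field that another thread reads. The sketch below uses a hypothetical `VolatileDemo` class (not part of Nemo) to contrast a stop flag polled by a worker thread, where `volatile` is required, with a counter touched by a single thread, where it adds nothing. If `TaskGroupExecutor` is indeed confined to one thread, its fields fall into the second case.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical example, not Nemo code: when does a field need `volatile`? */
class VolatileDemo {
  // Written by the calling thread, read by the worker thread. Without `volatile`,
  // the JIT may hoist the read out of the loop and the worker could spin forever.
  private volatile boolean stopRequested = false;
  private final AtomicLong iterations = new AtomicLong();

  // Only ever touched by one thread, so `volatile` would add nothing here.
  private int counter = 0;

  long runUntilStopped() throws InterruptedException {
    final Thread worker = new Thread(() -> {
      while (!stopRequested) {
        iterations.incrementAndGet();
      }
    });
    worker.start();
    Thread.sleep(10);      // let the worker spin briefly
    stopRequested = true;  // cross-thread write: must be visible to the worker
    worker.join();         // join() also gives happens-before for the read below
    return iterations.get();
  }

  int countLocally(final int n) {
    for (int i = 0; i < n; i++) {
      counter++;
    }
    return counter;
  }
}
```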
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172709998

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;

Review comment:
Can you do without this? (e.g., we assume a task is finished when it has consumed all of its input iterators)
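One way to read the suggestion, sketched under the assumption that each task can reach the iterators it consumes: completion is derived from the inputs themselves, so no `finishedTaskIds` set has to be kept in sync. `TaskCompletion` and `isFinished` are hypothetical names. Note that `hasNext()` on an iterator fed by a remote reader may block until data arrives or the stream ends.

```java
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical sketch (not Nemo code): derive "task finished" from the task's own
 * input iterators instead of recording finished task IDs in a separate Set.
 */
final class TaskCompletion {
  private TaskCompletion() {
  }

  /** Returns true once every input iterator of the task is exhausted. */
  static boolean isFinished(final List<? extends Iterator<?>> inputIterators) {
    for (final Iterator<?> it : inputIterators) {
      // For iterators backed by remote data, hasNext() may block.
      if (it.hasNext()) {
        return false;
      }
    }
    return true;
  }
}
```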
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172709098

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.

Review comment:
I'm not sure about the newly added data structures below, for the following reasons:
- Some of them add an extra per-element operation and memory overhead.
- Maybe we can do without some of them.

Alternatively, I'd think about the following options:
- Compose iterators like Spark (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala#L38)
- Reuse `taskGroupDag` to obtain dependency information
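A rough Java analogue of the Spark pattern linked above, where `MapPartitionsRDD` computes its partition by applying a function to its parent's iterator. Here each stage is a `Function<Iterator<A>, Iterator<B>>` and stages are chained with `Function.andThen`, so elements are pulled through the whole chain lazily, one at a time, without intermediate pipes or lookup maps. `IteratorChain`, `map`, `filter` and `drain` are illustrative names, not existing Nemo APIs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch: each stage wraps its parent's iterator (Spark-style). */
final class IteratorChain {
  private IteratorChain() {
  }

  /** Lazily applies fn to each element pulled from the parent iterator. */
  static <A, B> Function<Iterator<A>, Iterator<B>> map(final Function<A, B> fn) {
    return parent -> new Iterator<B>() {
      @Override
      public boolean hasNext() {
        return parent.hasNext();
      }

      @Override
      public B next() {
        return fn.apply(parent.next());
      }
    };
  }

  /** Lazily keeps only the parent's elements that satisfy pred. */
  static <A> Function<Iterator<A>, Iterator<A>> filter(final Predicate<A> pred) {
    return parent -> new Iterator<A>() {
      private A buffered;
      private boolean hasBuffered = false;

      @Override
      public boolean hasNext() {
        // Pull from the parent until a matching element is found or it runs out.
        while (!hasBuffered && parent.hasNext()) {
          final A candidate = parent.next();
          if (pred.test(candidate)) {
            buffered = candidate;
            hasBuffered = true;
          }
        }
        return hasBuffered;
      }

      @Override
      public A next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        hasBuffered = false;
        return buffered;
      }
    };
  }

  /** Drains the final iterator (stands in for handing output to a writer). */
  static <T> List<T> drain(final Iterator<T> it) {
    final List<T> out = new ArrayList<>();
    it.forEachRemaining(out::add);
    return out;
  }
}
```

For example, `IteratorChain.<Integer>filter(x -> x % 2 == 0).andThen(IteratorChain.<Integer, String>map(x -> "v" + x))` applied to an iterator over 1, 2, 3, 4 yields "v2", "v4". The dependency information needed to build such a chain could come from `taskGroupDag`, as the second option suggests.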
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710709

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics

Review comment:
Can you refactor these variables into a separate class?
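A minimal sketch of the requested refactoring, assuming the counters are only updated by the executor's own thread: a hypothetical `TaskGroupMetrics` class owns the metric fields from the diff, and `TaskGroupExecutor` would hold a single instance of it. `accumulatedBlockedReadTime` is left out because it is flagged as unused in a separate comment.

```java
/**
 * Hypothetical holder (not existing Nemo code) for the per-TaskGroup metric
 * counters that the diff keeps as fields of TaskGroupExecutor.
 * Not thread-safe: assumes a single updating thread.
 */
final class TaskGroupMetrics {
  private long serBlockSize = 0;
  private long encodedBlockSize = 0;

  /** Accumulates the size of serialized block data read. */
  void addSerializedBytes(final long bytes) {
    serBlockSize += bytes;
  }

  /** Accumulates the size of encoded block data read. */
  void addEncodedBytes(final long bytes) {
    encodedBlockSize += bytes;
  }

  long getSerBlockSize() {
    return serBlockSize;
  }

  long getEncodedBlockSize() {
    return encodedBlockSize;
  }
}
```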
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710239

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;

Review comment:
Remove this, and instead prepare the transforms before consuming any data?
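The suggestion amounts to a two-phase lifecycle: prepare every transform once, in topological order, and only then start pushing data, so the data path never has to consult a `preparedTransforms` set. The `SimpleTransform` interface and `PrepareFirst` class below are simplified stand-ins invented for this sketch, not Nemo's `Transform` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: prepare all transforms before any element flows. */
final class PrepareFirst {
  /** Minimal stand-in for a transform lifecycle (not Nemo's actual API). */
  interface SimpleTransform {
    void prepare();

    Object apply(Object element);
  }

  private PrepareFirst() {
  }

  /** Prepares the chain up front, then pushes each input element through it. */
  static List<Object> run(final List<SimpleTransform> topologicallySorted, final Iterable<?> input) {
    // Phase 1: one-time preparation, in topological order.
    for (final SimpleTransform t : topologicallySorted) {
      t.prepare();
    }
    // Phase 2: data path with no per-element "already prepared?" lookups.
    final List<Object> output = new ArrayList<>();
    for (final Object element : input) {
      Object current = element;
      for (final SimpleTransform t : topologicallySorted) {
        current = t.apply(current);
      }
      output.add(current);
    }
    return output;
  }
}
```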
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172704416

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -145,303 +201,504 @@ private void initializeDataTransfer() {
         .collect(Collectors.toSet());
   }
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge internalEdge) {
-    final InputReader inputReader = channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+    List inputPipes = new ArrayList<>();
+    List parentTasks = taskGroupDag.getParents(task.getId());
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
-  private void createLocalWriter(final Task task, final RuntimeEdge internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+        inputPipes.add(parentOutputPipe);
+        LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+            getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+      });
+      taskToInputPipesMap.put(task, inputPipes);
+    }
   }
-  // Helper functions to add the initialized reader/writer to the maintained map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+    final LocalPipe outputPipe = new LocalPipe();
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+    final List outEdges = taskGroupDag.getOutgoingEdgesOf(task);
+
+    outEdges.forEach(outEdge -> {
+      if (outEdge.isSideInput()) {
+        outputPipe.setSideInputRuntimeEdge(outEdge);
+        outputPipe.setAsSideInput(physicalTaskId);
+        LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+            taskGroupId, physicalTaskId, outEdge.getId());
+      }
+    });
-  private void addOutputWriter(final Task task, final OutputWriter outputWriter) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+    taskToOutputPipeMap.put(task, outputPipe);
+    LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-    LOG.info("{} Execution Started!", taskGroupId);
-    if (isExecutionRequested) {
-      throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
-    } else {
-      isExecutionRequested = true;
-    }
+  private boolean hasInputPipe(final Task task) {
+    return taskToInputPipesMap.containsKey(task);
+  }
-    taskGroupStateManager.onTaskGroupStateChanged(
-        TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+    return taskToOutputWritersMap.containsKey(task);
+  }
-    taskGroupDag.topologicalDo(task -> {
-      final String physicalTaskId = getPhysicalTaskId(task.getId());
-      taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.EXECUTING, Optional.empty());
-      try {
-        if (task instanceof BoundedSourceTask) {
-          launchBoundedSourceTask((BoundedSourceTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof OperatorTask) {
-          launchOperatorTask((OperatorTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172706819

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711081

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;

Review comment:
Remove unused variable.
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711250

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -145,303 +201,504 @@ private void initializeDataTransfer() {
         .collect(Collectors.toSet());
   }
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge internalEdge) {
-    final InputReader inputReader = channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+    List inputPipes = new ArrayList<>();
+    List parentTasks = taskGroupDag.getParents(task.getId());
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
-  private void createLocalWriter(final Task task, final RuntimeEdge internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+        inputPipes.add(parentOutputPipe);
+        LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+            getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+      });
+      taskToInputPipesMap.put(task, inputPipes);
+    }
   }
-  // Helper functions to add the initialized reader/writer to the maintained map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+    final LocalPipe outputPipe = new LocalPipe();
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+    final List outEdges = taskGroupDag.getOutgoingEdgesOf(task);
+
+    outEdges.forEach(outEdge -> {
+      if (outEdge.isSideInput()) {

Review comment:
Ditto on else{}.
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172708316

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;

Review comment:
Can this become a local variable?
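For the local-variable question: if the count is only needed while the executor sets up its readers, it can be computed and consumed within that one method rather than stored on the object. A minimal sketch, in which `LocalCountExample` and its `TaskInfo` stand-in for a task descriptor are hypothetical:

```java
import java.util.List;

/** Hypothetical sketch: a count used only during setup stays local to that method. */
final class LocalCountExample {
  /** Minimal stand-in for a task descriptor (not a Nemo class). */
  static final class TaskInfo {
    final boolean isBoundedSource;

    TaskInfo(final boolean isBoundedSource) {
      this.isBoundedSource = isBoundedSource;
    }
  }

  private LocalCountExample() {
  }

  /** Computes the number of bounded sources on demand instead of caching it in a field. */
  static int countBoundedSources(final List<TaskInfo> tasks) {
    final int numBoundedSources = (int) tasks.stream().filter(t -> t.isBoundedSource).count();
    return numBoundedSources;
  }
}
```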
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172707585

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172711570

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
+
+  private volatile boolean isExecutionRequested;
+  private volatile String logicalTaskIdPutOnHold;

Review comment:
Ditto on volatile.
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172708330

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;

Review comment:
Can this become a local variable?
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172707978

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -145,303 +201,504 @@ private void initializeDataTransfer() {
         .collect(Collectors.toSet());
   }
 
-  // Helper functions to initializes stage-internal edges.
-  private void createLocalReader(final Task task, final RuntimeEdge internalEdge) {
-    final InputReader inputReader = channelFactory.createLocalReader(taskGroupIdx, internalEdge);
-    addInputReader(task, inputReader);
-  }
+  /**
+   * Add input pipes to each {@link Task}.
+   * Input pipe denotes all the pipes of intra-Stage parent tasks of this task.
+   *
+   * @param task the Task to add input pipes to.
+   */
+  private void addInputPipe(final Task task) {
+    List inputPipes = new ArrayList<>();
+    List parentTasks = taskGroupDag.getParents(task.getId());
+    final String physicalTaskId = getPhysicalTaskId(task.getId());
-  private void createLocalWriter(final Task task, final RuntimeEdge internalEdge) {
-    final OutputWriter outputWriter = channelFactory.createLocalWriter(task, taskGroupIdx, internalEdge);
-    addOutputWriter(task, outputWriter);
+    if (parentTasks != null) {
+      parentTasks.forEach(parent -> {
+        final LocalPipe parentOutputPipe = taskToOutputPipeMap.get(parent);
+        inputPipes.add(parentOutputPipe);
+        LOG.info("log: Added Outputpipe of {} as InputPipe of {} {}",
+            getPhysicalTaskId(parent.getId()), taskGroupId, physicalTaskId);
+      });
+      taskToInputPipesMap.put(task, inputPipes);
+    }
   }
-  // Helper functions to add the initialized reader/writer to the maintained map.
-  private void addInputReader(final Task task, final InputReader inputReader) {
+  /**
+   * Add output pipes to each {@link Task}.
+   * Output pipe denotes the one and only one pipe of this task.
+   * Check the outgoing edges that will use this pipe,
+   * and set this pipe as side input if any one of the edges uses this pipe as side input.
+   *
+   * @param task the Task to add output pipes to.
+   */
+  private void addOutputPipe(final Task task) {
+    final LocalPipe outputPipe = new LocalPipe();
     final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToInputReaderMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToInputReaderMap.get(physicalTaskId).add(inputReader);
-  }
+    final List outEdges = taskGroupDag.getOutgoingEdgesOf(task);
+
+    outEdges.forEach(outEdge -> {
+      if (outEdge.isSideInput()) {
+        outputPipe.setSideInputRuntimeEdge(outEdge);
+        outputPipe.setAsSideInput(physicalTaskId);
+        LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+            taskGroupId, physicalTaskId, outEdge.getId());
+      }
+    });
-  private void addOutputWriter(final Task task, final OutputWriter outputWriter) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    physicalTaskIdToOutputWriterMap.computeIfAbsent(physicalTaskId, readerList -> new ArrayList<>());
-    physicalTaskIdToOutputWriterMap.get(physicalTaskId).add(outputWriter);
+    taskToOutputPipeMap.put(task, outputPipe);
+    LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
   }
-  /**
-   * Executes the task group.
-   */
-  public void execute() {
-    LOG.info("{} Execution Started!", taskGroupId);
-    if (isExecutionRequested) {
-      throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
-    } else {
-      isExecutionRequested = true;
-    }
+  private boolean hasInputPipe(final Task task) {
+    return taskToInputPipesMap.containsKey(task);
+  }
-    taskGroupStateManager.onTaskGroupStateChanged(
-        TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+  private boolean hasOutputWriter(final Task task) {
+    return taskToOutputWritersMap.containsKey(task);
+  }
-    taskGroupDag.topologicalDo(task -> {
-      final String physicalTaskId = getPhysicalTaskId(task.getId());
-      taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.EXECUTING, Optional.empty());
-      try {
-        if (task instanceof BoundedSourceTask) {
-          launchBoundedSourceTask((BoundedSourceTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof OperatorTask) {
-          launchOperatorTask((OperatorTask) task);
-          taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-          LOG.info("{} Execution Complete!", taskGroupId);
-        } else if (task instanceof
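For reference, the wiring that `addOutputPipe` and `addInputPipe` set up can be sketched with simplified, hypothetical types (`Pipe`, `Edge`, string task IDs) standing in for Nemo's `Task`, `LocalPipe`, and `RuntimeEdge`: each task gets exactly one output pipe, marked as a side input if any outgoing edge is a side-input edge, and a task's input pipes are its intra-stage parents' output pipes. The sketch assumes tasks are visited in topological order so that every parent's pipe already exists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified types (not Nemo's Task/LocalPipe/RuntimeEdge API) that
// mirror the wiring in addOutputPipe/addInputPipe.
class PipeWiringSketch {
  static final class Pipe {
    final String ownerTaskId;
    boolean sideInput;

    Pipe(final String ownerTaskId) {
      this.ownerTaskId = ownerTaskId;
    }
  }

  static final class Edge {
    final String src;
    final String dst;
    final boolean sideInput;

    Edge(final String src, final String dst, final boolean sideInput) {
      this.src = src;
      this.dst = dst;
      this.sideInput = sideInput;
    }
  }

  final Map<String, Pipe> taskToOutputPipe = new HashMap<>();
  final Map<String, List<Pipe>> taskToInputPipes = new HashMap<>();

  /** Wires tasks given in topological order, so parents' pipes exist first. */
  void wire(final List<String> tasksInTopologicalOrder, final List<Edge> edges) {
    for (final String task : tasksInTopologicalOrder) {
      // Exactly one output pipe per task; mark it as side input if any out-edge is one.
      final Pipe out = new Pipe(task);
      for (final Edge e : edges) {
        if (e.src.equals(task) && e.sideInput) {
          out.sideInput = true;
        }
      }
      taskToOutputPipe.put(task, out);

      // Input pipes are the output pipes of this task's intra-stage parents.
      final List<Pipe> inputs = new ArrayList<>();
      for (final Edge e : edges) {
        if (e.dst.equals(task)) {
          inputs.add(taskToOutputPipe.get(e.src));
        }
      }
      if (!inputs.isEmpty()) {
        taskToInputPipes.put(task, inputs);
      }
    }
  }
}
```

Unlike the diff, this sketch records input pipes only for tasks that actually have parents. In the diff, a task for which `getParents` returns an empty but non-null list would still get an entry, so `hasInputPipe` would report true for it; whether that can happen depends on `getParents`, which is not shown here.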
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710298

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;

Review comment:
Please use a more descriptive name.
[GitHub] johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
johnyangk commented on a change in pull request #2: [NEMO-7] Intra-TaskGroup pipelining
URL: https://github.com/apache/incubator-nemo/pull/2#discussion_r172710971

## File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java ##
@@ -60,13 +55,36 @@
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
 
-  /**
-   * Map of task IDs in this task group to their readers/writers.
-   */
-  private final Map physicalTaskIdToInputReaderMap;
-  private final Map physicalTaskIdToOutputWriterMap;
-
-  private boolean isExecutionRequested;
+  // Map of task ID to its intra-TaskGroup data pipe.
+  private final Map taskToInputPipesMap;
+  private final Map taskToOutputPipeMap; // one and only one Pipe per task
+  // Readers/writers that deals with inter-TaskGroup data.
+  private final List inputReaders;
+  private final Map taskToInputReadersMap;
+  private final Map taskToSideInputReadersMap;
+  private final Map taskToOutputWritersMap;
+  private final Map inputReaderToTasksMap;
+  private final Map idToSrcIteratorMap;
+  private final Map srcIteratorIdToTasksMap;
+  private final Map iteratorIdToTasksMap;
+  private final LinkedBlockingQueue iteratorQueue;
+  private volatile Map pipeIdToDstTasksMap;
+  private final Set preparedTransforms;
+  private final Set finishedTaskIds;
+  private final AtomicInteger completedFutures;
+  private int numBoundedSources;
+  private int numIterators;
+
+  // For metrics
+  private long serBlockSize;
+  private long encodedBlockSize;
+  private long accumulatedBlockedReadTime;
+
+  private volatile boolean isExecutionRequested;
+  private volatile String logicalTaskIdPutOnHold;
+
+  private static final String ITERATORID_PREFIX = "ITERATOR_";

Review comment:
Can you do without iterator ids? (e.g., composing iterators, using pointers in a DAG)
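A minimal sketch of the first alternative the comment mentions, composing iterators, with hypothetical helper names (`IteratorComposition`, `map`, `filter`) that are not part of Nemo: each stage wraps its upstream iterator directly, so the chain itself records which stage feeds which, and no `ITERATOR_`-prefixed IDs or ID-to-task maps are needed to route elements.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: instead of registering iterators under string IDs and
// looking up their consumers in maps, each stage wraps its upstream iterator,
// so the object references themselves encode the DAG edges.
final class IteratorComposition {
  private IteratorComposition() { }

  /** Lazily applies fn to every element of upstream. */
  static <I, O> Iterator<O> map(final Iterator<I> upstream, final Function<I, O> fn) {
    return new Iterator<O>() {
      @Override public boolean hasNext() { return upstream.hasNext(); }
      @Override public O next() { return fn.apply(upstream.next()); }
    };
  }

  /** Lazily keeps only the elements of upstream that satisfy keep. */
  static <T> Iterator<T> filter(final Iterator<T> upstream, final Predicate<T> keep) {
    return new Iterator<T>() {
      private T buffered;
      private boolean hasBuffered;

      @Override public boolean hasNext() {
        // Pull from upstream until a matching element is buffered or upstream ends.
        while (!hasBuffered && upstream.hasNext()) {
          final T candidate = upstream.next();
          if (keep.test(candidate)) {
            buffered = candidate;
            hasBuffered = true;
          }
        }
        return hasBuffered;
      }

      @Override public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        hasBuffered = false;
        final T result = buffered;
        buffered = null;
        return result;
      }
    };
  }
}
```

For example, `map(filter(src, x -> x % 2 == 1), x -> x * 10)` over the integers 1 to 5 yields 10, 30, 50. This only covers linear chains; a task whose output fans out to several children would still need some buffering or a pipe, which the sketch does not attempt.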