This is an automated email from the ASF dual-hosted git repository. jeongyoon pushed a commit to branch 717-TGE in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/717-TGE by this push: new 0f5d66e remove extra per-element object creation 0f5d66e is described below commit 0f5d66e5e6c3f119a7b27683741bf96e6d46123a Author: Jeongyoon Eo <jeongyoon0...@gmail.com> AuthorDate: Thu Mar 8 13:46:32 2018 +0900 remove extra per-element object creation --- .../nemo/runtime/executor/TaskGroupExecutor.java | 58 ++++++++-------------- 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java index 2c64b4b..bc8ac79 100644 --- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java +++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java @@ -34,7 +34,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -469,8 +468,7 @@ public final class TaskGroupExecutor { Iterator iterator = idToSrcIteratorMap.get(srcIteratorId); iterator.forEachRemaining(element -> { for (final Task task : tasks) { - List data = Collections.singletonList(element); - runTask(task, data); + runTask(task, element); } }); }); @@ -485,8 +483,7 @@ public final class TaskGroupExecutor { List<Task> dstTasks = iteratorIdToTasksMap.get(iteratorId); iterator.forEachRemaining(element -> { for (final Task task : dstTasks) { - List data = Collections.singletonList(element); - runTask(task, data); + runTask(task, element); } }); @@ -532,9 +529,9 @@ public final class TaskGroupExecutor { if (!dstTasks.isEmpty()) { while (!outputCollector.isEmpty()) { // Form input element-wise from the outputCollector - final List input = Collections.singletonList(outputCollector.remove()); + final Object element = outputCollector.remove(); for (final Task task : dstTasks) { - runTask(task, input); + runTask(task, element); } } } @@ -588,46 +585,35 @@ public final class TaskGroupExecutor { * * @param task to execute */ - private void runTask(final Task task, final List<Object> data) { + private void runTask(final Task task, final Object dataElement) { final String physicalTaskId = getPhysicalTaskId(task.getId()); // Process element-wise depending on the Task type if (task instanceof BoundedSourceTask) { OutputCollectorImpl outputCollector = taskToDataHandlerMap.get(task).getOutputCollector(); - if (data.contains(null)) { // data is [null] used for VoidCoders - outputCollector.emit(data); + if (dataElement == null) { // null used for Beam VoidCoders + final List<Object> nullForVoidCoder = Collections.singletonList(dataElement); + outputCollector.emit(nullForVoidCoder); } else { - data.forEach(dataElement -> { - outputCollector.emit(dataElement); - LOG.info("log: {} {} BoundedSourceTask emitting {} to outputCollector", - taskGroupId, physicalTaskId, dataElement); - }); + outputCollector.emit(dataElement); + LOG.info("log: {} {} BoundedSourceTask emitting {} to outputCollector", + taskGroupId, physicalTaskId, dataElement); } } else if (task instanceof OperatorTask) { final Transform transform = ((OperatorTask) task).getTransform(); - - // Consumes the received element from incoming edges. - // Calculate the number of inter-TaskGroup data to process. - int numElements = data.size(); - LOG.info("log: {} {}: numElements {}", taskGroupId, physicalTaskId, numElements); - - IntStream.range(0, numElements).forEach(dataNum -> { - Object dataElement = data.get(dataNum); - LOG.info("log: {} {} OperatorTask applying {} to onData", taskGroupId, physicalTaskId, dataElement); - transform.onData(dataElement); - }); + transform.onData(dataElement); + LOG.info("log: {} {} OperatorTask applying {} to onData", taskGroupId, physicalTaskId, dataElement); } else if (task instanceof MetricCollectionBarrierTask) { OutputCollectorImpl outputCollector = taskToDataHandlerMap.get(task).getOutputCollector(); - if (data.contains(null)) { // data is [null] used for VoidCoders - outputCollector.emit(data); + if (dataElement == null) { // null used for Beam VoidCoders + final List<Object> nullForVoidCoder = Collections.singletonList(dataElement); + outputCollector.emit(nullForVoidCoder); } else { - data.forEach(dataElement -> { - outputCollector.emit(dataElement); - LOG.info("log: {} {} MetricCollectionTask emitting {} to outputCollector", - taskGroupId, physicalTaskId, dataElement); - }); + outputCollector.emit(dataElement); + LOG.info("log: {} {} BoundedSourceTask emitting {} to outputCollector", + taskGroupId, physicalTaskId, dataElement); } setTaskPutOnHold((MetricCollectionBarrierTask) task); } else { @@ -650,10 +636,10 @@ public final class TaskGroupExecutor { // Pass output to its children tasks recursively. List<Task> dstTasks = taskGroupDag.getChildren(task.getId()); if (!dstTasks.isEmpty()) { - final List output = Collections.singletonList(element); // intra-TaskGroup data are safe here + //final List output = Collections.singletonList(element); // intra-TaskGroup data are safe here for (final Task dstTask : dstTasks) { - LOG.info("{} {} input to runTask {}", taskGroupId, getPhysicalTaskId(dstTask.getId()), output); - runTask(dstTask, output); + LOG.info("{} {} input to runTask {}", taskGroupId, getPhysicalTaskId(dstTask.getId()), element); + runTask(dstTask, element); } } } -- To stop receiving notification emails like this one, please contact jeongy...@apache.org.