This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/717-TGE by this push:
     new 0f5d66e  remove extra per-element object creation
0f5d66e is described below

commit 0f5d66e5e6c3f119a7b27683741bf96e6d46123a
Author: Jeongyoon Eo <jeongyoon0...@gmail.com>
AuthorDate: Thu Mar 8 13:46:32 2018 +0900

    remove extra per-element object creation
---
 .../nemo/runtime/executor/TaskGroupExecutor.java   | 58 ++++++++--------------
 1 file changed, 22 insertions(+), 36 deletions(-)

diff --git 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 2c64b4b..bc8ac79 100644
--- 
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ 
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -34,7 +34,6 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -469,8 +468,7 @@ public final class TaskGroupExecutor {
         Iterator iterator = idToSrcIteratorMap.get(srcIteratorId);
         iterator.forEachRemaining(element -> {
           for (final Task task : tasks) {
-            List data = Collections.singletonList(element);
-            runTask(task, data);
+            runTask(task, element);
           }
         });
       });
@@ -485,8 +483,7 @@ public final class TaskGroupExecutor {
         List<Task> dstTasks = iteratorIdToTasksMap.get(iteratorId);
         iterator.forEachRemaining(element -> {
           for (final Task task : dstTasks) {
-            List data = Collections.singletonList(element);
-            runTask(task, data);
+            runTask(task, element);
           }
         });
 
@@ -532,9 +529,9 @@ public final class TaskGroupExecutor {
           if (!dstTasks.isEmpty()) {
             while (!outputCollector.isEmpty()) {
               // Form input element-wise from the outputCollector
-              final List input = 
Collections.singletonList(outputCollector.remove());
+              final Object element = outputCollector.remove();
               for (final Task task : dstTasks) {
-                runTask(task, input);
+                runTask(task, element);
               }
             }
           }
@@ -588,46 +585,35 @@ public final class TaskGroupExecutor {
    *
    * @param task to execute
    */
-  private void runTask(final Task task, final List<Object> data) {
+  private void runTask(final Task task, final Object dataElement) {
     final String physicalTaskId = getPhysicalTaskId(task.getId());
 
     // Process element-wise depending on the Task type
     if (task instanceof BoundedSourceTask) {
       OutputCollectorImpl outputCollector = 
taskToDataHandlerMap.get(task).getOutputCollector();
 
-      if (data.contains(null)) {  // data is [null] used for VoidCoders
-        outputCollector.emit(data);
+      if (dataElement == null) { // null used for Beam VoidCoders
+        final List<Object> nullForVoidCoder = 
Collections.singletonList(dataElement);
+        outputCollector.emit(nullForVoidCoder);
       } else {
-        data.forEach(dataElement -> {
-          outputCollector.emit(dataElement);
-          LOG.info("log: {} {} BoundedSourceTask emitting {} to 
outputCollector",
-              taskGroupId, physicalTaskId, dataElement);
-        });
+        outputCollector.emit(dataElement);
+        LOG.info("log: {} {} BoundedSourceTask emitting {} to outputCollector",
+            taskGroupId, physicalTaskId, dataElement);
       }
     } else if (task instanceof OperatorTask) {
       final Transform transform = ((OperatorTask) task).getTransform();
-
-      // Consumes the received element from incoming edges.
-      // Calculate the number of inter-TaskGroup data to process.
-      int numElements = data.size();
-      LOG.info("log: {} {}: numElements {}", taskGroupId, physicalTaskId, 
numElements);
-
-      IntStream.range(0, numElements).forEach(dataNum -> {
-        Object dataElement = data.get(dataNum);
-        LOG.info("log: {} {} OperatorTask applying {} to onData", taskGroupId, 
physicalTaskId, dataElement);
-        transform.onData(dataElement);
-      });
+      transform.onData(dataElement);
+      LOG.info("log: {} {} OperatorTask applying {} to onData", taskGroupId, 
physicalTaskId, dataElement);
     } else if (task instanceof MetricCollectionBarrierTask) {
       OutputCollectorImpl outputCollector = 
taskToDataHandlerMap.get(task).getOutputCollector();
 
-      if (data.contains(null)) {  // data is [null] used for VoidCoders
-        outputCollector.emit(data);
+      if (dataElement == null) { // null used for Beam VoidCoders
+        final List<Object> nullForVoidCoder = 
Collections.singletonList(dataElement);
+        outputCollector.emit(nullForVoidCoder);
       } else {
-        data.forEach(dataElement -> {
-          outputCollector.emit(dataElement);
-          LOG.info("log: {} {} MetricCollectionTask emitting {} to 
outputCollector",
-              taskGroupId, physicalTaskId, dataElement);
-        });
+        outputCollector.emit(dataElement);
+        LOG.info("log: {} {} BoundedSourceTask emitting {} to outputCollector",
+            taskGroupId, physicalTaskId, dataElement);
       }
       setTaskPutOnHold((MetricCollectionBarrierTask) task);
     } else {
@@ -650,10 +636,10 @@ public final class TaskGroupExecutor {
       // Pass output to its children tasks recursively.
       List<Task> dstTasks = taskGroupDag.getChildren(task.getId());
       if (!dstTasks.isEmpty()) {
-        final List output = Collections.singletonList(element); // 
intra-TaskGroup data are safe here
+        //final List output = Collections.singletonList(element); // 
intra-TaskGroup data are safe here
         for (final Task dstTask : dstTasks) {
-          LOG.info("{} {} input to runTask {}", taskGroupId, 
getPhysicalTaskId(dstTask.getId()), output);
-          runTask(dstTask, output);
+          LOG.info("{} {} input to runTask {}", taskGroupId, 
getPhysicalTaskId(dstTask.getId()), element);
+          runTask(dstTask, element);
         }
       }
     }

-- 
To stop receiving notification emails like this one, please contact
jeongy...@apache.org.

Reply via email to