Limit the number of work schedules per MonitorRunnable run

This ensures that work re-added to the queue will not cause the monitor runnable
to run forever before delivering timers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3b96bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3b96bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3b96bc3

Branch: refs/heads/master
Commit: d3b96bc33c7e9846f756457d1214a011da1cf84b
Parents: 272493e
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 28 10:12:09 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue May 10 10:15:14 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java  | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d3b96bc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index de409e3..367c190 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
@@ -344,11 +345,11 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
   }
 
   private class MonitorRunnable implements Runnable {
-    private final String runnableName =
-        String.format(
-            "%s$%s-monitor",
-            evaluationContext.getPipelineOptions().getAppName(),
-            ExecutorServiceParallelExecutor.class.getSimpleName());
+    // arbitrary termination condition to ensure progress in the presence of 
pushback
+    private final long maxTimeProcessingUpdatesNanos = 
TimeUnit.MILLISECONDS.toNanos(5L);
+    private final String runnableName = String.format("%s$%s-monitor",
+        evaluationContext.getPipelineOptions().getAppName(),
+        ExecutorServiceParallelExecutor.class.getSimpleName());
 
     @Override
     public void run() {
@@ -356,7 +357,9 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
       Thread.currentThread().setName(runnableName);
       try {
         ExecutorUpdate update = allUpdates.poll();
+        int numUpdates = 0;
         // pull all of the pending work off of the queue
+        long updatesStart = System.nanoTime();
         while (update != null) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
@@ -364,7 +367,11 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
           } else if (update.getException().isPresent()) {
             
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
           }
-          update = allUpdates.poll();
+          if (System.nanoTime() - updatesStart > 
maxTimeProcessingUpdatesNanos) {
+            break;
+          } else {
+            update = allUpdates.poll();
+          }
         }
         boolean timersFired = fireTimers();
         addWorkIfNecessary(timersFired);

Reply via email to