Limit the number of work schedules per MonitorRunnable run This ensures that work readded to the queue will not cause the monitor runnable to run forever before delivering timers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3b96bc3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3b96bc3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3b96bc3 Branch: refs/heads/master Commit: d3b96bc33c7e9846f756457d1214a011da1cf84b Parents: 272493e Author: Thomas Groh <tg...@google.com> Authored: Thu Apr 28 10:12:09 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue May 10 10:15:14 2016 -0700 ---------------------------------------------------------------------- .../direct/ExecutorServiceParallelExecutor.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d3b96bc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index de409e3..367c190 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -344,11 +345,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } private class MonitorRunnable implements Runnable { - private final String runnableName = - String.format( - "%s$%s-monitor", - evaluationContext.getPipelineOptions().getAppName(), - ExecutorServiceParallelExecutor.class.getSimpleName()); + // arbitrary termination condition to ensure progress in the presence of pushback + private final long maxTimeProcessingUpdatesNanos = TimeUnit.MILLISECONDS.toNanos(5L); + private final String runnableName = String.format("%s$%s-monitor", + evaluationContext.getPipelineOptions().getAppName(), + ExecutorServiceParallelExecutor.class.getSimpleName()); @Override public void run() { @@ -356,7 +357,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { Thread.currentThread().setName(runnableName); try { ExecutorUpdate update = allUpdates.poll(); + int numUpdates = 0; // pull all of the pending work off of the queue + long updatesStart = System.nanoTime(); while (update != null) { LOG.debug("Executor Update: {}", update); if (update.getBundle().isPresent()) { @@ -364,7 +367,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } else if (update.getException().isPresent()) { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); } - update = allUpdates.poll(); + if (System.nanoTime() - updatesStart > maxTimeProcessingUpdatesNanos) { + break; + } else { + update = allUpdates.poll(); + } } boolean timersFired = fireTimers(); addWorkIfNecessary(timersFired);