Repository: incubator-beam Updated Branches: refs/heads/master c2146b9f9 -> c0b67ab12
Return from awaitCompletion if Already Done This ensures that a call to ExecutorService#awaitCompletion returns immediately if there are no visible updates and the executor has completed. If the executor is in this state, no additional visible updates will be published and the call will hang. This sequence generally will not happen, as calls via InProcessPipelineResult return if the state is already terminal, but this ensures that parallel calls to awaitCompletion do not hang one calling thread. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ebb69320 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ebb69320 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ebb69320 Branch: refs/heads/master Commit: ebb69320037fda48df5e6ca24ecc9415a4b2acb4 Parents: c2146b9 Author: Thomas Groh <tg...@users.noreply.github.com> Authored: Mon May 30 11:04:15 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Mon Jun 6 10:09:47 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/ExecutorServiceParallelExecutor.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ebb69320/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index a627125..3129145 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -200,11 +200,16 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { public void awaitCompletion() throws Throwable { VisibleExecutorUpdate update; do { - update = visibleUpdates.take(); - if (update.throwable.isPresent()) { + // Get an update; don't block forever if another thread has handled it + update = visibleUpdates.poll(2L, TimeUnit.SECONDS); + if (update == null && executorService.isShutdown()) { + // there are no updates to process and no updates will ever be published because the + // executor is shutdown + return; + } else if (update != null && update.throwable.isPresent()) { throw update.throwable.get(); } - } while (!update.isDone()); + } while (update == null || !update.isDone()); executorService.shutdown(); }