Repository: incubator-beam Updated Branches: refs/heads/master c2c650a63 -> b8e6eea69
Replace DirectResult#awaitCompletion with waitUntilFinish This is the PipelineResult interface method, rather than the DirectRunner-specific method. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06bd0747 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06bd0747 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06bd0747 Branch: refs/heads/master Commit: 06bd0747ab35c2c10e44638dac60279ed870136a Parents: 5e51c84 Author: Thomas Groh <tg...@google.com> Authored: Thu Oct 13 15:07:51 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Oct 14 17:21:18 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectOptions.java | 4 +-- .../beam/runners/direct/DirectRunner.java | 27 ++++++++++++-------- .../beam/runners/direct/DirectRunnerTest.java | 6 ++--- 3 files changed, 22 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06bd0747/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java index b2c4f47..32ef352 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java @@ -42,8 +42,8 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions { @Description( "If the pipeline should block awaiting completion of the pipeline. If set to true, " + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, " - + "the Pipeline will execute asynchronously. If set to false, the completion of the " - + "pipeline can be awaited on by use of DirectPipelineResult#awaitCompletion().") + + "the Pipeline will execute asynchronously. If set to false, use " + + "PipelineResult#waitUntilFinish() to block until the Pipeline is complete.") boolean isBlockOnRun(); void setBlockOnRun(boolean b); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06bd0747/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 664a915..36d19cf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -274,7 +274,7 @@ public class DirectRunner DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps); if (options.isBlockOnRun()) { try { - result.awaitCompletion(); + result.waitUntilFinish(); } catch (UserCodeException userException) { throw new PipelineExecutionException(userException.getCause()); } catch (Throwable t) { @@ -403,10 +403,21 @@ public class DirectRunner * * <p>See also {@link PipelineExecutor#awaitCompletion()}. */ - public State awaitCompletion() throws Exception { + @Override + public State waitUntilFinish() { if (!state.isTerminal()) { - executor.awaitCompletion(); - state = State.DONE; + try { + executor.awaitCompletion(); + state = State.DONE; + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException(e); + } } return state; } @@ -417,14 +428,10 @@ public class DirectRunner } @Override - public State waitUntilFinish() throws IOException { - return waitUntilFinish(Duration.millis(-1)); - } - - @Override public State waitUntilFinish(Duration duration) throws IOException { throw new UnsupportedOperationException( - "DirectPipelineResult does not support waitUntilFinish."); + "DirectPipelineResult does not support waitUntilFinish with a Duration parameter. See" + + " BEAM-596."); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06bd0747/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index d93dd7a..37af90c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -126,7 +126,7 @@ public class DirectRunnerTest implements Serializable { PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); DirectPipelineResult result = ((DirectPipelineResult) p.run()); - result.awaitCompletion(); + result.waitUntilFinish(); } private static AtomicInteger changed; @@ -164,10 +164,10 @@ public class DirectRunnerTest implements Serializable { PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); DirectPipelineResult result = ((DirectPipelineResult) p.run()); - result.awaitCompletion(); + result.waitUntilFinish(); DirectPipelineResult otherResult = ((DirectPipelineResult) p.run()); - otherResult.awaitCompletion(); + otherResult.waitUntilFinish(); assertThat("Each element should have been processed twice", changed.get(), equalTo(6)); }