Repository: incubator-beam
Updated Branches:
  refs/heads/master c2c650a63 -> b8e6eea69


Replace DirectResult#awaitCompletion with waitUntilFinish

This is the PipelineResult interface method, rather than the
DirectRunner-specific method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06bd0747
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06bd0747
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06bd0747

Branch: refs/heads/master
Commit: 06bd0747ab35c2c10e44638dac60279ed870136a
Parents: 5e51c84
Author: Thomas Groh <tg...@google.com>
Authored: Thu Oct 13 15:07:51 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Oct 14 17:21:18 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectOptions.java      |  4 +--
 .../beam/runners/direct/DirectRunner.java       | 27 ++++++++++++--------
 .../beam/runners/direct/DirectRunnerTest.java   |  6 ++---
 3 files changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06bd0747/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index b2c4f47..32ef352 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -42,8 +42,8 @@ public interface DirectOptions extends PipelineOptions, 
ApplicationNameOptions {
   @Description(
       "If the pipeline should block awaiting completion of the pipeline. If 
set to true, "
           + "a call to Pipeline#run() will block until all PTransforms are 
complete. Otherwise, "
-          + "the Pipeline will execute asynchronously. If set to false, the 
completion of the "
-          + "pipeline can be awaited on by use of 
DirectPipelineResult#awaitCompletion().")
+          + "the Pipeline will execute asynchronously. If set to false, use "
+          + "PipelineResult#waitUntilFinish() to block until the Pipeline is 
complete.")
   boolean isBlockOnRun();
 
   void setBlockOnRun(boolean b);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06bd0747/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 664a915..36d19cf 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -274,7 +274,7 @@ public class DirectRunner
     DirectPipelineResult result = new DirectPipelineResult(executor, context, 
aggregatorSteps);
     if (options.isBlockOnRun()) {
       try {
-        result.awaitCompletion();
+        result.waitUntilFinish();
       } catch (UserCodeException userException) {
         throw new PipelineExecutionException(userException.getCause());
       } catch (Throwable t) {
@@ -403,10 +403,21 @@ public class DirectRunner
      *
      * <p>See also {@link PipelineExecutor#awaitCompletion()}.
      */
-    public State awaitCompletion() throws Exception {
+    @Override
+    public State waitUntilFinish() {
       if (!state.isTerminal()) {
-        executor.awaitCompletion();
-        state = State.DONE;
+        try {
+          executor.awaitCompletion();
+          state = State.DONE;
+        } catch (Exception e) {
+          if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+          }
+          if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+          }
+          throw new RuntimeException(e);
+        }
       }
       return state;
     }
@@ -417,14 +428,10 @@ public class DirectRunner
     }
 
     @Override
-    public State waitUntilFinish() throws IOException {
-      return waitUntilFinish(Duration.millis(-1));
-    }
-
-    @Override
     public State waitUntilFinish(Duration duration) throws IOException {
       throw new UnsupportedOperationException(
-          "DirectPipelineResult does not support waitUntilFinish.");
+          "DirectPipelineResult does not support waitUntilFinish with a 
Duration parameter. See"
+              + " BEAM-596.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06bd0747/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index d93dd7a..37af90c 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -126,7 +126,7 @@ public class DirectRunnerTest implements Serializable {
     PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
 
     DirectPipelineResult result = ((DirectPipelineResult) p.run());
-    result.awaitCompletion();
+    result.waitUntilFinish();
   }
 
   private static AtomicInteger changed;
@@ -164,10 +164,10 @@ public class DirectRunnerTest implements Serializable {
     PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3");
 
     DirectPipelineResult result = ((DirectPipelineResult) p.run());
-    result.awaitCompletion();
+    result.waitUntilFinish();
 
     DirectPipelineResult otherResult = ((DirectPipelineResult) p.run());
-    otherResult.awaitCompletion();
+    otherResult.waitUntilFinish();
 
     assertThat("Each element should have been processed twice", changed.get(), 
equalTo(6));
   }

Reply via email to