Stop Catching Errors in the DirectRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5e51c840
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5e51c840
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5e51c840

Branch: refs/heads/master
Commit: 5e51c84003c2c9e03d51f94cbc2be07569bf090e
Parents: c2c650a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Oct 14 10:32:14 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Oct 14 17:21:18 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CompletionCallback.java |  4 +-
 .../beam/runners/direct/DirectRunner.java       | 14 ++-----
 .../direct/ExecutorServiceParallelExecutor.java | 40 ++++++++++----------
 .../beam/runners/direct/PipelineExecutor.java   |  2 +-
 .../beam/runners/direct/TransformExecutor.java  | 10 ++---
 .../runners/direct/TransformExecutorTest.java   | 16 ++++----
 6 files changed, 39 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 8e51d6f..2986df1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -38,7 +38,7 @@ interface CompletionCallback {
   void handleEmpty(AppliedPTransform<?, ?, ?> transform);
 
   /**
-   * Handle a result that terminated abnormally due to the provided {@link 
Throwable}.
+   * Handle a result that terminated abnormally due to the provided {@link 
Exception}.
    */
-  void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
+  void handleException(CommittedBundle<?> inputBundle, Exception t);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 6ef2472..664a915 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -403,18 +403,10 @@ public class DirectRunner
      *
      * <p>See also {@link PipelineExecutor#awaitCompletion()}.
      */
-    public State awaitCompletion() throws Throwable {
+    public State awaitCompletion() throws Exception {
       if (!state.isTerminal()) {
-        try {
-          executor.awaitCompletion();
-          state = State.DONE;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw e;
-        } catch (Throwable t) {
-          state = State.FAILED;
-          throw t;
-        }
+        executor.awaitCompletion();
+        state = State.DONE;
       }
       return state;
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 3274524..e32f671 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -234,7 +234,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
   }
 
   @Override
-  public void awaitCompletion() throws Throwable {
+  public void awaitCompletion() throws Exception {
     VisibleExecutorUpdate update;
     do {
       // Get an update; don't block forever if another thread has handled it
@@ -243,8 +243,8 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
         // there are no updates to process and no updates will ever be 
published because the
         // executor is shutdown
         return;
-      } else if (update != null && update.throwable.isPresent()) {
-        throw update.throwable.get();
+      } else if (update != null && update.exception.isPresent()) {
+        throw update.exception.get();
       }
     } while (update == null || !update.isDone());
     executorService.shutdown();
@@ -253,7 +253,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
   /**
    * The base implementation of {@link CompletionCallback} that provides 
implementations for
    * {@link #handleResult(CommittedBundle, TransformResult)} and
-   * {@link #handleThrowable(CommittedBundle, Throwable)}.
+   * {@link #handleException(CommittedBundle, Exception)}.
    */
   private class TimerIterableCompletionCallback implements CompletionCallback {
     private final Iterable<TimerData> timers;
@@ -296,8 +296,8 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
     }
 
     @Override
-    public final void handleThrowable(CommittedBundle<?> inputBundle, 
Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+    public final void handleException(CommittedBundle<?> inputBundle, 
Exception e) {
+      allUpdates.offer(ExecutorUpdate.fromException(e));
       outstandingWork.decrementAndGet();
     }
   }
@@ -315,14 +315,14 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
       return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
           Optional.of(bundle),
           consumers,
-          Optional.<Throwable>absent());
+          Optional.<Exception>absent());
     }
 
-    public static ExecutorUpdate fromThrowable(Throwable t) {
+    public static ExecutorUpdate fromException(Exception e) {
       return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
           Optional.<CommittedBundle<?>>absent(),
           Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
-          Optional.of(t));
+          Optional.of(e));
     }
 
     /**
@@ -336,7 +336,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
      */
     public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();
 
-    public abstract Optional<? extends Throwable> getException();
+    public abstract Optional<? extends Exception> getException();
   }
 
   /**
@@ -344,10 +344,10 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
    * return normally or throw an exception.
    */
   private static class VisibleExecutorUpdate {
-    private final Optional<? extends Throwable> throwable;
+    private final Optional<? extends Exception> exception;
     private final boolean done;
 
-    public static VisibleExecutorUpdate fromThrowable(Throwable e) {
+    public static VisibleExecutorUpdate fromException(Exception e) {
       return new VisibleExecutorUpdate(false, e);
     }
 
@@ -355,8 +355,8 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
       return new VisibleExecutorUpdate(true, null);
     }
 
-    private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) 
{
-      this.throwable = Optional.fromNullable(exception);
+    private VisibleExecutorUpdate(boolean done, @Nullable Exception exception) 
{
+      this.exception = Optional.fromNullable(exception);
       this.done = done;
     }
 
@@ -410,7 +410,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
               allUpdates.offer(update);
             }
           } else if (update.getException().isPresent()) {
-            
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
+            
visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get()));
             exceptionThrown = true;
           }
         }
@@ -418,12 +418,12 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         LOG.error("Monitor died due to being interrupted");
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(e))) {
           visibleUpdates.poll();
         }
-      } catch (Throwable t) {
-        LOG.error("Monitor thread died due to throwable", t);
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
+      } catch (Exception t) {
+        LOG.error("Monitor thread died due to exception", t);
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(t))) {
           visibleUpdates.poll();
         }
       } finally {
@@ -478,7 +478,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
         try {
           registry.cleanup();
         } catch (Exception e) {
-          visibleUpdates.add(VisibleExecutorUpdate.fromThrowable(e));
+          visibleUpdates.add(VisibleExecutorUpdate.fromException(e));
         }
         if (evaluationContext.isDone()) {
           while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
index 01a5c54..f900a22 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java
@@ -43,5 +43,5 @@ interface PipelineExecutor {
    * @throws Throwable whenever an executor thread throws anything, transfers 
the throwable to the
    *                   waiting thread and rethrows it
    */
-  void awaitCompletion() throws Throwable;
+  void awaitCompletion() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 03f615b..c4002b5 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -121,12 +121,12 @@ class TransformExecutor<T> implements Runnable {
       processElements(evaluator, metricsContainer, enforcements);
 
       finishBundle(evaluator, metricsContainer, enforcements);
-    } catch (Throwable t) {
-      onComplete.handleThrowable(inputBundle, t);
-      if (t instanceof RuntimeException) {
-        throw (RuntimeException) t;
+    } catch (Exception e) {
+      onComplete.handleException(inputBundle, e);
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
       }
-      throw new RuntimeException(t);
+      throw new RuntimeException(e);
     } finally {
       // Report the physical metrics from the end of this step.
       context.getMetrics().commitPhysical(inputBundle, 
metricsContainer.getCumulative());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 5015e5a..32f874d 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -130,7 +130,7 @@ public class TransformExecutorTest {
 
     assertThat(finishCalled.get(), is(true));
     assertThat(completionCallback.handledResult, equalTo(result));
-    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(completionCallback.handledException, is(nullValue()));
   }
 
   @Test
@@ -150,7 +150,7 @@ public class TransformExecutorTest {
 
     assertThat(completionCallback.handledResult, is(nullValue()));
     assertThat(completionCallback.handledEmpty, equalTo(true));
-    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(completionCallback.handledException, is(nullValue()));
   }
 
   @Test
@@ -196,7 +196,7 @@ public class TransformExecutorTest {
 
     assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
     assertThat(completionCallback.handledResult, equalTo(result));
-    assertThat(completionCallback.handledThrowable, is(nullValue()));
+    assertThat(completionCallback.handledException, is(nullValue()));
   }
 
   @Test
@@ -237,7 +237,7 @@ public class TransformExecutorTest {
     evaluatorCompleted.await();
 
     assertThat(completionCallback.handledResult, is(nullValue()));
-    assertThat(completionCallback.handledThrowable, 
Matchers.<Throwable>equalTo(exception));
+    assertThat(completionCallback.handledException, 
Matchers.<Throwable>equalTo(exception));
   }
 
   @Test
@@ -273,7 +273,7 @@ public class TransformExecutorTest {
     evaluatorCompleted.await();
 
     assertThat(completionCallback.handledResult, is(nullValue()));
-    assertThat(completionCallback.handledThrowable, 
Matchers.<Throwable>equalTo(exception));
+    assertThat(completionCallback.handledException, 
Matchers.<Throwable>equalTo(exception));
   }
 
   @Test
@@ -479,7 +479,7 @@ public class TransformExecutorTest {
   private static class RegisteringCompletionCallback implements 
CompletionCallback {
     private TransformResult handledResult = null;
     private boolean handledEmpty = false;
-    private Throwable handledThrowable = null;
+    private Exception handledException = null;
     private final CountDownLatch onMethod;
 
     private RegisteringCompletionCallback(CountDownLatch onMethod) {
@@ -512,8 +512,8 @@ public class TransformExecutorTest {
     }
 
     @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      handledThrowable = t;
+    public void handleException(CommittedBundle<?> inputBundle, Exception e) {
+      handledException = e;
       onMethod.countDown();
     }
   }

Reply via email to