Stop Catching Errors in the DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5e51c840 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5e51c840 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5e51c840 Branch: refs/heads/master Commit: 5e51c84003c2c9e03d51f94cbc2be07569bf090e Parents: c2c650a Author: Thomas Groh <tg...@google.com> Authored: Fri Oct 14 10:32:14 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Oct 14 17:21:18 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/CompletionCallback.java | 4 +- .../beam/runners/direct/DirectRunner.java | 14 ++----- .../direct/ExecutorServiceParallelExecutor.java | 40 ++++++++++---------- .../beam/runners/direct/PipelineExecutor.java | 2 +- .../beam/runners/direct/TransformExecutor.java | 10 ++--- .../runners/direct/TransformExecutorTest.java | 16 ++++---- 6 files changed, 39 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java index 8e51d6f..2986df1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java @@ -38,7 +38,7 @@ interface CompletionCallback { void handleEmpty(AppliedPTransform<?, ?, ?> transform); /** - * Handle a result that terminated abnormally due to the provided {@link Throwable}. + * Handle a result that terminated abnormally due to the provided {@link Exception}. */ - void handleThrowable(CommittedBundle<?> inputBundle, Throwable t); + void handleException(CommittedBundle<?> inputBundle, Exception t); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 6ef2472..664a915 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -403,18 +403,10 @@ public class DirectRunner * * <p>See also {@link PipelineExecutor#awaitCompletion()}. */ - public State awaitCompletion() throws Throwable { + public State awaitCompletion() throws Exception { if (!state.isTerminal()) { - try { - executor.awaitCompletion(); - state = State.DONE; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; - } catch (Throwable t) { - state = State.FAILED; - throw t; - } + executor.awaitCompletion(); + state = State.DONE; } return state; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 3274524..e32f671 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -234,7 +234,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } @Override - public void awaitCompletion() throws Throwable { + public void awaitCompletion() throws Exception { VisibleExecutorUpdate update; do { // Get an update; don't block forever if another thread has handled it @@ -243,8 +243,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { // there are no updates to process and no updates will ever be published because the // executor is shutdown return; - } else if (update != null && update.throwable.isPresent()) { - throw update.throwable.get(); + } else if (update != null && update.exception.isPresent()) { + throw update.exception.get(); } } while (update == null || !update.isDone()); executorService.shutdown(); @@ -253,7 +253,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { /** * The base implementation of {@link CompletionCallback} that provides implementations for * {@link #handleResult(CommittedBundle, TransformResult)} and - * {@link #handleThrowable(CommittedBundle, Throwable)}. + * {@link #handleException(CommittedBundle, Exception)}. */ private class TimerIterableCompletionCallback implements CompletionCallback { private final Iterable<TimerData> timers; @@ -296,8 +296,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } @Override - public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + public final void handleException(CommittedBundle<?> inputBundle, Exception e) { + allUpdates.offer(ExecutorUpdate.fromException(e)); outstandingWork.decrementAndGet(); } } @@ -315,14 +315,14 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate( Optional.of(bundle), consumers, - Optional.<Throwable>absent()); + Optional.<Exception>absent()); } - public static ExecutorUpdate fromThrowable(Throwable t) { + public static ExecutorUpdate fromException(Exception e) { return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate( Optional.<CommittedBundle<?>>absent(), Collections.<AppliedPTransform<?, ?, ?>>emptyList(), - Optional.of(t)); + Optional.of(e)); } /** @@ -336,7 +336,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { */ public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers(); - public abstract Optional<? extends Throwable> getException(); + public abstract Optional<? extends Exception> getException(); } /** @@ -344,10 +344,10 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { * return normally or throw an exception. */ private static class VisibleExecutorUpdate { - private final Optional<? extends Throwable> throwable; + private final Optional<? extends Exception> exception; private final boolean done; - public static VisibleExecutorUpdate fromThrowable(Throwable e) { + public static VisibleExecutorUpdate fromException(Exception e) { return new VisibleExecutorUpdate(false, e); } @@ -355,8 +355,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { return new VisibleExecutorUpdate(true, null); } - private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) { - this.throwable = Optional.fromNullable(exception); + private VisibleExecutorUpdate(boolean done, @Nullable Exception exception) { + this.exception = Optional.fromNullable(exception); this.done = done; } @@ -410,7 +410,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { allUpdates.offer(update); } } else if (update.getException().isPresent()) { - visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); + visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get())); exceptionThrown = true; } } @@ -418,12 +418,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Monitor died due to being interrupted"); - while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) { + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(e))) { visibleUpdates.poll(); } - } catch (Throwable t) { - LOG.error("Monitor thread died due to throwable", t); - while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) { + } catch (Exception t) { + LOG.error("Monitor thread died due to exception", t); + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromException(t))) { visibleUpdates.poll(); } } finally { @@ -478,7 +478,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { try { registry.cleanup(); } catch (Exception e) { - visibleUpdates.add(VisibleExecutorUpdate.fromThrowable(e)); + visibleUpdates.add(VisibleExecutorUpdate.fromException(e)); } if (evaluationContext.isDone()) { while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java index 01a5c54..f900a22 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java @@ -43,5 +43,5 @@ interface PipelineExecutor { * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the * waiting thread and rethrows it */ - void awaitCompletion() throws Throwable; + void awaitCompletion() throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index 03f615b..c4002b5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -121,12 +121,12 @@ class TransformExecutor<T> implements Runnable { processElements(evaluator, metricsContainer, enforcements); finishBundle(evaluator, metricsContainer, enforcements); - } catch (Throwable t) { - onComplete.handleThrowable(inputBundle, t); - if (t instanceof RuntimeException) { - throw (RuntimeException) t; + } catch (Exception e) { + onComplete.handleException(inputBundle, e); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; } - throw new RuntimeException(t); + throw new RuntimeException(e); } finally { // Report the physical metrics from the end of this step. context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 5015e5a..32f874d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -130,7 +130,7 @@ public class TransformExecutorTest { assertThat(finishCalled.get(), is(true)); assertThat(completionCallback.handledResult, equalTo(result)); - assertThat(completionCallback.handledThrowable, is(nullValue())); + assertThat(completionCallback.handledException, is(nullValue())); } @Test @@ -150,7 +150,7 @@ public class TransformExecutorTest { assertThat(completionCallback.handledResult, is(nullValue())); assertThat(completionCallback.handledEmpty, equalTo(true)); - assertThat(completionCallback.handledThrowable, is(nullValue())); + assertThat(completionCallback.handledException, is(nullValue())); } @Test @@ -196,7 +196,7 @@ public class TransformExecutorTest { assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo)); assertThat(completionCallback.handledResult, equalTo(result)); - assertThat(completionCallback.handledThrowable, is(nullValue())); + assertThat(completionCallback.handledException, is(nullValue())); } @Test @@ -237,7 +237,7 @@ public class TransformExecutorTest { evaluatorCompleted.await(); assertThat(completionCallback.handledResult, is(nullValue())); - assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception)); + assertThat(completionCallback.handledException, Matchers.<Throwable>equalTo(exception)); } @Test @@ -273,7 +273,7 @@ public class TransformExecutorTest { evaluatorCompleted.await(); assertThat(completionCallback.handledResult, is(nullValue())); - assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception)); + assertThat(completionCallback.handledException, Matchers.<Throwable>equalTo(exception)); } @Test @@ -479,7 +479,7 @@ public class TransformExecutorTest { private static class RegisteringCompletionCallback implements CompletionCallback { private TransformResult handledResult = null; private boolean handledEmpty = false; - private Throwable handledThrowable = null; + private Exception handledException = null; private final CountDownLatch onMethod; private RegisteringCompletionCallback(CountDownLatch onMethod) { @@ -512,8 +512,8 @@ public class TransformExecutorTest { } @Override - public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - handledThrowable = t; + public void handleException(CommittedBundle<?> inputBundle, Exception e) { + handledException = e; onMethod.countDown(); } }