Repository: incubator-beam Updated Branches: refs/heads/master 636b5f7f6 -> 23fa61ce0
Add handleEmpty to CompletionCallback This is invoked when a Transform Executor has no work to do. Usually this is due to reinvocation of a Source. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8eb274a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8eb274a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8eb274a Branch: refs/heads/master Commit: a8eb274aca3e4ab2b89a3e3d44bb2c755fd638eb Parents: 8b1e64a Author: Thomas Groh <tg...@google.com> Authored: Fri Jul 22 18:01:41 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Jul 29 16:00:47 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/CompletionCallback.java | 5 ++ .../direct/ExecutorServiceParallelExecutor.java | 57 +++++--------------- .../beam/runners/direct/TransformExecutor.java | 1 + .../runners/direct/TransformExecutorTest.java | 27 ++++++++++ 4 files changed, 47 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java index 0c5fe24..2f496e9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java @@ -30,6 +30,11 @@ interface CompletionCallback { CommittedBundle<?> inputBundle, TransformResult result); /** + * Handle an input bundle that did not require processing. + */ + void handleEmpty(CommittedBundle<?> inputBundle); + + /** * Handle a result that terminated abnormally due to the provided {@link Throwable}. */ void handleThrowable(CommittedBundle<?> inputBundle, Throwable t); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 43195e3..3901472 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -122,7 +122,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { this.visibleUpdates = new ArrayBlockingQueue<>(20); parallelExecutorService = TransformExecutorServices.parallel(executorService); - defaultCompletionCallback = new DefaultCompletionCallback(); + defaultCompletionCallback = + new TimerIterableCompletionCallback(Collections.<TimerData>emptyList()); } private CacheLoader<StepAndKey, TransformExecutorService> @@ -217,18 +218,19 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { /** * The base implementation of {@link CompletionCallback} that provides implementations for * {@link #handleResult(CommittedBundle, TransformResult)} and - * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of - * {@link #getCommittedResult(CommittedBundle, TransformResult)}. + * {@link #handleThrowable(CommittedBundle, Throwable)}. */ - private abstract class CompletionCallbackBase implements CompletionCallback { - protected abstract CommittedResult getCommittedResult( - CommittedBundle<?> inputBundle, - TransformResult result); + private class TimerIterableCompletionCallback implements CompletionCallback { + private final Iterable<TimerData> timers; + + protected TimerIterableCompletionCallback(Iterable<TimerData> timers) { + this.timers = timers; + } @Override public final CommittedResult handleResult( CommittedBundle<?> inputBundle, TransformResult result) { - CommittedResult committedResult = getCommittedResult(inputBundle, result); + CommittedResult committedResult = evaluationContext.handleResult(inputBundle, timers, result); for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle, valueToConsumers.get(outputBundle.getPCollection()))); @@ -242,43 +244,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } @Override - public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - allUpdates.offer(ExecutorUpdate.fromThrowable(t)); - } - } - - /** - * The default {@link CompletionCallback}. The default completion callback is used to complete - * transform evaluations that are triggered due to the arrival of elements from an upstream - * transform, or for a source transform. - */ - private class DefaultCompletionCallback extends CompletionCallbackBase { - @Override - public CommittedResult getCommittedResult( - CommittedBundle<?> inputBundle, TransformResult result) { - return evaluationContext.handleResult(inputBundle, - Collections.<TimerData>emptyList(), - result); - } - } - - /** - * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection - * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the - * timers used to create the input to the {@link EvaluationContext evaluation context} - * as part of the result. - */ - private class TimerCompletionCallback extends CompletionCallbackBase { - private final Iterable<TimerData> timers; - - private TimerCompletionCallback(Iterable<TimerData> timers) { - this.timers = timers; + public void handleEmpty(CommittedBundle<?> inputBundle) { } @Override - public CommittedResult getCommittedResult( - CommittedBundle<?> inputBundle, TransformResult result) { - return evaluationContext.handleResult(inputBundle, timers, result); + public final void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { + allUpdates.offer(ExecutorUpdate.fromThrowable(t)); } } @@ -424,7 +395,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { null, keyTimers.getKey(), (PCollection) transform.getInput()) .add(WindowedValue.valueInEmptyWindows(work)) .commit(Instant.now()); - scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); + scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery)); firedTimers = true; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index ef31ba7..793b508 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -111,6 +111,7 @@ class TransformExecutor<T> implements Runnable { TransformEvaluator<T> evaluator = evaluatorFactory.forApplication(transform, inputBundle, evaluationContext); if (evaluator == null) { + onComplete.handleEmpty(inputBundle); // Nothing to do return; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index cb5cd46..95206f3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -134,6 +134,26 @@ public class TransformExecutorTest { } @Test + public void nullTransformEvaluatorTerminates() throws Exception { + when(registry.forApplication(created.getProducingTransformInternal(), + null, + evaluationContext)).thenReturn(null); + + TransformExecutor<Object> executor = TransformExecutor.create(registry, + Collections.<ModelEnforcementFactory>emptyList(), + evaluationContext, + null, + created.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + executor.run(); + + assertThat(completionCallback.handledResult, is(nullValue())); + assertThat(completionCallback.handledEmpty, equalTo(true)); + assertThat(completionCallback.handledThrowable, is(nullValue())); + } + + @Test public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception { final TransformResult result = StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); @@ -471,6 +491,7 @@ public class TransformExecutorTest { private static class RegisteringCompletionCallback implements CompletionCallback { private TransformResult handledResult = null; + private boolean handledEmpty = false; private Throwable handledThrowable = null; private final CountDownLatch onMethod; @@ -496,6 +517,12 @@ public class TransformExecutorTest { } @Override + public void handleEmpty(CommittedBundle<?> inputBundle) { + handledEmpty = true; + onMethod.countDown(); + } + + @Override public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { handledThrowable = t; onMethod.countDown();