Repository: incubator-beam
Updated Branches:
  refs/heads/master 636b5f7f6 -> 23fa61ce0


Add handleEmpty to CompletionCallback

This is invoked when a Transform Executor has no work to do. Usually
this is due to reinvocation of a Source.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a8eb274a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a8eb274a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a8eb274a

Branch: refs/heads/master
Commit: a8eb274aca3e4ab2b89a3e3d44bb2c755fd638eb
Parents: 8b1e64a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jul 22 18:01:41 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jul 29 16:00:47 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CompletionCallback.java |  5 ++
 .../direct/ExecutorServiceParallelExecutor.java | 57 +++++---------------
 .../beam/runners/direct/TransformExecutor.java  |  1 +
 .../runners/direct/TransformExecutorTest.java   | 27 ++++++++++
 4 files changed, 47 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 0c5fe24..2f496e9 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -30,6 +30,11 @@ interface CompletionCallback {
       CommittedBundle<?> inputBundle, TransformResult result);
 
   /**
+   * Handle an input bundle that did not require processing.
+   */
+  void handleEmpty(CommittedBundle<?> inputBundle);
+
+  /**
    * Handle a result that terminated abnormally due to the provided {@link 
Throwable}.
    */
   void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 43195e3..3901472 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -122,7 +122,8 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
     this.visibleUpdates = new ArrayBlockingQueue<>(20);
 
     parallelExecutorService = 
TransformExecutorServices.parallel(executorService);
-    defaultCompletionCallback = new DefaultCompletionCallback();
+    defaultCompletionCallback =
+        new 
TimerIterableCompletionCallback(Collections.<TimerData>emptyList());
   }
 
   private CacheLoader<StepAndKey, TransformExecutorService>
@@ -217,18 +218,19 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
   /**
    * The base implementation of {@link CompletionCallback} that provides 
implementations for
    * {@link #handleResult(CommittedBundle, TransformResult)} and
-   * {@link #handleThrowable(CommittedBundle, Throwable)}, given an 
implementation of
-   * {@link #getCommittedResult(CommittedBundle, TransformResult)}.
+   * {@link #handleThrowable(CommittedBundle, Throwable)}.
    */
-  private abstract class CompletionCallbackBase implements CompletionCallback {
-    protected abstract CommittedResult getCommittedResult(
-        CommittedBundle<?> inputBundle,
-        TransformResult result);
+  private class TimerIterableCompletionCallback implements CompletionCallback {
+    private final Iterable<TimerData> timers;
+
+    protected TimerIterableCompletionCallback(Iterable<TimerData> timers) {
+      this.timers = timers;
+    }
 
     @Override
     public final CommittedResult handleResult(
         CommittedBundle<?> inputBundle, TransformResult result) {
-      CommittedResult committedResult = getCommittedResult(inputBundle, 
result);
+      CommittedResult committedResult = 
evaluationContext.handleResult(inputBundle, timers, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
             valueToConsumers.get(outputBundle.getPCollection())));
@@ -242,43 +244,12 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
     }
 
     @Override
-    public final void handleThrowable(CommittedBundle<?> inputBundle, 
Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
-    }
-  }
-
-  /**
-   * The default {@link CompletionCallback}. The default completion callback 
is used to complete
-   * transform evaluations that are triggered due to the arrival of elements 
from an upstream
-   * transform, or for a source transform.
-   */
-  private class DefaultCompletionCallback extends CompletionCallbackBase {
-    @Override
-    public CommittedResult getCommittedResult(
-        CommittedBundle<?> inputBundle, TransformResult result) {
-      return evaluationContext.handleResult(inputBundle,
-          Collections.<TimerData>emptyList(),
-          result);
-    }
-  }
-
-  /**
-   * A {@link CompletionCallback} where the completed bundle was produced to 
deliver some collection
-   * of {@link TimerData timers}. When the evaluator completes successfully, 
reports all of the
-   * timers used to create the input to the {@link EvaluationContext 
evaluation context}
-   * as part of the result.
-   */
-  private class TimerCompletionCallback extends CompletionCallbackBase {
-    private final Iterable<TimerData> timers;
-
-    private TimerCompletionCallback(Iterable<TimerData> timers) {
-      this.timers = timers;
+    public void handleEmpty(CommittedBundle<?> inputBundle) {
     }
 
     @Override
-    public CommittedResult getCommittedResult(
-        CommittedBundle<?> inputBundle, TransformResult result) {
-          return evaluationContext.handleResult(inputBundle, timers, result);
+    public final void handleThrowable(CommittedBundle<?> inputBundle, 
Throwable t) {
+      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
     }
   }
 
@@ -424,7 +395,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
                           null, keyTimers.getKey(), (PCollection) 
transform.getInput())
                       .add(WindowedValue.valueInEmptyWindows(work))
                       .commit(Instant.now());
-              scheduleConsumption(transform, bundle, new 
TimerCompletionCallback(delivery));
+              scheduleConsumption(transform, bundle, new 
TimerIterableCompletionCallback(delivery));
               firedTimers = true;
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index ef31ba7..793b508 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -111,6 +111,7 @@ class TransformExecutor<T> implements Runnable {
       TransformEvaluator<T> evaluator =
           evaluatorFactory.forApplication(transform, inputBundle, 
evaluationContext);
       if (evaluator == null) {
+        onComplete.handleEmpty(inputBundle);
         // Nothing to do
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a8eb274a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index cb5cd46..95206f3 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -134,6 +134,26 @@ public class TransformExecutorTest {
   }
 
   @Test
+  public void nullTransformEvaluatorTerminates() throws Exception {
+    when(registry.forApplication(created.getProducingTransformInternal(),
+        null,
+        evaluationContext)).thenReturn(null);
+
+    TransformExecutor<Object> executor = TransformExecutor.create(registry,
+        Collections.<ModelEnforcementFactory>emptyList(),
+        evaluationContext,
+        null,
+        created.getProducingTransformInternal(),
+        completionCallback,
+        transformEvaluationState);
+    executor.run();
+
+    assertThat(completionCallback.handledResult, is(nullValue()));
+    assertThat(completionCallback.handledEmpty, equalTo(true));
+    assertThat(completionCallback.handledThrowable, is(nullValue()));
+  }
+
+  @Test
   public void inputBundleProcessesEachElementFinishesAndCompletes() throws 
Exception {
     final TransformResult result =
         
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
@@ -471,6 +491,7 @@ public class TransformExecutorTest {
 
   private static class RegisteringCompletionCallback implements 
CompletionCallback {
     private TransformResult handledResult = null;
+    private boolean handledEmpty = false;
     private Throwable handledThrowable = null;
     private final CountDownLatch onMethod;
 
@@ -496,6 +517,12 @@ public class TransformExecutorTest {
     }
 
     @Override
+    public void handleEmpty(CommittedBundle<?> inputBundle) {
+      handledEmpty = true;
+      onMethod.countDown();
+    }
+
+    @Override
     public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
       handledThrowable = t;
       onMethod.countDown();

Reply via email to