Remove EvaluationContext as a forApplication Parameter

Instead use it as a parameter to the Evaluator Factory. Evaluator
Factories must not be reused across pipelines, as they may be stateful.
Evaluation Contexts are representative of a single execution of a
Pipeline and thus can be passed at construction time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2970aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2970aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2970aa7

Branch: refs/heads/master
Commit: c2970aa7cc10e5e2d4bdc9c939a30df686c41ad2
Parents: 643cf63
Author: Thomas Groh <tg...@google.com>
Authored: Thu Sep 8 13:41:52 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Mon Sep 12 09:53:16 2016 -0700

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     | 30 ++++----
 .../beam/runners/direct/DirectRunner.java       |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java |  1 -
 .../runners/direct/FlattenEvaluatorFactory.java | 15 ++--
 .../GroupAlsoByWindowEvaluatorFactory.java      | 14 ++--
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  | 14 ++--
 .../direct/ParDoMultiEvaluatorFactory.java      | 13 ++--
 .../direct/ParDoSingleEvaluatorFactory.java     | 13 ++--
 .../direct/TestStreamEvaluatorFactory.java      | 13 ++--
 .../direct/TransformEvaluatorFactory.java       | 22 +++---
 .../direct/TransformEvaluatorRegistry.java      | 37 ++++------
 .../beam/runners/direct/TransformExecutor.java  |  8 +--
 .../direct/UnboundedReadEvaluatorFactory.java   | 46 ++++++------
 .../runners/direct/ViewEvaluatorFactory.java    | 14 ++--
 .../runners/direct/WindowEvaluatorFactory.java  | 14 ++--
 .../direct/BoundedReadEvaluatorFactoryTest.java | 16 ++---
 .../direct/FlattenEvaluatorFactoryTest.java     | 15 ++--
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |  5 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |  4 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 16 ++---
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 16 ++---
 .../direct/TestStreamEvaluatorFactoryTest.java  | 35 +++++----
 .../runners/direct/TransformExecutorTest.java   | 75 +++++++-------------
 .../UnboundedReadEvaluatorFactoryTest.java      | 24 +++----
 .../direct/ViewEvaluatorFactoryTest.java        |  4 +-
 .../direct/WindowEvaluatorFactoryTest.java      |  6 +-
 26 files changed, 231 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 2b15ad0..2046d31 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -45,33 +45,35 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
    * retriggered.
    */
   private final ConcurrentMap<AppliedPTransform<?, ?, ?>, Queue<? extends 
BoundedReadEvaluator<?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
+      sourceEvaluators;
+  private final EvaluationContext evaluationContext;
+
+  BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+    sourceEvaluators = new ConcurrentHashMap<>();
+  }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   @Nullable
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext)
+      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> 
inputBundle)
       throws IOException {
-    return getTransformEvaluator((AppliedPTransform) application, 
evaluationContext);
+    return getTransformEvaluator((AppliedPTransform) application);
   }
 
   @Override
-  public void cleanup() {
-  }
+  public void cleanup() {}
 
   /**
-   * Get a {@link TransformEvaluator} that produces elements for the provided 
application of
-   * {@link Bounded Read.Bounded}, initializing the queue of evaluators if 
required.
+   * Get a {@link TransformEvaluator} that produces elements for the provided 
application of {@link
+   * Bounded Read.Bounded}, initializing the queue of evaluators if required.
    *
    * <p>This method is thread-safe, and will only produce new evaluators if no 
other invocation has
    * already done so.
    */
   private <OutputT> TransformEvaluator<?> getTransformEvaluator(
-      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform,
-      final EvaluationContext evaluationContext) {
+      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform) {
     // Key by the application and the context the evaluation is occurring in 
(which call to
     // Pipeline#run).
     Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
@@ -106,8 +108,8 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
     private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform;
     private final EvaluationContext evaluationContext;
     /**
-     * The source being read from by this {@link BoundedReadEvaluator}. This 
may not be the same
-     * as the source derived from {@link #transform} due to splitting.
+     * The source being read from by this {@link BoundedReadEvaluator}. This 
may not be the same as
+     * the source derived from {@link #transform} due to splitting.
      */
     private BoundedSource<OutputT> source;
 
@@ -126,7 +128,7 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
     @Override
     public TransformResult finishBundle() throws IOException {
       try (final BoundedReader<OutputT> reader =
-              source.createReader(evaluationContext.getPipelineOptions())) {
+          source.createReader(evaluationContext.getPipelineOptions())) {
         boolean contentsRemaining = reader.start();
         UncommittedBundle<OutputT> output =
             evaluationContext.createRootBundle(transform.getOutput());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b2d61c3..d8d82bd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -245,7 +245,7 @@ public class DirectRunner
     // independent executor service for each run
     ExecutorService executorService = executorServiceSupplier.get();
 
-    TransformEvaluatorRegistry registry = 
TransformEvaluatorRegistry.defaultRegistry();
+    TransformEvaluatorRegistry registry = 
TransformEvaluatorRegistry.defaultRegistry(context);
     PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
             executorService,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 401ed7f..e765bd3 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -189,7 +189,6 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
         TransformExecutor.create(
             registry,
             enforcements,
-            evaluationContext,
             bundle,
             transform,
             onComplete,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 5a0d31d..456921c 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -32,14 +32,20 @@ import org.apache.beam.sdk.values.PCollectionList;
  * {@link PTransform}.
  */
 class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
+  private final EvaluationContext evaluationContext;
+
+  FlattenEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
+
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) {
+      CommittedBundle<?> inputBundle
+      ) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) 
createInMemoryEvaluator(
-            (AppliedPTransform) application, inputBundle, evaluationContext);
+            (AppliedPTransform) application, inputBundle);
     return evaluator;
   }
 
@@ -50,8 +56,7 @@ class FlattenEvaluatorFactory implements 
TransformEvaluatorFactory {
       final AppliedPTransform<
               PCollectionList<InputT>, PCollection<InputT>, 
FlattenPCollectionList<InputT>>
           application,
-      final CommittedBundle<InputT> inputBundle,
-      final EvaluationContext evaluationContext) {
+      final CommittedBundle<InputT> inputBundle) {
     if (inputBundle == null) {
       // it is impossible to call processElement on a flatten with no input 
bundle. A Flatten with
       // no input bundle occurs as an output of 
Flatten.pcollections(PCollectionList.empty())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index c08c229..c7cf9e3 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -47,15 +47,20 @@ import org.apache.beam.sdk.values.TupleTag;
  * {@link GroupByKeyOnly} {@link PTransform}.
  */
 class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
+  private final EvaluationContext evaluationContext;
+
+  GroupAlsoByWindowEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
+
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) {
+      CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<InputT> evaluator =
         createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle, 
evaluationContext);
+            (AppliedPTransform) application, (CommittedBundle) inputBundle);
     return evaluator;
   }
 
@@ -68,8 +73,7 @@ class GroupAlsoByWindowEvaluatorFactory implements 
TransformEvaluatorFactory {
               PCollection<KV<K, Iterable<V>>>,
               DirectGroupAlsoByWindow<K, V>>
           application,
-      CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
-      EvaluationContext evaluationContext) {
+      CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
     return new GroupAlsoByWindowEvaluator<>(
         evaluationContext, inputBundle, application);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 17dc0be..61d0e7b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -47,15 +47,20 @@ import org.apache.beam.sdk.values.PCollection;
  * {@link GroupByKeyOnly} {@link PTransform}.
  */
 class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
+  private final EvaluationContext evaluationContext;
+
+  GroupByKeyOnlyEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
+
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) {
+      CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<InputT> evaluator =
         createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle, 
evaluationContext);
+            (AppliedPTransform) application, (CommittedBundle) inputBundle);
     return evaluator;
   }
 
@@ -68,8 +73,7 @@ class GroupByKeyOnlyEvaluatorFactory implements 
TransformEvaluatorFactory {
           PCollection<KeyedWorkItem<K, V>>,
           DirectGroupByKeyOnly<K, V>>
           application,
-      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
-      final EvaluationContext evaluationContext) {
+      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle) {
     return new GroupByKeyOnlyEvaluator<>(evaluationContext, inputBundle, 
application);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index 6a41adf..fcb68c4 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -41,8 +41,10 @@ class ParDoMultiEvaluatorFactory implements 
TransformEvaluatorFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(ParDoMultiEvaluatorFactory.class);
   private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, 
DoFnLifecycleManager>
       fnClones;
+  private final EvaluationContext evaluationContext;
 
-  public ParDoMultiEvaluatorFactory() {
+  public ParDoMultiEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
     fnClones = CacheBuilder.newBuilder()
         .build(new CacheLoader<AppliedPTransform<?, ?, BoundMulti<?, ?>>, 
DoFnLifecycleManager>() {
           @Override
@@ -55,12 +57,10 @@ class ParDoMultiEvaluatorFactory implements 
TransformEvaluatorFactory {
 
   @Override
   public <T> TransformEvaluator<T> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) throws Exception {
+      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) 
throws Exception {
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =
-        createMultiEvaluator((AppliedPTransform) application, inputBundle, 
evaluationContext);
+        createMultiEvaluator((AppliedPTransform) application, inputBundle);
     return evaluator;
   }
 
@@ -71,8 +71,7 @@ class ParDoMultiEvaluatorFactory implements 
TransformEvaluatorFactory {
 
   private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator(
       AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, 
OuT>> application,
-      CommittedBundle<InT> inputBundle,
-      EvaluationContext evaluationContext) throws Exception {
+      CommittedBundle<InT> inputBundle) throws Exception {
     Map<TupleTag<?>, PCollection<?>> outputs = 
application.getOutput().getAll();
 
     DoFnLifecycleManager fnLocal = fnClones.getUnchecked((AppliedPTransform) 
application);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 4bb740b..91da35f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -40,8 +40,10 @@ import org.slf4j.LoggerFactory;
 class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(ParDoSingleEvaluatorFactory.class);
   private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, 
DoFnLifecycleManager> fnClones;
+  private final EvaluationContext evaluationContext;
 
-  public ParDoSingleEvaluatorFactory() {
+  public ParDoSingleEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
     fnClones =
         CacheBuilder.newBuilder()
             .build(
@@ -57,11 +59,10 @@ class ParDoSingleEvaluatorFactory implements 
TransformEvaluatorFactory {
   @Override
   public <T> TransformEvaluator<T> forApplication(
       final AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) throws Exception {
+      CommittedBundle<?> inputBundle) throws Exception {
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =
-        createSingleEvaluator((AppliedPTransform) application, inputBundle, 
evaluationContext);
+        createSingleEvaluator((AppliedPTransform) application, inputBundle);
     return evaluator;
   }
 
@@ -73,8 +74,8 @@ class ParDoSingleEvaluatorFactory implements 
TransformEvaluatorFactory {
   private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, 
Bound<InputT, OutputT>>
           application,
-      CommittedBundle<InputT> inputBundle,
-      EvaluationContext evaluationContext) throws Exception {
+      CommittedBundle<InputT> inputBundle)
+      throws Exception {
     TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
     String stepName = evaluationContext.getStepName(application);
     DirectStepContext stepContext =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 5fe771c..2adff59 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -53,15 +53,19 @@ import org.joda.time.Instant;
 class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
   private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> 
evaluators =
       LockedKeyedResourcePool.create();
+  private final EvaluationContext evaluationContext;
+
+  TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
 
   @Nullable
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext)
+      @Nullable CommittedBundle<?> inputBundle)
       throws Exception {
-    return createEvaluator((AppliedPTransform) application, evaluationContext);
+    return createEvaluator((AppliedPTransform) application);
   }
 
   @Override
@@ -76,8 +80,7 @@ class TestStreamEvaluatorFactory implements 
TransformEvaluatorFactory {
    * a separate collection of events cannot be created.
    */
   private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
-      AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> 
application,
-      EvaluationContext evaluationContext)
+      AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> 
application)
       throws ExecutionException {
     return evaluators
         .tryAcquire(application, new CreateEvaluator<>(application, 
evaluationContext, evaluators))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index ecf2da8..e4f3e0c 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -35,26 +35,26 @@ public interface TransformEvaluatorFactory {
   /**
    * Create a new {@link TransformEvaluator} for the application of the {@link 
PTransform}.
    *
-   * <p>Any work that must be done before input elements are processed (such 
as calling
-   * {@code DoFn.StartBundle}) must be done before the
-   * {@link TransformEvaluator} is made available to the caller.
+   * <p>Any work that must be done before input elements are processed (such 
as calling {@code
+   * DoFn.StartBundle}) must be done before the {@link TransformEvaluator} is 
made available to the
+   * caller.
    *
    * <p>May return null if the application cannot produce an evaluator (for 
example, it is a
    * {@link Read} {@link PTransform} where all evaluators are in-use).
    *
    * @return An evaluator capable of processing the transform on the bundle, 
or null if no evaluator
-   * can be constructed.
+   *     can be constructed.
    * @throws Exception whenever constructing the underlying evaluator throws 
an exception
    */
-  @Nullable <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> 
inputBundle,
-      EvaluationContext evaluationContext) throws Exception;
+  @Nullable
+  <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> 
inputBundle)
+      throws Exception;
 
   /**
-   * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. 
Called after a
-   * {@link Pipeline} is shut down. No more calls to
-   * {@link #forApplication(AppliedPTransform, CommittedBundle, 
EvaluationContext)} will be made
-   * after a call to {@link #cleanup()}.
+   * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. 
Called after a {@link
+   * Pipeline} is shut down. No more calls to {@link 
#forApplication(AppliedPTransform,
+   * CommittedBundle)} will be made after a call to {@link #cleanup()}.
    */
   void cleanup() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 9edc50f..08b636e 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -44,21 +44,21 @@ import org.slf4j.LoggerFactory;
  */
 class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
   private static final Logger LOG = 
LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
-  public static TransformEvaluatorRegistry defaultRegistry() {
+  public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext 
ctxt) {
     @SuppressWarnings("rawtypes")
     ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> 
primitives =
         ImmutableMap.<Class<? extends PTransform>, 
TransformEvaluatorFactory>builder()
-            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
-            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
-            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
-            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
-            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
-            .put(ViewEvaluatorFactory.WriteView.class, new 
ViewEvaluatorFactory())
-            .put(Window.Bound.class, new WindowEvaluatorFactory())
+            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt))
+            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
+            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory(ctxt))
+            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory(ctxt))
+            .put(FlattenPCollectionList.class, new 
FlattenEvaluatorFactory(ctxt))
+            .put(ViewEvaluatorFactory.WriteView.class, new 
ViewEvaluatorFactory(ctxt))
+            .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
             // Runner-specific primitives used in expansion of GroupByKey
-            .put(DirectGroupByKeyOnly.class, new 
GroupByKeyOnlyEvaluatorFactory())
-            .put(DirectGroupAlsoByWindow.class, new 
GroupAlsoByWindowEvaluatorFactory())
-            .put(TestStream.class, new TestStreamEvaluatorFactory())
+            .put(DirectGroupByKeyOnly.class, new 
GroupByKeyOnlyEvaluatorFactory(ctxt))
+            .put(DirectGroupAlsoByWindow.class, new 
GroupAlsoByWindowEvaluatorFactory(ctxt))
+            .put(TestStream.class, new TestStreamEvaluatorFactory(ctxt))
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }
@@ -78,14 +78,12 @@ class TransformEvaluatorRegistry implements 
TransformEvaluatorFactory {
 
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext)
+      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> 
inputBundle)
       throws Exception {
     checkState(
         !finished.get(), "Tried to get an evaluator for a finished 
TransformEvaluatorRegistry");
     TransformEvaluatorFactory factory = 
factories.get(application.getTransform().getClass());
-    return factory.forApplication(application, inputBundle, evaluationContext);
+    return factory.forApplication(application, inputBundle);
   }
 
   @Override
@@ -115,13 +113,4 @@ class TransformEvaluatorRegistry implements 
TransformEvaluatorFactory {
       throw toThrow;
     }
   }
-
-  /**
-   * A factory to create Transform Evaluator Registries.
-   */
-  public static class Factory {
-    public TransformEvaluatorRegistry create() {
-      return TransformEvaluatorRegistry.defaultRegistry();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index cc6b5b7..aaee9a5 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -40,7 +40,6 @@ class TransformExecutor<T> implements Runnable {
   public static <T> TransformExecutor<T> create(
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
-      EvaluationContext evaluationContext,
       CommittedBundle<T> inputBundle,
       AppliedPTransform<?, ?, ?> transform,
       CompletionCallback completionCallback,
@@ -48,7 +47,6 @@ class TransformExecutor<T> implements Runnable {
     return new TransformExecutor<>(
         factory,
         modelEnforcements,
-        evaluationContext,
         inputBundle,
         transform,
         completionCallback,
@@ -58,8 +56,6 @@ class TransformExecutor<T> implements Runnable {
   private final TransformEvaluatorFactory evaluatorFactory;
   private final Iterable<? extends ModelEnforcementFactory> modelEnforcements;
 
-  private final EvaluationContext evaluationContext;
-
   /** The transform that will be evaluated. */
   private final AppliedPTransform<?, ?, ?> transform;
   /** The inputs this {@link TransformExecutor} will deliver to the transform. 
*/
@@ -73,14 +69,12 @@ class TransformExecutor<T> implements Runnable {
   private TransformExecutor(
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
-      EvaluationContext evaluationContext,
       CommittedBundle<T> inputBundle,
       AppliedPTransform<?, ?, ?> transform,
       CompletionCallback completionCallback,
       TransformExecutorService transformEvaluationState) {
     this.evaluatorFactory = factory;
     this.modelEnforcements = modelEnforcements;
-    this.evaluationContext = evaluationContext;
 
     this.inputBundle = inputBundle;
     this.transform = transform;
@@ -107,7 +101,7 @@ class TransformExecutor<T> implements Runnable {
         enforcements.add(enforcement);
       }
       TransformEvaluator<T> evaluator =
-          evaluatorFactory.forApplication(transform, inputBundle, 
evaluationContext);
+          evaluatorFactory.forApplication(transform, inputBundle);
       if (evaluator == null) {
         onComplete.handleEmpty(transform);
         // Nothing to do

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 9f485e0..0dfcd69 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -43,8 +43,7 @@ import org.joda.time.Instant;
  */
 class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   // Resume from a checkpoint every nth invocation, to ensure close-and-resume 
is exercised
-  @VisibleForTesting
-  static final int MAX_READER_REUSE_COUNT = 20;
+  @VisibleForTesting static final int MAX_READER_REUSE_COUNT = 20;
 
   /*
    * An evaluator for a Source is stateful, to ensure the CheckpointMark is 
properly persisted.
@@ -57,28 +56,33 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
    * an arbitrary Queue implementation does not, so the concrete type is used 
explicitly.
    */
   private final ConcurrentMap<
-      AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends 
UnboundedReadEvaluator<?, ?>>>
-      sourceEvaluators = new ConcurrentHashMap<>();
+          AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends 
UnboundedReadEvaluator<?, ?>>>
+      sourceEvaluators;
+  private final EvaluationContext evaluationContext;
+
+  UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+    sourceEvaluators = new ConcurrentHashMap<>();
+  }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   @Nullable
-  public <InputT> TransformEvaluator<InputT> 
forApplication(AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle, EvaluationContext 
evaluationContext) {
-    return getTransformEvaluator((AppliedPTransform) application, 
evaluationContext);
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> 
inputBundle) {
+    return getTransformEvaluator((AppliedPTransform) application);
   }
 
   /**
-   * Get a {@link TransformEvaluator} that produces elements for the provided 
application of
-   * {@link Unbounded Read.Unbounded}, initializing the queue of evaluators if 
required.
+   * Get a {@link TransformEvaluator} that produces elements for the provided 
application of {@link
+   * Unbounded Read.Unbounded}, initializing the queue of evaluators if 
required.
    *
    * <p>This method is thread-safe, and will only produce new evaluators if no 
other invocation has
    * already done so.
    */
   private <OutputT, CheckpointMarkT extends CheckpointMark>
       TransformEvaluator<?> getTransformEvaluator(
-          final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> 
transform,
-          final EvaluationContext evaluationContext) {
+          final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> 
transform) {
     ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> 
evaluatorQueue =
         (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, 
CheckpointMarkT>>)
             sourceEvaluators.get(transform);
@@ -119,8 +123,8 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
    *
    * <p>Calls to {@link UnboundedReadEvaluator} are not internally 
thread-safe, and should only be
    * used by a single thread at a time. Each {@link UnboundedReadEvaluator} 
maintains its own
-   * checkpoint, and constructs its reader from the current checkpoint in each 
call to
-   * {@link #finishBundle()}.
+   * checkpoint, and constructs its reader from the current checkpoint in each 
call to {@link
+   * #finishBundle()}.
    */
   private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends 
CheckpointMark>
       implements TransformEvaluator<Object> {
@@ -135,13 +139,14 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
      * source as derived from {@link #transform} due to splitting.
      */
     private final UnboundedSource<OutputT, CheckpointMarkT> source;
+
     private final UnboundedReadDeduplicator deduplicator;
     private UnboundedReader<OutputT> currentReader;
     private CheckpointMarkT checkpointMark;
 
     /**
-     * The count of bundles output from this {@link UnboundedReadEvaluator}. 
Used to exercise
-     * {@link UnboundedReader#close()}.
+     * The count of bundles output from this {@link UnboundedReadEvaluator}. 
Used to exercise {@link
+     * UnboundedReader#close()}.
      */
     private int outputBundles = 0;
 
@@ -174,8 +179,9 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
           int numElements = 0;
           do {
             if (deduplicator.shouldOutput(currentReader.getCurrentRecordId())) 
{
-              
output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(),
-                  currentReader.getCurrentTimestamp()));
+              output.add(
+                  WindowedValue.timestampedValueInGlobalWindow(
+                      currentReader.getCurrent(), 
currentReader.getCurrentTimestamp()));
             }
             numElements++;
           } while (numElements < ARBITRARY_MAX_ELEMENTS && 
currentReader.advance());
@@ -224,7 +230,8 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
       // If the watermark is the max value, this source may not be invoked 
again. Finalize after
       // committing the output.
       if 
(!currentReader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-        
evaluationContext.scheduleAfterOutputWouldBeProduced(transform.getOutput(),
+        evaluationContext.scheduleAfterOutputWouldBeProduced(
+            transform.getOutput(),
             GlobalWindow.INSTANCE,
             transform.getOutput().getWindowingStrategy(),
             new Runnable() {
@@ -234,8 +241,7 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
                   mark.finalizeCheckpoint();
                 } catch (IOException e) {
                   throw new RuntimeException(
-                      "Couldn't finalize checkpoint after the end of the 
Global Window",
-                      e);
+                      "Couldn't finalize checkpoint after the end of the 
Global Window", e);
                 }
               }
             });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 40ac7f0..a4e8d6f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -47,14 +47,19 @@ import org.apache.beam.sdk.values.POutput;
  * written.
  */
 class ViewEvaluatorFactory implements TransformEvaluatorFactory {
+  private final EvaluationContext context;
+
+  ViewEvaluatorFactory(EvaluationContext context) {
+    this.context = context;
+  }
+
   @Override
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      DirectRunner.CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) {
+      DirectRunner.CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator = createEvaluator(
-            (AppliedPTransform) application, evaluationContext);
+            (AppliedPTransform) application);
     return evaluator;
   }
 
@@ -63,8 +68,7 @@ class ViewEvaluatorFactory implements 
TransformEvaluatorFactory {
 
   private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator(
       final AppliedPTransform<PCollection<Iterable<InT>>, 
PCollectionView<OuT>, WriteView<InT, OuT>>
-          application,
-      EvaluationContext context) {
+          application) {
     PCollection<Iterable<InT>> input = application.getInput();
     final PCollectionViewWriter<InT, OuT> writer =
         context.createPCollectionViewWriter(input, application.getOutput());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 19c1a98..47848e6 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -38,21 +38,25 @@ import org.joda.time.Instant;
  * {@link Bound Window.Bound} primitive {@link PTransform}.
  */
 class WindowEvaluatorFactory implements TransformEvaluatorFactory {
+  private final EvaluationContext evaluationContext;
+
+  WindowEvaluatorFactory(EvaluationContext evaluationContext) {
+    this.evaluationContext = evaluationContext;
+  }
 
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext)
+      @Nullable CommittedBundle<?> inputBundle
+ )
       throws Exception {
     return createTransformEvaluator(
-        (AppliedPTransform) application, inputBundle, evaluationContext);
+        (AppliedPTransform) application, inputBundle);
   }
 
   private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollection<InputT>, 
Window.Bound<InputT>> transform,
-      CommittedBundle<?> inputBundle,
-      EvaluationContext evaluationContext) {
+      CommittedBundle<?> inputBundle) {
     WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
     UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(inputBundle, transform.getOutput());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index cbeb733..cdd1661 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -71,7 +71,7 @@ public class BoundedReadEvaluatorFactoryTest {
     TestPipeline p = TestPipeline.create();
     longs = p.apply(Read.from(source));
 
-    factory = new BoundedReadEvaluatorFactory();
+    factory = new BoundedReadEvaluatorFactory(context);
     bundleFactory = ImmutableListBundleFactory.create();
   }
 
@@ -81,7 +81,7 @@ public class BoundedReadEvaluatorFactoryTest {
     when(context.createRootBundle(longs)).thenReturn(output);
 
     TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
     TransformResult result = evaluator.finishBundle();
     assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     assertThat(
@@ -101,7 +101,7 @@ public class BoundedReadEvaluatorFactoryTest {
     when(context.createRootBundle(longs)).thenReturn(output);
 
     TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
     TransformResult result = evaluator.finishBundle();
     assertThat(result.getWatermarkHold(), 
equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
     Iterable<? extends WindowedValue<Long>> outputElements =
@@ -114,7 +114,7 @@ public class BoundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> secondOutput = 
bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
     assertThat(secondEvaluator, nullValue());
   }
 
@@ -130,9 +130,9 @@ public class BoundedReadEvaluatorFactoryTest {
 
     // create both evaluators before finishing either.
     TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
     TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, 
context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
     assertThat(secondEvaluator, nullValue());
 
     TransformResult result = evaluator.finishBundle();
@@ -163,7 +163,7 @@ public class BoundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null, context);
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null);
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), 
gw(1L)));
@@ -181,7 +181,7 @@ public class BoundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> output = 
bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null, context);
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, 
null);
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(committed.getElements(), emptyIterable());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 1c46c24..3bae1ab 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -67,14 +67,11 @@ public class FlattenEvaluatorFactoryTest {
     when(context.createBundle(leftBundle, 
flattened)).thenReturn(flattenedLeftBundle);
     when(context.createBundle(rightBundle, 
flattened)).thenReturn(flattenedRightBundle);
 
-    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();
+    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context);
     TransformEvaluator<Integer> leftSideEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), 
leftBundle, context);
+        factory.forApplication(flattened.getProducingTransformInternal(), 
leftBundle);
     TransformEvaluator<Integer> rightSideEvaluator =
-        factory.forApplication(
-            flattened.getProducingTransformInternal(),
-            rightBundle,
-            context);
+        factory.forApplication(flattened.getProducingTransformInternal(), 
rightBundle);
 
     leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1));
     rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
@@ -123,11 +120,11 @@ public class FlattenEvaluatorFactoryTest {
 
     PCollection<Integer> flattened = 
list.apply(Flatten.<Integer>pCollections());
 
-    EvaluationContext context = mock(EvaluationContext.class);
+    EvaluationContext evaluationContext = mock(EvaluationContext.class);
 
-    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();
+    FlattenEvaluatorFactory factory = new 
FlattenEvaluatorFactory(evaluationContext);
     TransformEvaluator<Integer> emptyEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(flattened.getProducingTransformInternal(), 
null);
 
     TransformResult leftSideResult = emptyEvaluator.finishBundle();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 8d1f8bd..9395017 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -99,9 +99,8 @@ public class GroupByKeyEvaluatorFactoryTest {
     Coder<String> keyCoder =
         ((KvCoder<String, WindowedValue<Integer>>) 
kvs.getCoder()).getKeyCoder();
     TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
-        new GroupByKeyOnlyEvaluatorFactory()
-            .forApplication(
-                groupedKvs.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+        new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
+            .forApplication(groupedKvs.getProducingTransformInternal(), 
inputBundle);
 
     
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
     
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 9f1e916..814a89a 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -101,9 +101,9 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
     Coder<String> keyCoder =
         ((KvCoder<String, WindowedValue<Integer>>) 
kvs.getCoder()).getKeyCoder();
     TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
-        new GroupByKeyOnlyEvaluatorFactory()
+        new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
             .forApplication(
-                groupedKvs.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                groupedKvs.getProducingTransformInternal(), inputBundle);
 
     
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
     
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index 5552196..94b7f5d 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -119,9 +119,9 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
+        new ParDoMultiEvaluatorFactory(evaluationContext)
             .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                mainOutput.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -206,9 +206,9 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
+        new ParDoMultiEvaluatorFactory(evaluationContext)
             .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                mainOutput.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -300,9 +300,9 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
+        new ParDoMultiEvaluatorFactory(evaluationContext)
             .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                mainOutput.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -413,9 +413,9 @@ public class ParDoMultiEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
-        new ParDoMultiEvaluatorFactory()
+        new ParDoMultiEvaluatorFactory(evaluationContext)
             .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                mainOutput.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 60b6dd9..7207b99 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -95,9 +95,9 @@ public class ParDoSingleEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory()
+        new ParDoSingleEvaluatorFactory(evaluationContext)
             .forApplication(
-                collection.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                collection.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -149,9 +149,9 @@ public class ParDoSingleEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory()
+        new ParDoSingleEvaluatorFactory(evaluationContext)
             .forApplication(
-                collection.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                collection.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -217,9 +217,9 @@ public class ParDoSingleEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     org.apache.beam.runners.direct.TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory()
+        new ParDoSingleEvaluatorFactory(evaluationContext)
             .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                mainOutput.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
     evaluator.processElement(
@@ -320,9 +320,9 @@ public class ParDoSingleEvaluatorFactoryTest implements 
Serializable {
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
     TransformEvaluator<String> evaluator =
-        new ParDoSingleEvaluatorFactory()
+        new ParDoSingleEvaluatorFactory(evaluationContext)
             .forApplication(
-                mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+                mainOutput.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 7703881..7413b25 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -42,8 +43,16 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link TestStreamEvaluatorFactory}. */
 @RunWith(JUnit4.class)
 public class TestStreamEvaluatorFactoryTest {
-  private TestStreamEvaluatorFactory factory = new 
TestStreamEvaluatorFactory();
-  private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+  private TestStreamEvaluatorFactory factory;
+  private BundleFactory bundleFactory;
+  private EvaluationContext context;
+
+  @Before
+  public void setup() {
+    context = mock(EvaluationContext.class);
+    factory = new TestStreamEvaluatorFactory(context);
+    bundleFactory = ImmutableListBundleFactory.create();
+  }
 
   /** Demonstrates that returned evaluators produce elements in sequence. */
   @Test
@@ -56,21 +65,20 @@ public class TestStreamEvaluatorFactoryTest {
                 .addElements(4, 5, 6)
                 .advanceWatermarkToInfinity());
 
-    EvaluationContext context = mock(EvaluationContext.class);
     when(context.createRootBundle(streamVals))
         .thenReturn(
             bundleFactory.createRootBundle(streamVals), 
bundleFactory.createRootBundle(streamVals));
 
     TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(streamVals.getProducingTransformInternal(), 
null);
     TransformResult firstResult = firstEvaluator.finishBundle();
 
     TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(streamVals.getProducingTransformInternal(), 
null);
     TransformResult secondResult = secondEvaluator.finishBundle();
 
     TransformEvaluator<Object> thirdEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(streamVals.getProducingTransformInternal(), 
null);
     TransformResult thirdResult = thirdEvaluator.finishBundle();
 
     assertThat(
@@ -105,13 +113,12 @@ public class TestStreamEvaluatorFactoryTest {
         p.apply(
             TestStream.create(VarIntCoder.of()).addElements(4, 5, 
6).advanceWatermarkToInfinity());
 
-    EvaluationContext context = mock(EvaluationContext.class);
     TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(streamVals.getProducingTransformInternal(), 
null);
 
     // create a second evaluator before the first is finished. The evaluator 
should not be available
     TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(streamVals.getProducingTransformInternal(), 
null);
     assertThat(secondEvaluator, is(nullValue()));
   }
 
@@ -127,16 +134,15 @@ public class TestStreamEvaluatorFactoryTest {
     PCollection<Integer> firstVals = p.apply("Stream One", stream);
     PCollection<Integer> secondVals = p.apply("Stream A", stream);
 
-    EvaluationContext context = mock(EvaluationContext.class);
     
when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
     when(context.createRootBundle(secondVals))
         .thenReturn(bundleFactory.createRootBundle(secondVals));
 
     TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(firstVals.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(firstVals.getProducingTransformInternal(), 
null);
     // The two evaluators can exist independently
     TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(secondVals.getProducingTransformInternal(), 
null, context);
+        factory.forApplication(secondVals.getProducingTransformInternal(), 
null);
 
     TransformResult firstResult = firstEvaluator.finishBundle();
     TransformResult secondResult = secondEvaluator.finishBundle();
@@ -175,16 +181,15 @@ public class TestStreamEvaluatorFactoryTest {
                 .addElements("Two")
                 .advanceWatermarkToInfinity());
 
-    EvaluationContext context = mock(EvaluationContext.class);
     when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals));
     when(context.createRootBundle(secondVals))
         .thenReturn(bundleFactory.createRootBundle(secondVals));
 
     TransformEvaluator<Object> firstEvaluator =
-        factory.forApplication(firstVals.getProducingTransformInternal(), null, context);
+        factory.forApplication(firstVals.getProducingTransformInternal(), null);
     // The two evaluators can exist independently
     TransformEvaluator<Object> secondEvaluator =
-        factory.forApplication(secondVals.getProducingTransformInternal(), null, context);
+        factory.forApplication(secondVals.getProducingTransformInternal(), null);
 
     TransformResult firstResult = firstEvaluator.finishBundle();
     TransformResult secondResult = secondEvaluator.finishBundle();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 5af568f..344fd4b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -60,10 +60,7 @@ import org.junit.runners.JUnit4;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-
-/**
- * Tests for {@link TransformExecutor}.
- */
+/** Tests for {@link TransformExecutor}. */
 @RunWith(JUnit4.class)
 public class TransformExecutorTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
@@ -114,14 +111,13 @@ public class TransformExecutorTest {
           }
         };
 
-    when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext))
+    when(registry.forApplication(created.getProducingTransformInternal(), null))
         .thenReturn(evaluator);
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
             null,
             created.getProducingTransformInternal(),
             completionCallback,
@@ -135,17 +131,16 @@ public class TransformExecutorTest {
 
   @Test
   public void nullTransformEvaluatorTerminates() throws Exception {
-    when(registry.forApplication(created.getProducingTransformInternal(),
-        null,
-        evaluationContext)).thenReturn(null);
-
-    TransformExecutor<Object> executor = TransformExecutor.create(registry,
-        Collections.<ModelEnforcementFactory>emptyList(),
-        evaluationContext,
-        null,
-        created.getProducingTransformInternal(),
-        completionCallback,
-        transformEvaluationState);
+    when(registry.forApplication(created.getProducingTransformInternal(), null)).thenReturn(null);
+
+    TransformExecutor<Object> executor =
+        TransformExecutor.create(
+            registry,
+            Collections.<ModelEnforcementFactory>emptyList(),
+            null,
+            created.getProducingTransformInternal(),
+            completionCallback,
+            transformEvaluationState);
     executor.run();
 
     assertThat(completionCallback.handledResult, is(nullValue()));
@@ -177,16 +172,13 @@ public class TransformExecutorTest {
     WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
-    when(
-            registry.<String>forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+    when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
         .thenReturn(evaluator);
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
             inputBundle,
             downstream.getProducingTransformInternal(),
             completionCallback,
@@ -222,16 +214,13 @@ public class TransformExecutorTest {
     WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(created).add(foo).commit(Instant.now());
-    when(
-            registry.<String>forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+    when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
         .thenReturn(evaluator);
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
             inputBundle,
             downstream.getProducingTransformInternal(),
             completionCallback,
@@ -260,16 +249,13 @@ public class TransformExecutorTest {
 
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(created).commit(Instant.now());
-    when(
-            registry.<String>forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+    when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle))
         .thenReturn(evaluator);
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
             inputBundle,
             downstream.getProducingTransformInternal(),
             completionCallback,
@@ -303,14 +289,13 @@ public class TransformExecutorTest {
           }
         };
 
-    when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext))
+    when(registry.forApplication(created.getProducingTransformInternal(), null))
         .thenReturn(evaluator);
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
-            evaluationContext,
             null,
             created.getProducingTransformInternal(),
             completionCallback,
@@ -332,8 +317,7 @@ public class TransformExecutorTest {
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {
           @Override
-          public void processElement(WindowedValue<Object> element) throws Exception {
-          }
+          public void processElement(WindowedValue<Object> element) throws Exception {}
 
           @Override
           public TransformResult finishBundle() throws Exception {
@@ -345,9 +329,7 @@ public class TransformExecutorTest {
     WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar");
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now());
-    when(
-            registry.forApplication(
-                downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+    when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle))
         .thenReturn(evaluator);
 
     TestEnforcementFactory enforcement = new TestEnforcementFactory();
@@ -355,7 +337,6 @@ public class TransformExecutorTest {
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>singleton(enforcement),
-            evaluationContext,
             inputBundle,
             downstream.getProducingTransformInternal(),
             completionCallback,
@@ -406,16 +387,13 @@ public class TransformExecutorTest {
     WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
     CommittedBundle<byte[]> inputBundle =
         bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
-    when(
-            registry.forApplication(
-                pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext))
+    when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle))
         .thenReturn(evaluator);
 
     TransformExecutor<byte[]> executor =
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
-            evaluationContext,
             inputBundle,
             pcBytes.getProducingTransformInternal(),
             completionCallback,
@@ -465,16 +443,13 @@ public class TransformExecutorTest {
     WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
     CommittedBundle<byte[]> inputBundle =
         bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
-    when(
-            registry.forApplication(
-                pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext))
+    when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle))
         .thenReturn(evaluator);
 
     TransformExecutor<byte[]> executor =
         TransformExecutor.create(
             registry,
             Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
-            evaluationContext,
             inputBundle,
             pcBytes.getProducingTransformInternal(),
             completionCallback,
@@ -500,18 +475,19 @@ public class TransformExecutorTest {
     }
 
     @Override
-    public CommittedResult handleResult(
-        CommittedBundle<?> inputBundle, TransformResult result) {
+    public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult result) {
       handledResult = result;
       onMethod.countDown();
-      @SuppressWarnings("rawtypes") Iterable unprocessedElements =
+      @SuppressWarnings("rawtypes")
+      Iterable unprocessedElements =
           result.getUnprocessedElements() == null
               ? Collections.emptyList()
               : result.getUnprocessedElements();
 
       CommittedBundle<?> unprocessedBundle =
           inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
-      return CommittedResult.create(result,
+      return CommittedResult.create(
+          result,
           unprocessedBundle,
           Collections.<CommittedBundle<?>>emptyList(),
           EnumSet.noneOf(OutputType.class));
@@ -532,6 +508,7 @@ public class TransformExecutorTest {
 
   private static class TestEnforcementFactory implements ModelEnforcementFactory {
     private TestEnforcement<?> instance;
+
     @Override
     public <T> TestEnforcement<T> forBundle(
         CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 3a6add6..94c9dd5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -81,8 +81,8 @@ public class UnboundedReadEvaluatorFactoryTest {
     TestPipeline p = TestPipeline.create();
     longs = p.apply(Read.from(source));
 
-    factory = new UnboundedReadEvaluatorFactory();
     context = mock(EvaluationContext.class);
+    factory = new UnboundedReadEvaluatorFactory(context);
     output = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
   }
@@ -90,7 +90,7 @@ public class UnboundedReadEvaluatorFactoryTest {
   @Test
   public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
     TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
 
     TransformResult result = evaluator.finishBundle();
     assertThat(
@@ -109,7 +109,7 @@ public class UnboundedReadEvaluatorFactoryTest {
   @Test
   public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception {
     TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
 
     TransformResult result = evaluator.finishBundle();
     assertThat(
@@ -123,7 +123,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(secondOutput);
     TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
     TransformResult secondResult = secondEvaluator.finishBundle();
     assertThat(
         secondResult.getWatermarkHold(),
@@ -150,7 +150,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
 
     evaluator.finishBundle();
     assertThat(
@@ -159,7 +159,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
     when(context.createRootBundle(longs)).thenReturn(secondOutput);
-    TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null, context);
+    TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null);
     secondEvaluator.finishBundle();
     assertThat(
         secondOutput.commit(Instant.now()).getElements(),
@@ -182,7 +182,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
     for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 1; i++) {
-      TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+      TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
       evaluator.finishBundle();
     }
     assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
@@ -200,14 +200,14 @@ public class UnboundedReadEvaluatorFactoryTest {
     UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
     evaluator.finishBundle();
     CommittedBundle<Long> committed = output.commit(Instant.now());
     assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
     assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
     assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(4));
 
-    evaluator = factory.forApplication(sourceTransform, null, context);
+    evaluator = factory.forApplication(sourceTransform, null);
     evaluator.finishBundle();
     assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
     // Tried to advance again, even with no elements
@@ -226,7 +226,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     when(context.createRootBundle(pcollection)).thenReturn(output);
 
     for (int i = 0; i < 2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) {
-      TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+      TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null);
       evaluator.finishBundle();
     }
     assertThat(TestUnboundedSource.readerClosedCount, equalTo(0));
@@ -243,10 +243,10 @@ public class UnboundedReadEvaluatorFactoryTest {
   @Test
   public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception {
     TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
 
     TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+        factory.forApplication(longs.getProducingTransformInternal(), null);
 
     assertThat(secondEvaluator, nullValue());
     TransformResult result = evaluator.finishBundle();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index d3ab81d..ae904e4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -74,8 +74,8 @@ public class ViewEvaluatorFactoryTest {
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(input).commit(Instant.now());
     TransformEvaluator<Iterable<String>> evaluator =
-        new ViewEvaluatorFactory()
-            .forApplication(view.getProducingTransformInternal(), inputBundle, context);
+        new ViewEvaluatorFactory(context)
+            .forApplication(view.getProducingTransformInternal(), inputBundle);
 
     evaluator.processElement(
         WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar")));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 63800cf..29330df 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -103,7 +103,7 @@ public class WindowEvaluatorFactoryTest {
     input = p.apply(Create.of(1L, 2L, 3L));
 
     bundleFactory = ImmutableListBundleFactory.create();
-    factory = new WindowEvaluatorFactory();
+    factory = new WindowEvaluatorFactory(evaluationContext);
   }
 
   @Test
@@ -308,9 +308,7 @@ public class WindowEvaluatorFactoryTest {
       throws Exception {
     TransformEvaluator<Long> evaluator =
         factory.forApplication(
-            AppliedPTransform.of("Window", input, windowed, windowTransform),
-            inputBundle,
-            evaluationContext);
+            AppliedPTransform.of("Window", input, windowed, windowTransform), inputBundle);
 
     evaluator.processElement(valueInGlobalWindow);
     evaluator.processElement(valueInGlobalAndTwoIntervalWindows);

Reply via email to