Remove EvaluationContext as a forApplication Parameter Instead use it as a paraemmter to the Evaluator Factory. Evaluator Factories must not be reused across pipelines, as they may be stateful. Evaluation Contexts are representative of a single execution of a Pipeline and thus can be passed at construction time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2970aa7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2970aa7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2970aa7 Branch: refs/heads/master Commit: c2970aa7cc10e5e2d4bdc9c939a30df686c41ad2 Parents: 643cf63 Author: Thomas Groh <tg...@google.com> Authored: Thu Sep 8 13:41:52 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Mon Sep 12 09:53:16 2016 -0700 ---------------------------------------------------------------------- .../direct/BoundedReadEvaluatorFactory.java | 30 ++++---- .../beam/runners/direct/DirectRunner.java | 2 +- .../direct/ExecutorServiceParallelExecutor.java | 1 - .../runners/direct/FlattenEvaluatorFactory.java | 15 ++-- .../GroupAlsoByWindowEvaluatorFactory.java | 14 ++-- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 14 ++-- .../direct/ParDoMultiEvaluatorFactory.java | 13 ++-- .../direct/ParDoSingleEvaluatorFactory.java | 13 ++-- .../direct/TestStreamEvaluatorFactory.java | 13 ++-- .../direct/TransformEvaluatorFactory.java | 22 +++--- .../direct/TransformEvaluatorRegistry.java | 37 ++++------ .../beam/runners/direct/TransformExecutor.java | 8 +-- .../direct/UnboundedReadEvaluatorFactory.java | 46 ++++++------ .../runners/direct/ViewEvaluatorFactory.java | 14 ++-- .../runners/direct/WindowEvaluatorFactory.java | 14 ++-- .../direct/BoundedReadEvaluatorFactoryTest.java | 16 ++--- .../direct/FlattenEvaluatorFactoryTest.java | 15 ++-- .../direct/GroupByKeyEvaluatorFactoryTest.java | 5 +- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 4 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 16 ++--- .../direct/ParDoSingleEvaluatorFactoryTest.java | 16 ++--- .../direct/TestStreamEvaluatorFactoryTest.java | 35 +++++---- .../runners/direct/TransformExecutorTest.java | 75 +++++++------------- .../UnboundedReadEvaluatorFactoryTest.java | 24 +++---- .../direct/ViewEvaluatorFactoryTest.java | 4 +- .../direct/WindowEvaluatorFactoryTest.java | 6 +- 26 files changed, 231 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 2b15ad0..2046d31 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -45,33 +45,35 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { * retriggered. */ private final ConcurrentMap<AppliedPTransform<?, ?, ?>, Queue<? extends BoundedReadEvaluator<?>>> - sourceEvaluators = new ConcurrentHashMap<>(); + sourceEvaluators; + private final EvaluationContext evaluationContext; + + BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + sourceEvaluators = new ConcurrentHashMap<>(); + } @SuppressWarnings({"unchecked", "rawtypes"}) @Override @Nullable public <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) + AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle) throws IOException { - return getTransformEvaluator((AppliedPTransform) application, evaluationContext); + return getTransformEvaluator((AppliedPTransform) application); } @Override - public void cleanup() { - } + public void cleanup() {} /** - * Get a {@link TransformEvaluator} that produces elements for the provided application of - * {@link Bounded Read.Bounded}, initializing the queue of evaluators if required. + * Get a {@link TransformEvaluator} that produces elements for the provided application of {@link + * Bounded Read.Bounded}, initializing the queue of evaluators if required. * * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has * already done so. */ private <OutputT> TransformEvaluator<?> getTransformEvaluator( - final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, - final EvaluationContext evaluationContext) { + final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform) { // Key by the application and the context the evaluation is occurring in (which call to // Pipeline#run). Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue = @@ -106,8 +108,8 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform; private final EvaluationContext evaluationContext; /** - * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same - * as the source derived from {@link #transform} due to splitting. + * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same as + * the source derived from {@link #transform} due to splitting. */ private BoundedSource<OutputT> source; @@ -126,7 +128,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { @Override public TransformResult finishBundle() throws IOException { try (final BoundedReader<OutputT> reader = - source.createReader(evaluationContext.getPipelineOptions())) { + source.createReader(evaluationContext.getPipelineOptions())) { boolean contentsRemaining = reader.start(); UncommittedBundle<OutputT> output = evaluationContext.createRootBundle(transform.getOutput()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index b2d61c3..d8d82bd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -245,7 +245,7 @@ public class DirectRunner // independent executor service for each run ExecutorService executorService = executorServiceSupplier.get(); - TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(); + TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context); PipelineExecutor executor = ExecutorServiceParallelExecutor.create( executorService, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 401ed7f..e765bd3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -189,7 +189,6 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { TransformExecutor.create( registry, enforcements, - evaluationContext, bundle, transform, onComplete, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java index 5a0d31d..456921c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java @@ -32,14 +32,20 @@ import org.apache.beam.sdk.values.PCollectionList; * {@link PTransform}. */ class FlattenEvaluatorFactory implements TransformEvaluatorFactory { + private final EvaluationContext evaluationContext; + + FlattenEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } + @Override public <InputT> TransformEvaluator<InputT> forApplication( AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) { + CommittedBundle<?> inputBundle + ) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>) createInMemoryEvaluator( - (AppliedPTransform) application, inputBundle, evaluationContext); + (AppliedPTransform) application, inputBundle); return evaluator; } @@ -50,8 +56,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory { final AppliedPTransform< PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>> application, - final CommittedBundle<InputT> inputBundle, - final EvaluationContext evaluationContext) { + final CommittedBundle<InputT> inputBundle) { if (inputBundle == null) { // it is impossible to call processElement on a flatten with no input bundle. A Flatten with // no input bundle occurs as an output of Flatten.pcollections(PCollectionList.empty()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index c08c229..c7cf9e3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -47,15 +47,20 @@ import org.apache.beam.sdk.values.TupleTag; * {@link GroupByKeyOnly} {@link PTransform}. */ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { + private final EvaluationContext evaluationContext; + + GroupAlsoByWindowEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } + @Override public <InputT> TransformEvaluator<InputT> forApplication( AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) { + CommittedBundle<?> inputBundle) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) TransformEvaluator<InputT> evaluator = createEvaluator( - (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); + (AppliedPTransform) application, (CommittedBundle) inputBundle); return evaluator; } @@ -68,8 +73,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { PCollection<KV<K, Iterable<V>>>, DirectGroupAlsoByWindow<K, V>> application, - CommittedBundle<KeyedWorkItem<K, V>> inputBundle, - EvaluationContext evaluationContext) { + CommittedBundle<KeyedWorkItem<K, V>> inputBundle) { return new GroupAlsoByWindowEvaluator<>( evaluationContext, inputBundle, application); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java index 17dc0be..61d0e7b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java @@ -47,15 +47,20 @@ import org.apache.beam.sdk.values.PCollection; * {@link GroupByKeyOnly} {@link PTransform}. */ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { + private final EvaluationContext evaluationContext; + + GroupByKeyOnlyEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } + @Override public <InputT> TransformEvaluator<InputT> forApplication( AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) { + CommittedBundle<?> inputBundle) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) TransformEvaluator<InputT> evaluator = createEvaluator( - (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); + (AppliedPTransform) application, (CommittedBundle) inputBundle); return evaluator; } @@ -68,8 +73,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { PCollection<KeyedWorkItem<K, V>>, DirectGroupByKeyOnly<K, V>> application, - final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle, - final EvaluationContext evaluationContext) { + final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle) { return new GroupByKeyOnlyEvaluator<>(evaluationContext, inputBundle, application); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java index 6a41adf..fcb68c4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -41,8 +41,10 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoMultiEvaluatorFactory.class); private final LoadingCache<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager> fnClones; + private final EvaluationContext evaluationContext; - public ParDoMultiEvaluatorFactory() { + public ParDoMultiEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; fnClones = CacheBuilder.newBuilder() .build(new CacheLoader<AppliedPTransform<?, ?, BoundMulti<?, ?>>, DoFnLifecycleManager>() { @Override @@ -55,12 +57,10 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { @Override public <T> TransformEvaluator<T> forApplication( - AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) throws Exception { + AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception { @SuppressWarnings({"unchecked", "rawtypes"}) TransformEvaluator<T> evaluator = - createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); + createMultiEvaluator((AppliedPTransform) application, inputBundle); return evaluator; } @@ -71,8 +71,7 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { private <InT, OuT> TransformEvaluator<InT> createMultiEvaluator( AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application, - CommittedBundle<InT> inputBundle, - EvaluationContext evaluationContext) throws Exception { + CommittedBundle<InT> inputBundle) throws Exception { Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll(); DoFnLifecycleManager fnLocal = fnClones.getUnchecked((AppliedPTransform) application); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java index 4bb740b..91da35f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -40,8 +40,10 @@ import org.slf4j.LoggerFactory; class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoSingleEvaluatorFactory.class); private final LoadingCache<AppliedPTransform<?, ?, Bound<?, ?>>, DoFnLifecycleManager> fnClones; + private final EvaluationContext evaluationContext; - public ParDoSingleEvaluatorFactory() { + public ParDoSingleEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; fnClones = CacheBuilder.newBuilder() .build( @@ -57,11 +59,10 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { @Override public <T> TransformEvaluator<T> forApplication( final AppliedPTransform<?, ?, ?> application, - CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) throws Exception { + CommittedBundle<?> inputBundle) throws Exception { @SuppressWarnings({"unchecked", "rawtypes"}) TransformEvaluator<T> evaluator = - createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); + createSingleEvaluator((AppliedPTransform) application, inputBundle); return evaluator; } @@ -73,8 +74,8 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { private <InputT, OutputT> TransformEvaluator<InputT> createSingleEvaluator( AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, Bound<InputT, OutputT>> application, - CommittedBundle<InputT> inputBundle, - EvaluationContext evaluationContext) throws Exception { + CommittedBundle<InputT> inputBundle) + throws Exception { TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); String stepName = evaluationContext.getStepName(application); DirectStepContext stepContext = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 5fe771c..2adff59 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -53,15 +53,19 @@ import org.joda.time.Instant; class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { private final KeyedResourcePool<AppliedPTransform<?, ?, ?>, Evaluator<?>> evaluators = LockedKeyedResourcePool.create(); + private final EvaluationContext evaluationContext; + + TestStreamEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } @Nullable @Override public <InputT> TransformEvaluator<InputT> forApplication( AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) + @Nullable CommittedBundle<?> inputBundle) throws Exception { - return createEvaluator((AppliedPTransform) application, evaluationContext); + return createEvaluator((AppliedPTransform) application); } @Override @@ -76,8 +80,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { * a separate collection of events cannot be created. */ private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator( - AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application, - EvaluationContext evaluationContext) + AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application) throws ExecutionException { return evaluators .tryAcquire(application, new CreateEvaluator<>(application, evaluationContext, evaluators)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java index ecf2da8..e4f3e0c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -35,26 +35,26 @@ public interface TransformEvaluatorFactory { /** * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}. * - * <p>Any work that must be done before input elements are processed (such as calling - * {@code DoFn.StartBundle}) must be done before the - * {@link TransformEvaluator} is made available to the caller. + * <p>Any work that must be done before input elements are processed (such as calling {@code + * DoFn.StartBundle}) must be done before the {@link TransformEvaluator} is made available to the + * caller. * * <p>May return null if the application cannot produce an evaluator (for example, it is a * {@link Read} {@link PTransform} where all evaluators are in-use). * * @return An evaluator capable of processing the transform on the bundle, or null if no evaluator - * can be constructed. + * can be constructed. * @throws Exception whenever constructing the underlying evaluator throws an exception */ - @Nullable <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) throws Exception; + @Nullable + <InputT> TransformEvaluator<InputT> forApplication( + AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle) + throws Exception; /** - * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a - * {@link Pipeline} is shut down. No more calls to - * {@link #forApplication(AppliedPTransform, CommittedBundle, EvaluationContext)} will be made - * after a call to {@link #cleanup()}. + * Cleans up any state maintained by this {@link TransformEvaluatorFactory}. Called after a {@link + * Pipeline} is shut down. No more calls to {@link #forApplication(AppliedPTransform, + * CommittedBundle)} will be made after a call to {@link #cleanup()}. */ void cleanup() throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 9edc50f..08b636e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -44,21 +44,21 @@ import org.slf4j.LoggerFactory; */ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class); - public static TransformEvaluatorRegistry defaultRegistry() { + public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) { @SuppressWarnings("rawtypes") ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives = ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder() - .put(Read.Bounded.class, new BoundedReadEvaluatorFactory()) - .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) - .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) - .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) - .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) - .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) - .put(Window.Bound.class, new WindowEvaluatorFactory()) + .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) + .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) + .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory(ctxt)) + .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory(ctxt)) + .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt)) + .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt)) + .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt)) // Runner-specific primitives used in expansion of GroupByKey - .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory()) - .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory()) - .put(TestStream.class, new TestStreamEvaluatorFactory()) + .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory(ctxt)) + .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory(ctxt)) + .put(TestStream.class, new TestStreamEvaluatorFactory(ctxt)) .build(); return new TransformEvaluatorRegistry(primitives); } @@ -78,14 +78,12 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { @Override public <InputT> TransformEvaluator<InputT> forApplication( - AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) + AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle) throws Exception { checkState( !finished.get(), "Tried to get an evaluator for a finished TransformEvaluatorRegistry"); TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass()); - return factory.forApplication(application, inputBundle, evaluationContext); + return factory.forApplication(application, inputBundle); } @Override @@ -115,13 +113,4 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { throw toThrow; } } - - /** - * A factory to create Transform Evaluator Registries. - */ - public static class Factory { - public TransformEvaluatorRegistry create() { - return TransformEvaluatorRegistry.defaultRegistry(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index cc6b5b7..aaee9a5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -40,7 +40,6 @@ class TransformExecutor<T> implements Runnable { public static <T> TransformExecutor<T> create( TransformEvaluatorFactory factory, Iterable<? extends ModelEnforcementFactory> modelEnforcements, - EvaluationContext evaluationContext, CommittedBundle<T> inputBundle, AppliedPTransform<?, ?, ?> transform, CompletionCallback completionCallback, @@ -48,7 +47,6 @@ class TransformExecutor<T> implements Runnable { return new TransformExecutor<>( factory, modelEnforcements, - evaluationContext, inputBundle, transform, completionCallback, @@ -58,8 +56,6 @@ class TransformExecutor<T> implements Runnable { private final TransformEvaluatorFactory evaluatorFactory; private final Iterable<? extends ModelEnforcementFactory> modelEnforcements; - private final EvaluationContext evaluationContext; - /** The transform that will be evaluated. */ private final AppliedPTransform<?, ?, ?> transform; /** The inputs this {@link TransformExecutor} will deliver to the transform. */ @@ -73,14 +69,12 @@ class TransformExecutor<T> implements Runnable { private TransformExecutor( TransformEvaluatorFactory factory, Iterable<? extends ModelEnforcementFactory> modelEnforcements, - EvaluationContext evaluationContext, CommittedBundle<T> inputBundle, AppliedPTransform<?, ?, ?> transform, CompletionCallback completionCallback, TransformExecutorService transformEvaluationState) { this.evaluatorFactory = factory; this.modelEnforcements = modelEnforcements; - this.evaluationContext = evaluationContext; this.inputBundle = inputBundle; this.transform = transform; @@ -107,7 +101,7 @@ class TransformExecutor<T> implements Runnable { enforcements.add(enforcement); } TransformEvaluator<T> evaluator = - evaluatorFactory.forApplication(transform, inputBundle, evaluationContext); + evaluatorFactory.forApplication(transform, inputBundle); if (evaluator == null) { onComplete.handleEmpty(transform); // Nothing to do http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 9f485e0..0dfcd69 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -43,8 +43,7 @@ import org.joda.time.Instant; */ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // Resume from a checkpoint every nth invocation, to ensure close-and-resume is exercised - @VisibleForTesting - static final int MAX_READER_REUSE_COUNT = 20; + @VisibleForTesting static final int MAX_READER_REUSE_COUNT = 20; /* * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted. @@ -57,28 +56,33 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * an arbitrary Queue implementation does not, so the concrete type is used explicitly. */ private final ConcurrentMap< - AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>> - sourceEvaluators = new ConcurrentHashMap<>(); + AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>> + sourceEvaluators; + private final EvaluationContext evaluationContext; + + UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + sourceEvaluators = new ConcurrentHashMap<>(); + } @SuppressWarnings({"unchecked", "rawtypes"}) @Override @Nullable - public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, EvaluationContext evaluationContext) { - return getTransformEvaluator((AppliedPTransform) application, evaluationContext); + public <InputT> TransformEvaluator<InputT> forApplication( + AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle) { + return getTransformEvaluator((AppliedPTransform) application); } /** - * Get a {@link TransformEvaluator} that produces elements for the provided application of - * {@link Unbounded Read.Unbounded}, initializing the queue of evaluators if required. + * Get a {@link TransformEvaluator} that produces elements for the provided application of {@link + * Unbounded Read.Unbounded}, initializing the queue of evaluators if required. * * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has * already done so. */ private <OutputT, CheckpointMarkT extends CheckpointMark> TransformEvaluator<?> getTransformEvaluator( - final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, - final EvaluationContext evaluationContext) { + final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform) { ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue = (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>) sourceEvaluators.get(transform); @@ -119,8 +123,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * * <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own - * checkpoint, and constructs its reader from the current checkpoint in each call to - * {@link #finishBundle()}. + * checkpoint, and constructs its reader from the current checkpoint in each call to {@link + * #finishBundle()}. */ private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends CheckpointMark> implements TransformEvaluator<Object> { @@ -135,13 +139,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * source as derived from {@link #transform} due to splitting. */ private final UnboundedSource<OutputT, CheckpointMarkT> source; + private final UnboundedReadDeduplicator deduplicator; private UnboundedReader<OutputT> currentReader; private CheckpointMarkT checkpointMark; /** - * The count of bundles output from this {@link UnboundedReadEvaluator}. Used to exercise - * {@link UnboundedReader#close()}. + * The count of bundles output from this {@link UnboundedReadEvaluator}. Used to exercise {@link + * UnboundedReader#close()}. */ private int outputBundles = 0; @@ -174,8 +179,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { int numElements = 0; do { if (deduplicator.shouldOutput(currentReader.getCurrentRecordId())) { - output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(), - currentReader.getCurrentTimestamp())); + output.add( + WindowedValue.timestampedValueInGlobalWindow( + currentReader.getCurrent(), currentReader.getCurrentTimestamp())); } numElements++; } while (numElements < ARBITRARY_MAX_ELEMENTS && currentReader.advance()); @@ -224,7 +230,8 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // If the watermark is the max value, this source may not be invoked again. Finalize after // committing the output. if (!currentReader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - evaluationContext.scheduleAfterOutputWouldBeProduced(transform.getOutput(), + evaluationContext.scheduleAfterOutputWouldBeProduced( + transform.getOutput(), GlobalWindow.INSTANCE, transform.getOutput().getWindowingStrategy(), new Runnable() { @@ -234,8 +241,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { mark.finalizeCheckpoint(); } catch (IOException e) { throw new RuntimeException( - "Couldn't finalize checkpoint after the end of the Global Window", - e); + "Couldn't finalize checkpoint after the end of the Global Window", e); } } }); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 40ac7f0..a4e8d6f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -47,14 +47,19 @@ import org.apache.beam.sdk.values.POutput; * written. */ class ViewEvaluatorFactory implements TransformEvaluatorFactory { + private final EvaluationContext context; + + ViewEvaluatorFactory(EvaluationContext context) { + this.context = context; + } + @Override public <T> TransformEvaluator<T> forApplication( AppliedPTransform<?, ?, ?> application, - DirectRunner.CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) { + DirectRunner.CommittedBundle<?> inputBundle) { @SuppressWarnings({"cast", "unchecked", "rawtypes"}) TransformEvaluator<T> evaluator = createEvaluator( - (AppliedPTransform) application, evaluationContext); + (AppliedPTransform) application); return evaluator; } @@ -63,8 +68,7 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator( final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>> - application, - EvaluationContext context) { + application) { PCollection<Iterable<InT>> input = application.getInput(); final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input, application.getOutput()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 19c1a98..47848e6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -38,21 +38,25 @@ import org.joda.time.Instant; * {@link Bound Window.Bound} primitive {@link PTransform}. */ class WindowEvaluatorFactory implements TransformEvaluatorFactory { + private final EvaluationContext evaluationContext; + + WindowEvaluatorFactory(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } @Override public <InputT> TransformEvaluator<InputT> forApplication( AppliedPTransform<?, ?, ?> application, - @Nullable CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) + @Nullable CommittedBundle<?> inputBundle + ) throws Exception { return createTransformEvaluator( - (AppliedPTransform) application, inputBundle, evaluationContext); + (AppliedPTransform) application, inputBundle); } private <InputT> TransformEvaluator<InputT> createTransformEvaluator( AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, - CommittedBundle<?> inputBundle, - EvaluationContext evaluationContext) { + CommittedBundle<?> inputBundle) { WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn(); UncommittedBundle<InputT> outputBundle = evaluationContext.createBundle(inputBundle, transform.getOutput()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index cbeb733..cdd1661 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -71,7 +71,7 @@ public class BoundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); - factory = new BoundedReadEvaluatorFactory(); + factory = new BoundedReadEvaluatorFactory(context); bundleFactory = ImmutableListBundleFactory.create(); } @@ -81,7 +81,7 @@ public class BoundedReadEvaluatorFactoryTest { when(context.createRootBundle(longs)).thenReturn(output); TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); TransformResult result = evaluator.finishBundle(); assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat( @@ -101,7 +101,7 @@ public class BoundedReadEvaluatorFactoryTest { when(context.createRootBundle(longs)).thenReturn(output); TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); TransformResult result = evaluator.finishBundle(); assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); Iterable<? extends WindowedValue<Long>> outputElements = @@ -114,7 +114,7 @@ public class BoundedReadEvaluatorFactoryTest { UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); assertThat(secondEvaluator, nullValue()); } @@ -130,9 +130,9 @@ public class BoundedReadEvaluatorFactoryTest { // create both evaluators before finishing either. TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); assertThat(secondEvaluator, nullValue()); TransformResult result = evaluator.finishBundle(); @@ -163,7 +163,7 @@ public class BoundedReadEvaluatorFactoryTest { UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); CommittedBundle<Long> committed = output.commit(Instant.now()); assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L))); @@ -181,7 +181,7 @@ public class BoundedReadEvaluatorFactoryTest { UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); CommittedBundle<Long> committed = output.commit(Instant.now()); assertThat(committed.getElements(), emptyIterable()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 1c46c24..3bae1ab 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -67,14 +67,11 @@ public class FlattenEvaluatorFactoryTest { when(context.createBundle(leftBundle, flattened)).thenReturn(flattenedLeftBundle); when(context.createBundle(rightBundle, flattened)).thenReturn(flattenedRightBundle); - FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(); + FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context); TransformEvaluator<Integer> leftSideEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), leftBundle, context); + factory.forApplication(flattened.getProducingTransformInternal(), leftBundle); TransformEvaluator<Integer> rightSideEvaluator = - factory.forApplication( - flattened.getProducingTransformInternal(), - rightBundle, - context); + factory.forApplication(flattened.getProducingTransformInternal(), rightBundle); leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1)); rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1)); @@ -123,11 +120,11 @@ public class FlattenEvaluatorFactoryTest { PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections()); - EvaluationContext context = mock(EvaluationContext.class); + EvaluationContext evaluationContext = mock(EvaluationContext.class); - FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(); + FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext); TransformEvaluator<Integer> emptyEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), null, context); + factory.forApplication(flattened.getProducingTransformInternal(), null); TransformResult leftSideResult = emptyEvaluator.finishBundle(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 8d1f8bd..9395017 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -99,9 +99,8 @@ public class GroupByKeyEvaluatorFactoryTest { Coder<String> keyCoder = ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder(); TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator = - new GroupByKeyOnlyEvaluatorFactory() - .forApplication( - groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); + new GroupByKeyOnlyEvaluatorFactory(evaluationContext) + .forApplication(groupedKvs.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java index 9f1e916..814a89a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java @@ -101,9 +101,9 @@ public class GroupByKeyOnlyEvaluatorFactoryTest { Coder<String> keyCoder = ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder(); TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator = - new GroupByKeyOnlyEvaluatorFactory() + new GroupByKeyOnlyEvaluatorFactory(evaluationContext) .forApplication( - groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); + groupedKvs.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index 5552196..94b7f5d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -119,9 +119,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory() + new ParDoMultiEvaluatorFactory(evaluationContext) .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + mainOutput.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -206,9 +206,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory() + new ParDoMultiEvaluatorFactory(evaluationContext) .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + mainOutput.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -300,9 +300,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory() + new ParDoMultiEvaluatorFactory(evaluationContext) .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + mainOutput.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -413,9 +413,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory() + new ParDoMultiEvaluatorFactory(evaluationContext) .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + mainOutput.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java index 60b6dd9..7207b99 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java @@ -95,9 +95,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); org.apache.beam.runners.direct.TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() + new ParDoSingleEvaluatorFactory(evaluationContext) .forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); + collection.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -149,9 +149,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() + new ParDoSingleEvaluatorFactory(evaluationContext) .forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); + collection.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -217,9 +217,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); org.apache.beam.runners.direct.TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() + new ParDoSingleEvaluatorFactory(evaluationContext) .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + mainOutput.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -320,9 +320,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() + new ParDoSingleEvaluatorFactory(evaluationContext) .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + mainOutput.getProducingTransformInternal(), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java index 7703881..7413b25 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; import org.joda.time.Instant; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,8 +43,16 @@ import org.junit.runners.JUnit4; /** Tests for {@link TestStreamEvaluatorFactory}. */ @RunWith(JUnit4.class) public class TestStreamEvaluatorFactoryTest { - private TestStreamEvaluatorFactory factory = new TestStreamEvaluatorFactory(); - private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + private TestStreamEvaluatorFactory factory; + private BundleFactory bundleFactory; + private EvaluationContext context; + + @Before + public void setup() { + context = mock(EvaluationContext.class); + factory = new TestStreamEvaluatorFactory(context); + bundleFactory = ImmutableListBundleFactory.create(); + } /** Demonstrates that returned evaluators produce elements in sequence. */ @Test @@ -56,21 +65,20 @@ public class TestStreamEvaluatorFactoryTest { .addElements(4, 5, 6) .advanceWatermarkToInfinity()); - EvaluationContext context = mock(EvaluationContext.class); when(context.createRootBundle(streamVals)) .thenReturn( bundleFactory.createRootBundle(streamVals), bundleFactory.createRootBundle(streamVals)); TransformEvaluator<Object> firstEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + factory.forApplication(streamVals.getProducingTransformInternal(), null); TransformResult firstResult = firstEvaluator.finishBundle(); TransformEvaluator<Object> secondEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + factory.forApplication(streamVals.getProducingTransformInternal(), null); TransformResult secondResult = secondEvaluator.finishBundle(); TransformEvaluator<Object> thirdEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + factory.forApplication(streamVals.getProducingTransformInternal(), null); TransformResult thirdResult = thirdEvaluator.finishBundle(); assertThat( @@ -105,13 +113,12 @@ public class TestStreamEvaluatorFactoryTest { p.apply( TestStream.create(VarIntCoder.of()).addElements(4, 5, 6).advanceWatermarkToInfinity()); - EvaluationContext context = mock(EvaluationContext.class); TransformEvaluator<Object> firstEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + factory.forApplication(streamVals.getProducingTransformInternal(), null); // create a second evaluator before the first is finished. The evaluator should not be available TransformEvaluator<Object> secondEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), null, context); + factory.forApplication(streamVals.getProducingTransformInternal(), null); assertThat(secondEvaluator, is(nullValue())); } @@ -127,16 +134,15 @@ public class TestStreamEvaluatorFactoryTest { PCollection<Integer> firstVals = p.apply("Stream One", stream); PCollection<Integer> secondVals = p.apply("Stream A", stream); - EvaluationContext context = mock(EvaluationContext.class); when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals)); when(context.createRootBundle(secondVals)) .thenReturn(bundleFactory.createRootBundle(secondVals)); TransformEvaluator<Object> firstEvaluator = - factory.forApplication(firstVals.getProducingTransformInternal(), null, context); + factory.forApplication(firstVals.getProducingTransformInternal(), null); // The two evaluators can exist independently TransformEvaluator<Object> secondEvaluator = - factory.forApplication(secondVals.getProducingTransformInternal(), null, context); + factory.forApplication(secondVals.getProducingTransformInternal(), null); TransformResult firstResult = firstEvaluator.finishBundle(); TransformResult secondResult = secondEvaluator.finishBundle(); @@ -175,16 +181,15 @@ public class TestStreamEvaluatorFactoryTest { .addElements("Two") .advanceWatermarkToInfinity()); - EvaluationContext context = mock(EvaluationContext.class); when(context.createRootBundle(firstVals)).thenReturn(bundleFactory.createRootBundle(firstVals)); when(context.createRootBundle(secondVals)) .thenReturn(bundleFactory.createRootBundle(secondVals)); TransformEvaluator<Object> firstEvaluator = - factory.forApplication(firstVals.getProducingTransformInternal(), null, context); + factory.forApplication(firstVals.getProducingTransformInternal(), null); // The two evaluators can exist independently TransformEvaluator<Object> secondEvaluator = - factory.forApplication(secondVals.getProducingTransformInternal(), null, context); + factory.forApplication(secondVals.getProducingTransformInternal(), null); TransformResult firstResult = firstEvaluator.finishBundle(); TransformResult secondResult = secondEvaluator.finishBundle(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 5af568f..344fd4b 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -60,10 +60,7 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link TransformExecutor}. - */ +/** Tests for {@link TransformExecutor}. */ @RunWith(JUnit4.class) public class TransformExecutorTest { @Rule public ExpectedException thrown = ExpectedException.none(); @@ -114,14 +111,13 @@ public class TransformExecutorTest { } }; - when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext)) + when(registry.forApplication(created.getProducingTransformInternal(), null)) .thenReturn(evaluator); TransformExecutor<Object> executor = TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, null, created.getProducingTransformInternal(), completionCallback, @@ -135,17 +131,16 @@ public class TransformExecutorTest { @Test public void nullTransformEvaluatorTerminates() throws Exception { - when(registry.forApplication(created.getProducingTransformInternal(), - null, - evaluationContext)).thenReturn(null); - - TransformExecutor<Object> executor = TransformExecutor.create(registry, - Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, - null, - created.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); + when(registry.forApplication(created.getProducingTransformInternal(), null)).thenReturn(null); + + TransformExecutor<Object> executor = + TransformExecutor.create( + registry, + Collections.<ModelEnforcementFactory>emptyList(), + null, + created.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); executor.run(); assertThat(completionCallback.handledResult, is(nullValue())); @@ -177,16 +172,13 @@ public class TransformExecutorTest { WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third"); CommittedBundle<String> inputBundle = bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); - when( - registry.<String>forApplication( - downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) + when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); TransformExecutor<String> executor = TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, inputBundle, downstream.getProducingTransformInternal(), completionCallback, @@ -222,16 +214,13 @@ public class TransformExecutorTest { WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); CommittedBundle<String> inputBundle = bundleFactory.createRootBundle(created).add(foo).commit(Instant.now()); - when( - registry.<String>forApplication( - downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) + when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); TransformExecutor<String> executor = TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, inputBundle, downstream.getProducingTransformInternal(), completionCallback, @@ -260,16 +249,13 @@ public class TransformExecutorTest { CommittedBundle<String> inputBundle = bundleFactory.createRootBundle(created).commit(Instant.now()); - when( - registry.<String>forApplication( - downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) + when(registry.<String>forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); TransformExecutor<String> executor = TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, inputBundle, downstream.getProducingTransformInternal(), completionCallback, @@ -303,14 +289,13 @@ public class TransformExecutorTest { } }; - when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext)) + when(registry.forApplication(created.getProducingTransformInternal(), null)) .thenReturn(evaluator); TransformExecutor<String> executor = TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, null, created.getProducingTransformInternal(), completionCallback, @@ -332,8 +317,7 @@ public class TransformExecutorTest { TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() { @Override - public void processElement(WindowedValue<Object> element) throws Exception { - } + public void processElement(WindowedValue<Object> element) throws Exception {} @Override public TransformResult finishBundle() throws Exception { @@ -345,9 +329,7 @@ public class TransformExecutorTest { WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar"); CommittedBundle<String> inputBundle = bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now()); - when( - registry.forApplication( - downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) + when(registry.forApplication(downstream.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); TestEnforcementFactory enforcement = new TestEnforcementFactory(); @@ -355,7 +337,6 @@ public class TransformExecutorTest { TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>singleton(enforcement), - evaluationContext, inputBundle, downstream.getProducingTransformInternal(), completionCallback, @@ -406,16 +387,13 @@ public class TransformExecutorTest { WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); CommittedBundle<byte[]> inputBundle = bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); - when( - registry.forApplication( - pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)) + when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); TransformExecutor<byte[]> executor = TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), - evaluationContext, inputBundle, pcBytes.getProducingTransformInternal(), completionCallback, @@ -465,16 +443,13 @@ public class TransformExecutorTest { WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); CommittedBundle<byte[]> inputBundle = bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); - when( - registry.forApplication( - pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)) + when(registry.forApplication(pcBytes.getProducingTransformInternal(), inputBundle)) .thenReturn(evaluator); TransformExecutor<byte[]> executor = TransformExecutor.create( registry, Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), - evaluationContext, inputBundle, pcBytes.getProducingTransformInternal(), completionCallback, @@ -500,18 +475,19 @@ public class TransformExecutorTest { } @Override - public CommittedResult handleResult( - CommittedBundle<?> inputBundle, TransformResult result) { + public CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult result) { handledResult = result; onMethod.countDown(); - @SuppressWarnings("rawtypes") Iterable unprocessedElements = + @SuppressWarnings("rawtypes") + Iterable unprocessedElements = result.getUnprocessedElements() == null ? Collections.emptyList() : result.getUnprocessedElements(); CommittedBundle<?> unprocessedBundle = inputBundle == null ? null : inputBundle.withElements(unprocessedElements); - return CommittedResult.create(result, + return CommittedResult.create( + result, unprocessedBundle, Collections.<CommittedBundle<?>>emptyList(), EnumSet.noneOf(OutputType.class)); @@ -532,6 +508,7 @@ public class TransformExecutorTest { private static class TestEnforcementFactory implements ModelEnforcementFactory { private TestEnforcement<?> instance; + @Override public <T> TestEnforcement<T> forBundle( CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 3a6add6..94c9dd5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -81,8 +81,8 @@ public class UnboundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); - factory = new UnboundedReadEvaluatorFactory(); context = mock(EvaluationContext.class); + factory = new UnboundedReadEvaluatorFactory(context); output = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output); } @@ -90,7 +90,7 @@ public class UnboundedReadEvaluatorFactoryTest { @Test public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); TransformResult result = evaluator.finishBundle(); assertThat( @@ -109,7 +109,7 @@ public class UnboundedReadEvaluatorFactoryTest { @Test public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception { TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); TransformResult result = evaluator.finishBundle(); assertThat( @@ -123,7 +123,7 @@ public class UnboundedReadEvaluatorFactoryTest { UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); TransformResult secondResult = secondEvaluator.finishBundle(); assertThat( secondResult.getWatermarkHold(), @@ -150,7 +150,7 @@ public class UnboundedReadEvaluatorFactoryTest { UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); assertThat( @@ -159,7 +159,7 @@ public class UnboundedReadEvaluatorFactoryTest { UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); - TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null, context); + TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null); secondEvaluator.finishBundle(); assertThat( secondOutput.commit(Instant.now()).getElements(), @@ -182,7 +182,7 @@ public class UnboundedReadEvaluatorFactoryTest { when(context.createRootBundle(pcollection)).thenReturn(output); for (int i = 0; i < UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT + 1; i++) { - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); } assertThat(TestUnboundedSource.readerClosedCount, equalTo(1)); @@ -200,14 +200,14 @@ public class UnboundedReadEvaluatorFactoryTest { UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); CommittedBundle<Long> committed = output.commit(Instant.now()); assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3)); assertThat(TestUnboundedSource.readerClosedCount, equalTo(0)); assertThat(TestUnboundedSource.readerAdvancedCount, equalTo(4)); - evaluator = factory.forApplication(sourceTransform, null, context); + evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); assertThat(TestUnboundedSource.readerClosedCount, equalTo(0)); // Tried to advance again, even with no elements @@ -226,7 +226,7 @@ public class UnboundedReadEvaluatorFactoryTest { when(context.createRootBundle(pcollection)).thenReturn(output); for (int i = 0; i < 2 * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT; i++) { - TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null); evaluator.finishBundle(); } assertThat(TestUnboundedSource.readerClosedCount, equalTo(0)); @@ -243,10 +243,10 @@ public class UnboundedReadEvaluatorFactoryTest { @Test public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception { TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); TransformEvaluator<?> secondEvaluator = - factory.forApplication(longs.getProducingTransformInternal(), null, context); + factory.forApplication(longs.getProducingTransformInternal(), null); assertThat(secondEvaluator, nullValue()); TransformResult result = evaluator.finishBundle(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index d3ab81d..ae904e4 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -74,8 +74,8 @@ public class ViewEvaluatorFactoryTest { CommittedBundle<String> inputBundle = bundleFactory.createRootBundle(input).commit(Instant.now()); TransformEvaluator<Iterable<String>> evaluator = - new ViewEvaluatorFactory() - .forApplication(view.getProducingTransformInternal(), inputBundle, context); + new ViewEvaluatorFactory(context) + .forApplication(view.getProducingTransformInternal(), inputBundle); evaluator.processElement( WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar"))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c2970aa7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index 63800cf..29330df 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -103,7 +103,7 @@ public class WindowEvaluatorFactoryTest { input = p.apply(Create.of(1L, 2L, 3L)); bundleFactory = ImmutableListBundleFactory.create(); - factory = new WindowEvaluatorFactory(); + factory = new WindowEvaluatorFactory(evaluationContext); } @Test @@ -308,9 +308,7 @@ public class WindowEvaluatorFactoryTest { throws Exception { TransformEvaluator<Long> evaluator = factory.forApplication( - AppliedPTransform.of("Window", input, windowed, windowTransform), - inputBundle, - evaluationContext); + AppliedPTransform.of("Window", input, windowed, windowTransform), inputBundle); evaluator.processElement(valueInGlobalWindow); evaluator.processElement(valueInGlobalAndTwoIntervalWindows);