Move ExecutionContext and related classes to runners-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d2b8e09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d2b8e09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d2b8e09 Branch: refs/heads/python-sdk Commit: 9d2b8e09bcb5e04017b487e1a919d335875dbfc0 Parents: 64336e4 Author: Kenneth Knowles <k...@google.com> Authored: Thu Dec 15 20:20:34 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 10:10:00 2016 -0800 ---------------------------------------------------------------------- .../operators/ApexParDoOperator.java | 2 +- .../apex/translation/utils/NoOpStepContext.java | 3 +- .../beam/runners/core/AggregatorFactory.java | 1 - .../beam/runners/core/BaseExecutionContext.java | 176 +++++++++++++++++++ .../apache/beam/runners/core/DoFnRunners.java | 2 +- .../beam/runners/core/ExecutionContext.java | 102 +++++++++++ .../beam/runners/core/SimpleDoFnRunner.java | 2 +- .../beam/runners/core/SimpleOldDoFnRunner.java | 2 +- .../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +- .../runners/core/SimpleOldDoFnRunnerTest.java | 3 +- .../runners/direct/AggregatorContainer.java | 2 +- .../runners/direct/DirectExecutionContext.java | 6 +- .../beam/runners/direct/EvaluationContext.java | 2 +- .../runners/direct/AggregatorContainerTest.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../spark/aggregators/SparkAggregators.java | 2 +- .../spark/translation/SparkProcessContext.java | 2 +- .../beam/sdk/util/BaseExecutionContext.java | 174 ------------------ .../apache/beam/sdk/util/ExecutionContext.java | 100 ----------- 20 files changed, 295 insertions(+), 294 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index a3d3a97..c41cd45 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -41,6 +41,7 @@ import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.sdk.coders.Coder; @@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.UserCodeException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index 078f95f..f169ae6 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -19,10 +19,9 @@ package org.apache.beam.runners.apex.translation.utils; import java.io.IOException; import java.io.Serializable; - +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java index 153d30d..24a605f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.ExecutionContext; /** * A factory for creating aggregators. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java new file mode 100644 index 0000000..7b674dc --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Base class for implementations of {@link ExecutionContext}. + * + * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate + * {@link StepContext} implementation. Any {@code StepContext} created will + * be cached for the lifetime of this {@link ExecutionContext}. + * + * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass + * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and + * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g. + * <pre>{@code + * {@literal @}Override + * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) { + * return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...); + * } + * }</pre> + * + * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of + * {@link #createStepContext(String, String)}, + * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()} + * will be appropriately specialized. + */ +public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext> + implements ExecutionContext { + + private Map<String, T> cachedStepContexts = new HashMap<>(); + + /** + * Implementations should override this to create the specific type + * of {@link StepContext} they need. + */ + protected abstract T createStepContext(String stepName, String transformName); + + /** + * Returns the {@link StepContext} associated with the given step. + */ + @Override + public T getOrCreateStepContext(String stepName, String transformName) { + final String finalStepName = stepName; + final String finalTransformName = transformName; + return getOrCreateStepContext( + stepName, + new CreateStepContextFunction<T>() { + @Override + public T create() { + return createStepContext(finalStepName, finalTransformName); + } + }); + } + + /** + * Factory method interface to create an execution context if none exists during + * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}. + */ + protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext> { + T create(); + } + + protected final T getOrCreateStepContext(String stepName, + CreateStepContextFunction<T> createContextFunc) { + T context = cachedStepContexts.get(stepName); + if (context == null) { + context = createContextFunc.create(); + cachedStepContexts.put(stepName, context); + } + + return context; + } + + /** + * Returns a collection view of all of the {@link StepContext}s. + */ + @Override + public Collection<? extends T> getAllStepContexts() { + return Collections.unmodifiableCollection(cachedStepContexts.values()); + } + + /** + * Hook for subclasses to implement that will be called whenever + * {@code DoFn.Context#output} + * is called. + */ + @Override + public void noteOutput(WindowedValue<?> output) {} + + /** + * Hook for subclasses to implement that will be called whenever + * {@code DoFn.Context#sideOutput} + * is called. + */ + @Override + public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {} + + /** + * Base class for implementations of {@link ExecutionContext.StepContext}. + * + * <p>To complete a concrete subclass, implement {@link #timerInternals} and + * {@link #stateInternals}. + */ + public abstract static class StepContext implements ExecutionContext.StepContext { + private final ExecutionContext executionContext; + private final String stepName; + private final String transformName; + + public StepContext(ExecutionContext executionContext, String stepName, String transformName) { + this.executionContext = executionContext; + this.stepName = stepName; + this.transformName = transformName; + } + + @Override + public String getStepName() { + return stepName; + } + + @Override + public String getTransformName() { + return transformName; + } + + @Override + public void noteOutput(WindowedValue<?> output) { + executionContext.noteOutput(output); + } + + @Override + public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { + executionContext.noteSideOutput(tag, output); + } + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, + Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder, + W window, Coder<W> windowCoder) throws IOException { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override + public abstract StateInternals<?> stateInternals(); + + @Override + public abstract TimerInternals timerInternals(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 0e4bf75..820bfcd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -19,13 +19,13 @@ package org.apache.beam.runners.core; import java.util.List; import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; +import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java new file mode 100644 index 0000000..f67aff4 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import java.util.Collection; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Context for the current execution. This is guaranteed to exist during processing, + * but does not necessarily persist between different batches of work. + */ +public interface ExecutionContext { + /** + * Returns the {@link StepContext} associated with the given step. + */ + StepContext getOrCreateStepContext(String stepName, String transformName); + + /** + * Returns a collection view of all of the {@link StepContext}s. + */ + Collection<? extends StepContext> getAllStepContexts(); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} + * is called. + */ + void noteOutput(WindowedValue<?> output); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} + * is called. + */ + void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); + + /** + * Per-step, per-key context used for retrieving state. + */ + public interface StepContext { + + /** + * The name of the step. + */ + String getStepName(); + + /** + * The name of the transform for the step. + */ + String getTransformName(); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} + * is called. + */ + void noteOutput(WindowedValue<?> output); + + /** + * Hook for subclasses to implement that will be called whenever + * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} + * is called. + */ + void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); + + /** + * Writes the given {@code PCollectionView} data to a globally accessible location. + */ + <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, + Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, + W window, + Coder<W> windowCoder) + throws IOException; + + StateInternals<?> stateInternals(); + + TimerInternals timerInternals(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index a7d82bf..b42c57d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; @@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimeDomain; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 342a4a8..1ff0212 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -36,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimeDomain; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index ec5d375..8ae09cb 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -26,13 +26,13 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.beam.runners.core.BaseExecutionContext.StepContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.BaseExecutionContext.StepContext; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java index 0e23dcb..4610069 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java @@ -22,9 +22,8 @@ import static org.mockito.Mockito.mock; import java.util.Arrays; import java.util.List; - +import org.apache.beam.runners.core.BaseExecutionContext.StepContext; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.util.BaseExecutionContext.StepContext; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java index c7fa4df..fd17704 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java @@ -28,9 +28,9 @@ import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.util.ExecutionContext; /** * AccumT container for the current values associated with {@link Aggregator Aggregators}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java index c6051f0..8250cf1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectExecutionContext.java @@ -17,11 +17,11 @@ */ package org.apache.beam.runners.direct; +import org.apache.beam.runners.core.BaseExecutionContext; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.util.BaseExecutionContext; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.TimerInternals; /** @@ -54,7 +54,7 @@ class DirectExecutionContext * Step Context for the {@link DirectRunner}. */ public class DirectStepContext - extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext { + extends BaseExecutionContext.StepContext { private CopyOnAccessInMemoryStateInternals<Object> stateInternals; private DirectTimerInternals timerInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index bbcab8e..3b9367a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals.TimerData; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java index c8310c9..f770800 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AggregatorContainerTest.java @@ -24,9 +24,9 @@ import static org.mockito.Mockito.when; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Sum.SumIntegerFn; -import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.junit.Before; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8704308..057a3e7 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -31,6 +31,7 @@ import java.util.Map; import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 9cea529..9855d46 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -38,6 +38,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; @@ -48,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index 17d5844..fa5c8d1 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -22,11 +22,11 @@ import com.google.common.collect.ImmutableList; import java.util.Collection; import java.util.Map; import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaSparkContext; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 3a31cae..9957bf3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -24,11 +24,11 @@ import java.io.IOException; import java.util.Iterator; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java deleted file mode 100644 index e26f2b0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Base class for implementations of {@link ExecutionContext}. - * - * <p>A concrete subclass should implement {@link #createStepContext} to create the appropriate - * {@link StepContext} implementation. Any {@code StepContext} created will - * be cached for the lifetime of this {@link ExecutionContext}. - * - * <p>BaseExecutionContext is generic to allow implementing subclasses to return a concrete subclass - * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and - * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g. - * <pre>{@code - * {@literal @}Override - * StreamingModeExecutionContext.StepContext getOrCreateStepContext(...) { - * return (StreamingModeExecutionContext.StepContext) super.getOrCreateStepContext(...); - * } - * }</pre> - * - * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return types of - * {@link #createStepContext(String, String)}, - * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()} - * will be appropriately specialized. - */ -public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext> - implements ExecutionContext { - - private Map<String, T> cachedStepContexts = new HashMap<>(); - - /** - * Implementations should override this to create the specific type - * of {@link StepContext} they need. - */ - protected abstract T createStepContext(String stepName, String transformName); - - /** - * Returns the {@link StepContext} associated with the given step. - */ - @Override - public T getOrCreateStepContext(String stepName, String transformName) { - final String finalStepName = stepName; - final String finalTransformName = transformName; - return getOrCreateStepContext( - stepName, - new CreateStepContextFunction<T>() { - @Override - public T create() { - return createStepContext(finalStepName, finalTransformName); - } - }); - } - - /** - * Factory method interface to create an execution context if none exists during - * {@link #getOrCreateStepContext(String, CreateStepContextFunction)}. - */ - protected interface CreateStepContextFunction<T extends ExecutionContext.StepContext> { - T create(); - } - - protected final T getOrCreateStepContext(String stepName, - CreateStepContextFunction<T> createContextFunc) { - T context = cachedStepContexts.get(stepName); - if (context == null) { - context = createContextFunc.create(); - cachedStepContexts.put(stepName, context); - } - - return context; - } - - /** - * Returns a collection view of all of the {@link StepContext}s. - */ - @Override - public Collection<? extends T> getAllStepContexts() { - return Collections.unmodifiableCollection(cachedStepContexts.values()); - } - - /** - * Hook for subclasses to implement that will be called whenever - * {@code DoFn.Context#output} - * is called. - */ - @Override - public void noteOutput(WindowedValue<?> output) {} - - /** - * Hook for subclasses to implement that will be called whenever - * {@code DoFn.Context#sideOutput} - * is called. - */ - @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {} - - /** - * Base class for implementations of {@link ExecutionContext.StepContext}. - * - * <p>To complete a concrete subclass, implement {@link #timerInternals} and - * {@link #stateInternals}. - */ - public abstract static class StepContext implements ExecutionContext.StepContext { - private final ExecutionContext executionContext; - private final String stepName; - private final String transformName; - - public StepContext(ExecutionContext executionContext, String stepName, String transformName) { - this.executionContext = executionContext; - this.stepName = stepName; - this.transformName = transformName; - } - - @Override - public String getStepName() { - return stepName; - } - - @Override - public String getTransformName() { - return transformName; - } - - @Override - public void noteOutput(WindowedValue<?> output) { - executionContext.noteOutput(output); - } - - @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { - executionContext.noteSideOutput(tag, output); - } - - @Override - public <T, W extends BoundedWindow> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder, - W window, Coder<W> windowCoder) throws IOException { - throw new UnsupportedOperationException("Not implemented."); - } - - @Override - public abstract StateInternals<?> stateInternals(); - - @Override - public abstract TimerInternals timerInternals(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java deleted file mode 100644 index 4429d76..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.io.IOException; -import java.util.Collection; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Context for the current execution. This is guaranteed to exist during processing, - * but does not necessarily persist between different batches of work. - */ -public interface ExecutionContext { - /** - * Returns the {@link StepContext} associated with the given step. - */ - StepContext getOrCreateStepContext(String stepName, String transformName); - - /** - * Returns a collection view of all of the {@link StepContext}s. - */ - Collection<? extends StepContext> getAllStepContexts(); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} - * is called. - */ - void noteOutput(WindowedValue<?> output); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} - * is called. - */ - void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); - - /** - * Per-step, per-key context used for retrieving state. - */ - public interface StepContext { - - /** - * The name of the step. - */ - String getStepName(); - - /** - * The name of the transform for the step. - */ - String getTransformName(); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} - * is called. - */ - void noteOutput(WindowedValue<?> output); - - /** - * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} - * is called. - */ - void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); - - /** - * Writes the given {@code PCollectionView} data to a globally accessible location. - */ - <T, W extends BoundedWindow> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, - Coder<Iterable<WindowedValue<T>>> dataCoder, - W window, - Coder<W> windowCoder) - throws IOException; - - StateInternals<?> stateInternals(); - - TimerInternals timerInternals(); - } -}