Repository: beam Updated Branches: refs/heads/master c65aca07f -> c25bead55
Move DoFnInfo to SDK util Previously, DoFnInfo lived in the Dataflow runner and wrapped just enough information for Dataflow to execute a DoFn without further context. The Java SDK harness has the same need and already relies on DoFnInfo. Effectively, DoFnInfo is the UDF representation that the Java SDK harness understands, so it belongs in the SDK rather than in a runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3f32549 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3f32549 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3f32549 Branch: refs/heads/master Commit: f3f325499e33f82eb8873a4e877c56dbc9928043 Parents: fdd9965 Author: Kenneth Knowles <k...@google.com> Authored: Sat Sep 16 15:16:56 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Sep 18 21:22:50 2017 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 104 ------------------- .../DataflowPipelineTranslatorTest.java | 4 +- .../java/org/apache/beam/sdk/util/DoFnInfo.java | 104 +++++++++++++++++++ .../apache/beam/fn/harness/FnApiDoFnRunner.java | 2 +- .../beam/fn/harness/FnApiDoFnRunnerTest.java | 2 +- 6 files changed, 109 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 2bed6be..4f9b939 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -67,7 +67,6 @@ import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; -import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.runners.dataflow.util.OutputReference; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.sdk.Pipeline; @@ -92,6 +91,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.DoFnInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java deleted file mode 100644 index 4a26795..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.util; - -import java.io.Serializable; -import java.util.Map; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * Wrapper class holding the necessary information to serialize a {@link DoFn}. - * - * @param <InputT> the type of the (main) input elements of the {@link DoFn} - * @param <OutputT> the type of the (main) output elements of the {@link DoFn} - */ -public class DoFnInfo<InputT, OutputT> implements Serializable { - private final DoFn<InputT, OutputT> doFn; - private final WindowingStrategy<?, ?> windowingStrategy; - private final Iterable<PCollectionView<?>> sideInputViews; - private final Coder<InputT> inputCoder; - private final long mainOutput; - private final Map<Long, TupleTag<?>> outputMap; - - /** - * Creates a {@link DoFnInfo} for the given {@link DoFn}. 
- */ - public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn( - DoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ?> windowingStrategy, - Iterable<PCollectionView<?>> sideInputViews, - Coder<InputT> inputCoder, - long mainOutput, - Map<Long, TupleTag<?>> outputMap) { - return new DoFnInfo<>( - doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); - } - - public DoFnInfo<InputT, OutputT> withFn(DoFn<InputT, OutputT> newFn) { - return DoFnInfo.forFn(newFn, - windowingStrategy, - sideInputViews, - inputCoder, - mainOutput, - outputMap); - } - - private DoFnInfo( - DoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ?> windowingStrategy, - Iterable<PCollectionView<?>> sideInputViews, - Coder<InputT> inputCoder, - long mainOutput, - Map<Long, TupleTag<?>> outputMap) { - this.doFn = doFn; - this.windowingStrategy = windowingStrategy; - this.sideInputViews = sideInputViews; - this.inputCoder = inputCoder; - this.mainOutput = mainOutput; - this.outputMap = outputMap; - } - - /** Returns the embedded function. 
*/ - public DoFn<InputT, OutputT> getDoFn() { - return doFn; - } - - public WindowingStrategy<?, ?> getWindowingStrategy() { - return windowingStrategy; - } - - public Iterable<PCollectionView<?>> getSideInputViews() { - return sideInputViews; - } - - public Coder<InputT> getInputCoder() { - return inputCoder; - } - - public long getMainOutput() { - return mainOutput; - } - - public Map<Long, TupleTag<?>> getOutputMap() { - return outputMap; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index f756065..81e7a97 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -71,7 +71,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.runners.dataflow.util.CloudObject; import org.apache.beam.runners.dataflow.util.CloudObjects; -import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.runners.dataflow.util.OutputReference; import org.apache.beam.runners.dataflow.util.PropertyNames; import org.apache.beam.runners.dataflow.util.Structs; @@ -103,6 +102,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import 
org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.DoFnInfo; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.gcsfs.GcsPath; @@ -748,7 +748,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { /** * Test that in translation the name for a collection (in this case just a Create output) is - * overriden to be what the Dataflow service expects. + * overridden to be what the Dataflow service expects. */ @Test public void testNamesOverridden() throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java new file mode 100644 index 0000000..0800b21 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util; + +import java.io.Serializable; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * Wrapper class holding the necessary information to serialize a {@link DoFn}. + * + * @param <InputT> the type of the (main) input elements of the {@link DoFn} + * @param <OutputT> the type of the (main) output elements of the {@link DoFn} + */ +public class DoFnInfo<InputT, OutputT> implements Serializable { + private final DoFn<InputT, OutputT> doFn; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Iterable<PCollectionView<?>> sideInputViews; + private final Coder<InputT> inputCoder; + private final long mainOutput; + private final Map<Long, TupleTag<?>> outputMap; + + /** + * Creates a {@link DoFnInfo} for the given {@link DoFn}. 
+ */ + public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn( + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Iterable<PCollectionView<?>> sideInputViews, + Coder<InputT> inputCoder, + long mainOutput, + Map<Long, TupleTag<?>> outputMap) { + return new DoFnInfo<>( + doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + } + + public DoFnInfo<InputT, OutputT> withFn(DoFn<InputT, OutputT> newFn) { + return DoFnInfo.forFn(newFn, + windowingStrategy, + sideInputViews, + inputCoder, + mainOutput, + outputMap); + } + + private DoFnInfo( + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Iterable<PCollectionView<?>> sideInputViews, + Coder<InputT> inputCoder, + long mainOutput, + Map<Long, TupleTag<?>> outputMap) { + this.doFn = doFn; + this.windowingStrategy = windowingStrategy; + this.sideInputViews = sideInputViews; + this.inputCoder = inputCoder; + this.mainOutput = mainOutput; + this.outputMap = outputMap; + } + + /** Returns the embedded function. 
*/ + public DoFn<InputT, OutputT> getDoFn() { + return doFn; + } + + public WindowingStrategy<?, ?> getWindowingStrategy() { + return windowingStrategy; + } + + public Iterable<PCollectionView<?>> getSideInputViews() { + return sideInputViews; + } + + public Coder<InputT> getInputCoder() { + return inputCoder; + } + + public long getMainOutput() { + return mainOutput; + } + + public Map<Long, TupleTag<?>> getOutputMap() { + return outputMap; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index c361647..f0ee319 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -50,7 +50,6 @@ import org.apache.beam.fn.v1.BeamFnApi.StateRequest; import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.construction.ParDoTranslation; -import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; @@ -84,6 +83,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.DoFnInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; 
http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index 4aa8080..9113be7 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -44,7 +44,6 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.harness.state.FakeBeamFnStateClient; import org.apache.beam.fn.v1.BeamFnApi.StateKey; import org.apache.beam.runners.core.construction.ParDoTranslation; -import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; @@ -61,6 +60,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.DoFnInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV;