Repository: beam
Updated Branches:
  refs/heads/master c65aca07f -> c25bead55


Move DoFnInfo to SDK util

Previously, the DoFnInfo wrapped things just enough for Dataflow to execute a
DoFn without much context. The Java SDK harness has the same need, and relies
on DoFnInfo. Effectively, DoFnInfo is the UDF that the Java SDK harness
understands.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3f32549
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3f32549
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3f32549

Branch: refs/heads/master
Commit: f3f325499e33f82eb8873a4e877c56dbc9928043
Parents: fdd9965
Author: Kenneth Knowles <k...@google.com>
Authored: Sat Sep 16 15:16:56 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Sep 18 21:22:50 2017 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java    |   2 +-
 .../beam/runners/dataflow/util/DoFnInfo.java    | 104 -------------------
 .../DataflowPipelineTranslatorTest.java         |   4 +-
 .../java/org/apache/beam/sdk/util/DoFnInfo.java | 104 +++++++++++++++++++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java |   2 +-
 .../beam/fn/harness/FnApiDoFnRunnerTest.java    |   2 +-
 6 files changed, 109 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 2bed6be..4f9b939 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -67,7 +67,6 @@ import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.sdk.Pipeline;
@@ -92,6 +91,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
deleted file mode 100644
index 4a26795..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
-
-/**
- * Wrapper class holding the necessary information to serialize a {@link DoFn}.
- *
- * @param <InputT> the type of the (main) input elements of the {@link DoFn}
- * @param <OutputT> the type of the (main) output elements of the {@link DoFn}
- */
-public class DoFnInfo<InputT, OutputT> implements Serializable {
-  private final DoFn<InputT, OutputT> doFn;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final Iterable<PCollectionView<?>> sideInputViews;
-  private final Coder<InputT> inputCoder;
-  private final long mainOutput;
-  private final Map<Long, TupleTag<?>> outputMap;
-
-  /**
-   * Creates a {@link DoFnInfo} for the given {@link DoFn}.
-   */
-  public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
-      DoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Iterable<PCollectionView<?>> sideInputViews,
-      Coder<InputT> inputCoder,
-      long mainOutput,
-      Map<Long, TupleTag<?>> outputMap) {
-    return new DoFnInfo<>(
-        doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
-  }
-
-  public DoFnInfo<InputT, OutputT> withFn(DoFn<InputT, OutputT> newFn) {
-    return DoFnInfo.forFn(newFn,
-        windowingStrategy,
-        sideInputViews,
-        inputCoder,
-        mainOutput,
-        outputMap);
-  }
-
-  private DoFnInfo(
-      DoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Iterable<PCollectionView<?>> sideInputViews,
-      Coder<InputT> inputCoder,
-      long mainOutput,
-      Map<Long, TupleTag<?>> outputMap) {
-    this.doFn = doFn;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputViews = sideInputViews;
-    this.inputCoder = inputCoder;
-    this.mainOutput = mainOutput;
-    this.outputMap = outputMap;
-  }
-
-  /** Returns the embedded function. */
-  public DoFn<InputT, OutputT> getDoFn() {
-    return doFn;
-  }
-
-  public WindowingStrategy<?, ?> getWindowingStrategy() {
-    return windowingStrategy;
-  }
-
-  public Iterable<PCollectionView<?>> getSideInputViews() {
-    return sideInputViews;
-  }
-
-  public Coder<InputT> getInputCoder() {
-    return inputCoder;
-  }
-
-  public long getMainOutput() {
-    return mainOutput;
-  }
-
-  public Map<Long, TupleTag<?>> getOutputMap() {
-    return outputMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index f756065..81e7a97 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -71,7 +71,6 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.runners.dataflow.util.OutputReference;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.Structs;
@@ -103,6 +102,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
@@ -748,7 +748,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
   /**
    * Test that in translation the name for a collection (in this case just a Create output) is
-   * overriden to be what the Dataflow service expects.
+   * overridden to be what the Dataflow service expects.
    */
   @Test
   public void testNamesOverridden() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
new file mode 100644
index 0000000..0800b21
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnInfo.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Wrapper class holding the necessary information to serialize a {@link DoFn}.
+ *
+ * @param <InputT> the type of the (main) input elements of the {@link DoFn}
+ * @param <OutputT> the type of the (main) output elements of the {@link DoFn}
+ */
+public class DoFnInfo<InputT, OutputT> implements Serializable {
+  private final DoFn<InputT, OutputT> doFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final Iterable<PCollectionView<?>> sideInputViews;
+  private final Coder<InputT> inputCoder;
+  private final long mainOutput;
+  private final Map<Long, TupleTag<?>> outputMap;
+
+  /**
+   * Creates a {@link DoFnInfo} for the given {@link DoFn}.
+   */
+  public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews,
+      Coder<InputT> inputCoder,
+      long mainOutput,
+      Map<Long, TupleTag<?>> outputMap) {
+    return new DoFnInfo<>(
+        doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap);
+  }
+
+  public DoFnInfo<InputT, OutputT> withFn(DoFn<InputT, OutputT> newFn) {
+    return DoFnInfo.forFn(newFn,
+        windowingStrategy,
+        sideInputViews,
+        inputCoder,
+        mainOutput,
+        outputMap);
+  }
+
+  private DoFnInfo(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Iterable<PCollectionView<?>> sideInputViews,
+      Coder<InputT> inputCoder,
+      long mainOutput,
+      Map<Long, TupleTag<?>> outputMap) {
+    this.doFn = doFn;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputViews = sideInputViews;
+    this.inputCoder = inputCoder;
+    this.mainOutput = mainOutput;
+    this.outputMap = outputMap;
+  }
+
+  /** Returns the embedded function. */
+  public DoFn<InputT, OutputT> getDoFn() {
+    return doFn;
+  }
+
+  public WindowingStrategy<?, ?> getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  public Iterable<PCollectionView<?>> getSideInputViews() {
+    return sideInputViews;
+  }
+
+  public Coder<InputT> getInputCoder() {
+    return inputCoder;
+  }
+
+  public long getMainOutput() {
+    return mainOutput;
+  }
+
+  public Map<Long, TupleTag<?>> getOutputMap() {
+    return outputMap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index c361647..f0ee319 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -50,7 +50,6 @@ import org.apache.beam.fn.v1.BeamFnApi.StateRequest;
 import org.apache.beam.fn.v1.BeamFnApi.StateRequest.Builder;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -84,6 +83,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/f3f32549/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 4aa8080..9113be7 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -44,7 +44,6 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.state.FakeBeamFnStateClient;
 import org.apache.beam.fn.v1.BeamFnApi.StateKey;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -61,6 +60,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;

Reply via email to