Repository: beam
Updated Branches:
  refs/heads/master 07020c961 -> a93e218ba


Remove value only outputs in Dataflow job representation

Always define outputs in terms of full windowed value representations


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8492ec38
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8492ec38
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8492ec38

Branch: refs/heads/master
Commit: 8492ec38079d98f337d6e1dfc7bba79fd464d6fd
Parents: 07020c9
Author: Luke Cwik <lc...@google.com>
Authored: Wed Jan 11 11:46:59 2017 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jan 11 16:14:36 2017 -0800

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineTranslator.java       | 17 -----------------
 .../beam/runners/dataflow/DataflowRunner.java      |  2 +-
 .../beam/runners/dataflow/ReadTranslator.java      |  2 +-
 .../beam/runners/dataflow/TransformTranslator.java |  8 --------
 4 files changed, 2 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index bd0d5ba..7609745 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -598,23 +598,6 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public long addValueOnlyOutput(PValue value) {
-      Coder<?> coder;
-      if (value instanceof TypedPValue) {
-        coder = ((TypedPValue<?>) value).getCoder();
-        if (value instanceof PCollection) {
-          // Wrap the PCollection element Coder inside a ValueOnly
-          // WindowedValueCoder.
-          coder = WindowedValue.getValueOnlyCoder(coder);
-        }
-      } else {
-        // No output coder to encode.
-        coder = null;
-      }
-      return addOutput(value, coder);
-    }
-
-    @Override
     public long addCollectionToSingletonOutput(
         PValue inputValue, PValue outputValue) {
       Coder<?> inputValueCoder =

http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9ff856a..d21da59 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2149,7 +2149,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       if (overriddenTransform.getIdLabel() != null) {
         stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
       }
-      stepContext.addValueOnlyOutput(context.getOutput(transform));
+      stepContext.addOutput(context.getOutput(transform));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
index ed03b53..bc68511 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java
@@ -64,7 +64,7 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
           PropertyNames.SOURCE_STEP_INPUT,
           cloudSourceToDictionary(
               CustomSources.serializeToCloudSource(source, context.getPipelineOptions())));
-      stepContext.addValueOnlyOutput(context.getOutput(transform));
+      stepContext.addOutput(context.getOutput(transform));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
index fb883a7..4297a80 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java
@@ -106,14 +106,6 @@ interface TransformTranslator<TransformT extends PTransform> {
     long addOutput(PValue value);
 
     /**
-     * Adds an output to this Dataflow step, producing the specified output {@code PValue},
-     * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code
-     * PCollection}, wraps its coder inside a {@code ValueOnlyCoder}. Returns a pipeline level
-     * unique id.
-     */
-    long addValueOnlyOutput(PValue value);
-
-    /**
      * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified
      * input {@code PValue} and producing the specified output {@code PValue}. This step requires
      * special treatment for its output encoding. Returns a pipeline level unique id.

Reply via email to