Repository: beam Updated Branches: refs/heads/master 07020c961 -> a93e218ba
Remove value only outputs in Dataflow job representation Always define outputs in terms of full windowed value representations Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8492ec38 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8492ec38 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8492ec38 Branch: refs/heads/master Commit: 8492ec38079d98f337d6e1dfc7bba79fd464d6fd Parents: 07020c9 Author: Luke Cwik <lc...@google.com> Authored: Wed Jan 11 11:46:59 2017 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Wed Jan 11 16:14:36 2017 -0800 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineTranslator.java | 17 ----------------- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../beam/runners/dataflow/ReadTranslator.java | 2 +- .../beam/runners/dataflow/TransformTranslator.java | 8 -------- 4 files changed, 2 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index bd0d5ba..7609745 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -598,23 +598,6 @@ public class DataflowPipelineTranslator { } @Override - public long addValueOnlyOutput(PValue value) { - Coder<?> coder; - if (value instanceof TypedPValue) { - coder = ((TypedPValue<?>) value).getCoder(); - if (value instanceof PCollection) { - // Wrap the PCollection element Coder inside a ValueOnly - // WindowedValueCoder. - coder = WindowedValue.getValueOnlyCoder(coder); - } - } else { - // No output coder to encode. - coder = null; - } - return addOutput(value, coder); - } - - @Override public long addCollectionToSingletonOutput( PValue inputValue, PValue outputValue) { Coder<?> inputValueCoder = http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9ff856a..d21da59 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2149,7 +2149,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { if (overriddenTransform.getIdLabel() != null) { stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel()); } - stepContext.addValueOnlyOutput(context.getOutput(transform)); + stepContext.addOutput(context.getOutput(transform)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java index ed03b53..bc68511 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -64,7 +64,7 @@ class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { PropertyNames.SOURCE_STEP_INPUT, cloudSourceToDictionary( CustomSources.serializeToCloudSource(source, context.getPipelineOptions()))); - stepContext.addValueOnlyOutput(context.getOutput(transform)); + stepContext.addOutput(context.getOutput(transform)); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/beam/blob/8492ec38/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java index fb883a7..4297a80 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java @@ -106,14 +106,6 @@ interface TransformTranslator<TransformT extends PTransform> { long addOutput(PValue value); /** - * Adds an output to this Dataflow step, producing the specified output {@code PValue}, - * including its {@code Coder} if a {@code TypedPValue}. If the {@code PValue} is a {@code - * PCollection}, wraps its coder inside a {@code ValueOnlyCoder}. Returns a pipeline level - * unique id. - */ - long addValueOnlyOutput(PValue value); - - /** * Adds an output to this {@code CollectionToSingleton} Dataflow step, consuming the specified * input {@code PValue} and producing the specified output {@code PValue}. This step requires * special treatment for its output encoding. Returns a pipeline level unique id.