Repository: beam Updated Branches: refs/heads/master 132d4c55d -> a2d328a2d
DataflowRunner: remove dead code It is not possible to try to use an unbounded source in batch mode any more, as the runner will automatically enable streaming mode. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f4736729 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f4736729 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f4736729 Branch: refs/heads/master Commit: f4736729a5a04dc9d1cb30a44b2dd4692be7e935 Parents: 132d4c5 Author: Dan Halperin <dhalp...@google.com> Authored: Tue Apr 11 14:12:43 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Apr 11 15:39:40 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 16 ---------------- 1 file changed, 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f4736729/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 1708d8a..8726635 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -39,7 +39,6 @@ import com.google.common.base.Strings; import com.google.common.base.Utf8; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import java.io.File; import java.io.IOException; @@ -65,7 +64,6 @@ import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; -import org.apache.beam.runners.core.construction.UnsupportedOverrideFactory; import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory; @@ -330,14 +328,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformMatchers.classEqualTo(View.CreatePCollectionView.class), new StreamingCreatePCollectionViewFactory())); } else { - // In batch mode must use the custom Pubsub bounded source/sink. - for (Class<? extends PTransform> unsupported : - ImmutableSet.of(PubsubUnboundedSink.class, PubsubUnboundedSource.class)) { - overridesBuilder.add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(unsupported), - UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false)))); - } overridesBuilder // State and timer pardos are implemented by expansion to GBK-then-ParDo .add( @@ -399,12 +389,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return overridesBuilder.build(); } - private String getUnsupportedMessage(Class<?> unsupported, boolean streaming) { - return String.format( - "%s is not supported in %s", - NameUtils.approximateSimpleName(unsupported), streaming ? "streaming" : "batch"); - } - private static class ReflectiveOneToOneOverrideFactory< InputT extends PValue, OutputT extends PValue,