[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive
[ https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448782#comment-16448782 ]

Ben Sidhom commented on BEAM-2421:
----------------------------------

[https://github.com/apache/beam/pull/4783] adds support for the Flink batch runner.

> Migrate Apache Beam to use impulse primitive as the only root primitive
> -----------------------------------------------------------------------
>
>                 Key: BEAM-2421
>                 URL: https://issues.apache.org/jira/browse/BEAM-2421
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
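The pipeline shape described in the issue (Impulse -> SDF(Source) -> ParDo) can be illustrated with a toy model that has no Beam dependency. This is only a sketch: `ImpulseSketch`, `impulse`, `readPerElement`, and `parDo` are hypothetical names and are not Beam APIs. The issue says only that the impulse emits "a single byte array element"; using an empty array for that element is an assumption made here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model of the pipeline shape in BEAM-2421. None of these names are Beam APIs.
final class ImpulseSketch {

  // Hypothetical stand-in for the impulse root: exactly one byte array element.
  // Assumption: the element is an empty array.
  public static List<byte[]> impulse() {
    List<byte[]> out = new ArrayList<>();
    out.add(new byte[0]);
    return out;
  }

  // Hypothetical stand-in for SDF(Source): for each input element, read the whole source.
  // With a single impulse element, the source is read exactly once.
  public static <T> List<T> readPerElement(List<byte[]> input, List<T> source) {
    List<T> out = new ArrayList<>();
    for (byte[] ignored : input) {
      out.addAll(source);
    }
    return out;
  }

  // Hypothetical stand-in for a downstream ParDo: apply fn to every element.
  public static <A, B> List<B> parDo(List<A> input, Function<A, B> fn) {
    List<B> out = new ArrayList<>();
    for (A a : input) {
      out.add(fn.apply(a));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> source = Arrays.asList("a", "b", "c");
    // Impulse -> SDF(Source) -> ParDo
    List<String> result = parDo(readPerElement(impulse(), source), s -> s.toUpperCase());
    System.out.println(result); // prints [A, B, C]
  }
}
```

The point of the shape is that the root carries no source-specific logic: everything that knows about the source lives in an ordinary downstream step, so a runner only needs to implement one root primitive.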
[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive
[ https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16379653#comment-16379653 ]

Kenneth Knowles commented on BEAM-2421:
---------------------------------------

I misread the links above. It looks like it is expected to be done for Python Dataflow anyhow.

> Migrate Apache Beam to use impulse primitive as the only root primitive
> -----------------------------------------------------------------------
>
>                 Key: BEAM-2421
>                 URL: https://issues.apache.org/jira/browse/BEAM-2421
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive
[ https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16379291#comment-16379291 ]

Kenneth Knowles commented on BEAM-2421:
---------------------------------------

This is also not done for Python, yes?

> Migrate Apache Beam to use impulse primitive as the only root primitive
> -----------------------------------------------------------------------
>
>                 Key: BEAM-2421
>                 URL: https://issues.apache.org/jira/browse/BEAM-2421
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive
[ https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288547#comment-16288547 ]

ASF GitHub Bot commented on BEAM-2421:
--------------------------------------

lukecwik closed pull request #4234: [BEAM-2421] Replaces BoundedSource with a composite transform when using Fn API
URL: https://github.com/apache/beam/pull/4234

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a6500924149..ddad43fe6ec 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 
@@ -91,6 +92,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -132,6 +134,7 @@
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -420,6 +423,13 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
                 new ReflectiveViewOverrideFactory(
                     BatchViewOverrides.BatchViewAsIterable.class, this)));
       }
+    // Expands into Reshuffle and single-output ParDo, so has to be before the overrides below.
+    if (hasExperiment(options, "beam_fn_api")) {
+      overridesBuilder.add(
+          PTransformOverride.of(
+              PTransformMatchers.classEqualTo(Read.Bounded.class),
+              new FnApiBoundedReadOverrideFactory()));
+    }
     overridesBuilder
         .add(
             PTransformOverride.of(
@@ -1185,7 +1195,7 @@ private StreamingFnApiCreate(
     public final PCollection<T> expand(PBegin input) {
       try {
         PCollection<T> pc = Pipeline
-            .applyTransform(input, new Impulse(IsBounded.BOUNDED))
+            .applyTransform(input, new Impulse())
             .apply(ParDo.of(DecodeAndEmitDoFn
                 .fromIterable(transform.getElements(), originalOutput.getCoder())));
         pc.setCoder(originalOutput.getCoder());
@@ -1206,7 +1216,7 @@ private StreamingFnApiCreate(
         throws IOException {
       ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
       for (T element : elements) {
-        byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+        byte[] bytes = encodeToByteArray(elemCoder, element);
         allElementsBytes.add(bytes);
       }
       return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
@@ -1244,16 +1254,16 @@ public void processElement(ProcessContext context) throws IOException {
   /** The Dataflow specific override for the impulse primitive. */
   private static class Impulse extends PTransform<PBegin, PCollection<byte[]>> {
-    private final IsBounded isBounded;
-
-    private Impulse(IsBounded isBounded) {
-      this.isBounded = isBounded;
+    private Impulse() {
     }
 
     @Override
     public PCollection<byte[]> expand(PBegin input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), isBounded, ByteArrayCoder.of());
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.BOUNDED,
+          ByteArrayCoder.of());
     }
 
     private static class Translator implements TransformTranslator<Impulse> {
@@ -1265,8 +1275,21 @@ public void translate(Impulse transform, TranslationContext context) {
         stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/");
         stepContext.addO
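The StreamingFnApiCreate hunks above encode each element to a byte array up front (`encodeToByteArray(elemCoder, element)`) and hand the resulting list to `DecodeAndEmitDoFn`, which is applied downstream of the impulse. A minimal sketch of that encode-then-decode-on-impulse pattern follows, in plain Java. It rests on assumptions: `ToyCoder`, `UTF8`, `encodeAll`, and `decodeAndEmit` are hypothetical names, the toy coder is far simpler than Beam's `Coder`, and the decode step is inferred from the DoFn's name because its body is not part of the diff shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy version of the "encode up front, decode and emit on impulse" pattern.
// None of these names are Beam APIs.
final class DecodeAndEmitSketch {

  // Hypothetical minimal coder; Beam's Coder interface is richer than this.
  interface ToyCoder<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
  }

  // Assumed example coder: UTF-8 strings.
  public static final ToyCoder<String> UTF8 =
      new ToyCoder<String>() {
        @Override
        public byte[] encode(String value) {
          return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String decode(byte[] bytes) {
          return new String(bytes, StandardCharsets.UTF_8);
        }
      };

  // Mirrors the loop in the diff: encode every element once, at construction time.
  public static <T> List<byte[]> encodeAll(Iterable<T> elements, ToyCoder<T> coder) {
    List<byte[]> allElementsBytes = new ArrayList<>();
    for (T element : elements) {
      allElementsBytes.add(coder.encode(element));
    }
    return allElementsBytes;
  }

  // Assumed behavior of the DoFn: when the impulse element arrives, decode and emit everything.
  // The impulse payload itself is ignored; it only acts as a trigger.
  public static <T> List<T> decodeAndEmit(
      byte[] impulseElement, List<byte[]> encoded, ToyCoder<T> coder) {
    List<T> emitted = new ArrayList<>();
    for (byte[] bytes : encoded) {
      emitted.add(coder.decode(bytes));
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<byte[]> encoded = encodeAll(Arrays.asList("x", "yz"), UTF8);
    System.out.println(decodeAndEmit(new byte[0], encoded, UTF8)); // prints [x, yz]
  }
}
```

Carrying the elements as pre-encoded bytes means the step only has to hold byte arrays and a coder, which is one plausible reason for the design; the diff itself does not state the motivation.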
[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive
[ https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16282918#comment-16282918 ]

ASF GitHub Bot commented on BEAM-2421:
--------------------------------------

jkff opened a new pull request #4234: [BEAM-2421] Replaces BoundedSource with a composite transform when using Fn API
URL: https://github.com/apache/beam/pull/4234

https://issues.apache.org/jira/browse/BEAM-2421

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Migrate Apache Beam to use impulse primitive as the only root primitive
> -----------------------------------------------------------------------
>
>                 Key: BEAM-2421
>                 URL: https://issues.apache.org/jira/browse/BEAM-2421
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Assignee: Eugene Kirpichov
>              Labels: portability
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)