[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive

2018-04-23 Thread Ben Sidhom (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16448782#comment-16448782
 ] 

Ben Sidhom commented on BEAM-2421:
--

[https://github.com/apache/beam/pull/4783] adds support for the Flink batch 
runner.

> Migrate Apache Beam to use impulse primitive as the only root primitive
> ---
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>Priority: Major
>  Labels: portability
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded 
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...
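
The pipeline shape in the description can be sketched in plain Java with no Beam dependency. This is a rough model rather than Beam's implementation: `ImpulseRootSketch`, `flatMap`, `readRange`, and `runPipeline` are hypothetical names introduced for illustration, and the impulse is assumed to emit one empty byte array. The point it shows is that the source read becomes an ordinary per-element step triggered by the single impulse element, instead of being a root primitive itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model (no Beam dependency) of rooting a pipeline at an impulse. */
class ImpulseRootSketch {

  /** Models the impulse primitive: exactly one byte array element (assumed empty). */
  public static List<byte[]> impulse() {
    List<byte[]> out = new ArrayList<>();
    out.add(new byte[0]);
    return out;
  }

  /** Models a per-element step (ParDo or SDF): each input may yield many outputs. */
  public static <A, B> List<B> flatMap(List<A> input, Function<A, List<B>> fn) {
    List<B> out = new ArrayList<>();
    for (A element : input) {
      out.addAll(fn.apply(element));
    }
    return out;
  }

  /** Hypothetical bounded source: yields the integers in [0, count). */
  public static List<Integer> readRange(int count) {
    List<Integer> out = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      out.add(i);
    }
    return out;
  }

  /** Impulse -> SDF(Source) -> ParDo: the read is triggered by the impulse element. */
  public static List<String> runPipeline(int count) {
    List<byte[]> root = impulse();
    List<Integer> records = flatMap(root, ignored -> readRange(count));
    return flatMap(records, n -> Collections.singletonList("record-" + n));
  }

  public static void main(String[] args) {
    System.out.println(runPipeline(3)); // prints [record-0, record-1, record-2]
  }
}
```

Because the root always produces exactly one element, any fan-out has to happen in the step that follows it, which is the role the description assigns to SDF.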



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive

2018-02-27 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16379653#comment-16379653
 ] 

Kenneth Knowles commented on BEAM-2421:
---

I misread the links above. It looks like this is expected to be done for
Python Dataflow anyhow.

> Migrate Apache Beam to use impulse primitive as the only root primitive
> ---
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>Priority: Major
>  Labels: portability
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded 
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...





[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive

2018-02-27 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16379291#comment-16379291
 ] 

Kenneth Knowles commented on BEAM-2421:
---

This is also not done for Python, yes?

> Migrate Apache Beam to use impulse primitive as the only root primitive
> ---
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>Priority: Major
>  Labels: portability
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded 
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...





[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive

2017-12-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288547#comment-16288547
 ] 

ASF GitHub Bot commented on BEAM-2421:
--

lukecwik closed pull request #4234: [BEAM-2421] Replaces BoundedSource with a 
composite transform when using Fn API
URL: https://github.com/apache/beam/pull/4234
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a6500924149..ddad43fe6ec 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 
@@ -91,6 +92,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -132,6 +134,7 @@
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -420,6 +423,13 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
   new ReflectiveViewOverrideFactory(
   BatchViewOverrides.BatchViewAsIterable.class, this)));
 }
+// Expands into Reshuffle and single-output ParDo, so has to be before the overrides below.
+if (hasExperiment(options, "beam_fn_api")) {
+  overridesBuilder.add(
+  PTransformOverride.of(
+  PTransformMatchers.classEqualTo(Read.Bounded.class),
+  new FnApiBoundedReadOverrideFactory()));
+}
 overridesBuilder
 .add(
 PTransformOverride.of(
@@ -1185,7 +1195,7 @@ private StreamingFnApiCreate(
 public final PCollection<T> expand(PBegin input) {
   try {
 PCollection<T> pc = Pipeline
-.applyTransform(input, new Impulse(IsBounded.BOUNDED))
+.applyTransform(input, new Impulse())
 .apply(ParDo.of(DecodeAndEmitDoFn
 .fromIterable(transform.getElements(), originalOutput.getCoder())));
 pc.setCoder(originalOutput.getCoder());
@@ -1206,7 +1216,7 @@ private StreamingFnApiCreate(
   throws IOException {
 ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
 for (T element : elements) {
-  byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+  byte[] bytes = encodeToByteArray(elemCoder, element);
   allElementsBytes.add(bytes);
 }
 return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
@@ -1244,16 +1254,16 @@ public void processElement(ProcessContext context) throws IOException {
 
   /** The Dataflow specific override for the impulse primitive. */
   private static class Impulse extends PTransform<PBegin, PCollection<byte[]>> {
-private final IsBounded isBounded;
-
-private Impulse(IsBounded isBounded) {
-  this.isBounded = isBounded;
+private Impulse() {
 }
 
 @Override
 public PCollection<byte[]> expand(PBegin input) {
   return PCollection.createPrimitiveOutputInternal(
-  input.getPipeline(), WindowingStrategy.globalDefault(), isBounded, ByteArrayCoder.of());
+  input.getPipeline(),
+  WindowingStrategy.globalDefault(),
+  IsBounded.BOUNDED,
+  ByteArrayCoder.of());
 }
 
 private static class Translator implements TransformTranslator<Impulse> {
@@ -1265,8 +1275,21 @@ public void translate(Impulse transform, TranslationContext context) {
   stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/");
   stepContext.addO
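
The `StreamingFnApiCreate` hunk above encodes each element to a byte array up front and then applies `Impulse` followed by a `ParDo` over `DecodeAndEmitDoFn`. The diff does not show the body of `processElement`, so the plain-Java model below assumes it decodes every stored element and emits it once per impulse element. `DecodeAndEmitSketch`, `encodeAll`, and `processImpulse` are hypothetical names, and UTF-8 string conversion stands in for a Beam coder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model (no Beam dependency) of Impulse followed by a decode-and-emit step. */
class DecodeAndEmitSketch {

  /** Hypothetical stand-in for a coder's encode step: UTF-8 bytes. */
  public static byte[] encode(String element) {
    return element.getBytes(StandardCharsets.UTF_8);
  }

  /** Hypothetical stand-in for a coder's decode step. */
  public static String decode(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Construction time: encode every element once, as fromIterable does in the diff. */
  public static List<byte[]> encodeAll(List<String> elements) {
    List<byte[]> allElementsBytes = new ArrayList<>();
    for (String element : elements) {
      allElementsBytes.add(encode(element));
    }
    return allElementsBytes;
  }

  /** Execution time (assumed behavior): on the impulse element, decode and emit everything. */
  public static List<String> processImpulse(byte[] impulseElement, List<byte[]> allElementsBytes) {
    List<String> out = new ArrayList<>();
    for (byte[] bytes : allElementsBytes) {
      out.add(decode(bytes));
    }
    return out;
  }

  public static void main(String[] args) {
    List<byte[]> encoded = encodeAll(Arrays.asList("a", "b"));
    System.out.println(processImpulse(new byte[0], encoded)); // prints [a, b]
  }
}
```

Carrying the elements as pre-encoded bytes means the step only needs the coder at execution time, which matches the `new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder)` call shown in the diff.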

[jira] [Commented] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16282918#comment-16282918
 ] 

ASF GitHub Bot commented on BEAM-2421:
--

jkff opened a new pull request #4234: [BEAM-2421] Replaces BoundedSource with a 
composite transform when using Fn API
URL: https://github.com/apache/beam/pull/4234
 
 
   https://issues.apache.org/jira/browse/BEAM-2421




> Migrate Apache Beam to use impulse primitive as the only root primitive
> ---
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>Assignee: Eugene Kirpichov
>  Labels: portability
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded 
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> After:
> Impulse -> SDF(Source) -> ParDo -> ...


