[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146478&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146478 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 17:35
Start Date: 21/Sep/18 17:35
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 3eed941f5a6..a0349fa2e5f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -38,9 +38,11 @@
 import com.google.common.graph.MutableNetwork;
 import com.google.common.graph.Network;
 import com.google.common.graph.NetworkBuilder;
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -123,15 +125,27 @@ private QueryablePipeline(Collection<String> transformIds, Components components
   @VisibleForTesting
   static Collection<String> getPrimitiveTransformIds(RunnerApi.Components components) {
     Collection<String> ids = new LinkedHashSet<>();
+
     for (Map.Entry<String, PTransform> transformEntry : components.getTransformsMap().entrySet()) {
       PTransform transform = transformEntry.getValue();
       boolean isPrimitive = isPrimitiveTransform(transform);
       if (isPrimitive) {
-        List<String> subtransforms = transform.getSubtransformsList();
-        if (subtransforms.isEmpty()) {
-          ids.add(transformEntry.getKey());
-        } else {
-          ids.addAll(subtransforms);
+        // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested descendents), due to runners
+        // either rewriting them in terms of runner-specific transforms, or SDKs constructing them in terms of other
+        // underlying transforms (see https://issues.apache.org/jira/browse/BEAM-5441).
+        // We consider any "leaf" descendents of these "primitive" transforms to be the true "primitives" that we
+        // preserve here; in the common case, this is just the "primitive" itself, which has no descendents).
+        Deque<String> transforms = new ArrayDeque<>();
+        transforms.push(transformEntry.getKey());
+        while (!transforms.isEmpty()) {
+          String id = transforms.pop();
+          PTransform next = components.getTransformsMap().get(id);
+          List<String> subtransforms = next.getSubtransformsList();
+          if (subtransforms.isEmpty()) {
+            ids.add(id);
+          } else {
+            transforms.addAll(subtransforms);
+          }
         }
       }
     }
@@ -175,9 +189,10 @@ private static boolean isPrimitiveTransform(PTransform transform) {
         network.addEdge(transformNode, producedNode, new PerElementEdge());
         checkArgument(
             network.inDegree(producedNode) == 1,
-            "A %s should have exactly one producing %s, %s has %s",
+            "A %s should have exactly one producing %s, but found %s:\nPCollection:\n%s\nProducers:\n%s",
             PCollectionNode.class.getSimpleName(),
             PTransformNode.class.getSimpleName(),
+            network.predecessors(producedNode).size(),
             producedNode,
             network.predecessors(producedNode));
         unproducedCollections.remove(producedNode);
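
The leaf-finding loop in the first hunk can be read in isolation. The following is a hypothetical, dependency-free model of it, not the Beam code itself. A plain `Map` from transform ID to sub-transform IDs stands in for `RunnerApi.Components`, and the class and method names are invented for illustration. As in the diff, `push` and `pop` work on the head of the `ArrayDeque` while `addAll` appends to the tail, so nested sub-transforms are expanded breadth-first. Only IDs with no sub-transforms are kept.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model of the loop added to getPrimitiveTransformIds: transform IDs map to their
 * sub-transform IDs, and an absent or empty entry marks a leaf. Not Beam's API.
 */
final class LeafPrimitives {
  private LeafPrimitives() {}

  /** Returns the leaf descendants of {@code rootId}, or {@code rootId} itself if it has none. */
  static Set<String> leafDescendants(String rootId, Map<String, List<String>> subtransforms) {
    Set<String> leaves = new LinkedHashSet<>(); // keeps first-seen order, drops duplicates
    Deque<String> pending = new ArrayDeque<>();
    pending.push(rootId);
    while (!pending.isEmpty()) {
      String id = pending.pop(); // removes from the head of the deque
      List<String> children = subtransforms.getOrDefault(id, Collections.emptyList());
      if (children.isEmpty()) {
        leaves.add(id); // no sub-transforms: treat this ID as a "true" primitive
      } else {
        pending.addAll(children); // appends to the tail, so expansion is breadth-first
      }
    }
    return leaves;
  }
}
```

Suppose a `Read` expands to an `Impulse` and a composite `Reshuffle` (illustrative names). In that case only leaves such as `AddRandomKeys` and `RemoveRandomKeys` survive. A transform with no sub-transforms maps to itself.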


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 146478)
Time Spent: 3h  (was: 2h 50m)

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>

[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146473 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 17:09
Start Date: 21/Sep/18 17:09
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on issue #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#issuecomment-423608015

   good q re: test coverage…
   
   In #6328 and here, we're mostly inelegantly accommodating emergent behavior in various runners and SDKs.
   
   Per [the last thread above](https://github.com/apache/beam/pull/6461#discussion_r219555610), a next step may be to remove `READ` and `CREATE_VIEW` as primitives, after which we could get rid of this logic or greatly simplify things.
   
   My instinct is to clarify that bigger picture first, and then make runners/SDKs, their tests, and some independent tests all conform to it, rather than enshrine this accidental/transitional state in tests at this time. I'm definitely interested in discussion there, though.


Issue Time Tracking
---

Worklog Id: (was: 146473)
Time Spent: 2h 50m  (was: 2h 40m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146462&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146462 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 16:54
Start Date: 21/Sep/18 16:54
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#issuecomment-423603458

   @ryan-williams will merge once Java PreCommit is green. Any thoughts on test coverage? Can we formulate what you have described as a unit test case (in a follow-up PR)?


Issue Time Tracking
---

Worklog Id: (was: 146462)
Time Spent: 2h 40m  (was: 2.5h)

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>Reporter: Maximilian Michels
>Assignee: Ryan Williams
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.8.0
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The Python SDK wordcount with the PortableRunner throws the following 
> exception:
> {noformat}
> java.lang.IllegalArgumentException: A PCollectionNode should have exactly one 
> producing PTransformNode, PCollectionNode{id=ref_PCollection_PCollection_26, 
> PCollection=unique_name: 
> "60write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys.None"
> coder_id: "ref_Coder_FastPrimitivesCoder_2"
> is_bounded: BOUNDED
> windowing_strategy_id: "ref_Windowing_Windowing_1"
> } has 
> [PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle_32,
>  transform=spec {
>   urn: "beam:transform:generic_composite:v1"
>   payload: ""
> }
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/AddRandomKeys_33"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/ReshufflePerKey_34"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41"
> inputs {
>   key: "0"
>   value: "ref_PCollection_PCollection_19"
> }
> outputs {
>   key: "None"
>   value: "ref_PCollection_PCollection_26"
> }
> unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle"
> }, 
> PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41,
>  transform=spec {
>   urn: "urn:beam:transform:pardo:v1"
>   payload: "\n\317\006\n\255\006\n 
> beam:dofn:pickled_python_info:v1\032\210\006eNrFkttP1EAUxtsFZe1yUcEL4v3aRbdFUEFFQEHUEGpSnPhkxml32Gnstv3a6YZNbKIxJf7ZTvdB2ER9NZNMcs6cb3LO73zfaqbPEuYLTj3OupZMWZTtx2k3s/w45cYmC0PmhfxjypKEp1vxdmRAa36HXqBmkrEkjX2eZRjx20EYWrS6DeqnnElO9/PIl0GsFKPm0HsYszaV/YQbOEHqm3Gbf1ABTpYYc1E3d3R1arvTG2Tip6Z91bQfutbRtT2cckoYTaIfoFFinPRtkvE0s7vswN7iPbuaoCV5Ju0ej3p2GHh20pcijhatZTsLJG+pSb+wDs/sYzO3Fq0Va8Fq895CK+mrUot3OscL7CModgXFSvqYIPXVkHW9NlvD5G5jlGiYIrX9CKdLnGlKnHUx7VPq5UEog4hSo8MlkzI1MDNEIugmcSppN27noaJxjsz9Yxs4X+KCi4ukTpXcl5Ri9hCXXMyJSedPC/C5CnBZjJriN9W9z6SukLZ1bXYPV5wd/RBXFVKJayWu/w+kuQzCCukNMbm7XhNTTYXvpotbYkb8HUclwu0Sd1zcFQrCPRemguAUaJLGwFpUBJHMMD9sb/UwyKveFFEm4zQz3r2v3Pe2Shu4r7z9oECrgGWSRhAluRx8l8F2yHicy6PEgpMf4qGXSSy6WCrxyMXjEk8KLJtiXlQfrRR4WuCZKWxHDKqfe6o7lnayhPtUuWVVLOUSL1ysDXe9PpBvFHhZ4NWRfNMjI5VsS6zl3ie8LrDtOBJvrF+Bv0km\022\035ref_Environment_Environment_1"
> }
> inputs {
>   key: "0"
>   value: "ref_PCollection_PCollection_25"
> }
> outputs {
>   key: "None"
>   value: "ref_PCollection_PCollection_26"
> }
> unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys"
> }]
> at 
> org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:416)
> at 
> org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:176)
> at 
> org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:119)
> at 
> org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:82)
> at 
> org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:67)
> at 
> 

[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146456&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146456 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 16:29
Start Date: 21/Sep/18 16:29
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219555610

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,27 @@ private QueryablePipeline(Collection<String> transformIds, Components components
   @VisibleForTesting
   static Collection<String> getPrimitiveTransformIds(RunnerApi.Components components) {
     Collection<String> ids = new LinkedHashSet<>();
+
     for (Map.Entry<String, PTransform> transformEntry : components.getTransformsMap().entrySet()) {
       PTransform transform = transformEntry.getValue();
       boolean isPrimitive = isPrimitiveTransform(transform);
       if (isPrimitive) {
 
 Review comment:
   Perhaps? I can't say definitively, but:
   
   READ is already in [the `DeprecatedPrimitives` section of the proto](https://github.com/apache/beam/blob/a6897100e34cf2b6f177d261f65678c4ff8c7616/model/pipeline/src/main/proto/beam_runner_api.proto#L238).
   
   Is it always implemented as a composite, i.e. in terms of other primitives?
   
   We're currently also having a discussion about the other "deprecated primitive", [`CREATE_VIEW`](https://github.com/apache/beam/blob/a6897100e34cf2b6f177d261f65678c4ff8c7616/model/pipeline/src/main/proto/beam_runner_api.proto#L242), which is related to a bunch of the remaining failing Java PVR tests.


Issue Time Tracking
---

Worklog Id: (was: 146456)
Time Spent: 2.5h  (was: 2h 20m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146450&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146450 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 16:25
Start Date: 21/Sep/18 16:25
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219553566

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,27 @@ private QueryablePipeline(Collection<String> transformIds, Components components
   @VisibleForTesting
   static Collection<String> getPrimitiveTransformIds(RunnerApi.Components components) {
     Collection<String> ids = new LinkedHashSet<>();
+
     for (Map.Entry<String, PTransform> transformEntry : components.getTransformsMap().entrySet()) {
       PTransform transform = transformEntry.getValue();
       boolean isPrimitive = isPrimitiveTransform(transform);
       if (isPrimitive) {
 
 Review comment:
   Couldn't we remove the "primitive" attribute from the Read transform instead of introducing the additional logic? Runners could still decide whether they want to consider READ a primitive.
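
The suggestion could look something like the minimal sketch below. It assumes the primitive check were driven by URNs and that each runner supplies its own set of core primitives. `PrimitivePolicy` is hypothetical and not a Beam class. `beam:transform:read:v1` is assumed to be the URN of the deprecated READ primitive in `beam_runner_api.proto`. The core URN set is passed in so that the sketch does not hard-code the other standard URNs.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical URN-driven primitive check in which each runner decides whether the deprecated
 * READ primitive counts as primitive. Not a Beam class.
 */
final class PrimitivePolicy {
  // Assumed URN of the deprecated READ primitive; verify against beam_runner_api.proto.
  static final String READ_URN = "beam:transform:read:v1";

  private final Set<String> corePrimitiveUrns;
  private final boolean readIsPrimitive;

  PrimitivePolicy(Set<String> corePrimitiveUrns, boolean readIsPrimitive) {
    // Defensive, read-only copy of the runner-supplied URNs.
    this.corePrimitiveUrns = Collections.unmodifiableSet(new HashSet<>(corePrimitiveUrns));
    this.readIsPrimitive = readIsPrimitive;
  }

  /** READ is primitive only if the runner opted in; otherwise it is expanded like a composite. */
  boolean isPrimitive(String urn) {
    if (READ_URN.equals(urn)) {
      return readIsPrimitive;
    }
    return corePrimitiveUrns.contains(urn);
  }
}
```

Under this design, a runner that does not opt in would see READ's SDK-provided expansion, and the ordinary primitives inside it would be picked up like those of any other composite.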


Issue Time Tracking
---

Worklog Id: (was: 146450)
Time Spent: 2h 20m  (was: 2h 10m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146445 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 16:20
Start Date: 21/Sep/18 16:20
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219553107

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,27 @@ private QueryablePipeline(Collection<String> transformIds, Components components
   @VisibleForTesting
   static Collection<String> getPrimitiveTransformIds(RunnerApi.Components components) {
     Collection<String> ids = new LinkedHashSet<>();
+
     for (Map.Entry<String, PTransform> transformEntry : components.getTransformsMap().entrySet()) {
       PTransform transform = transformEntry.getValue();
       boolean isPrimitive = isPrimitiveTransform(transform);
       if (isPrimitive) {
-        List<String> subtransforms = transform.getSubtransformsList();
-        if (subtransforms.isEmpty()) {
-          ids.add(transformEntry.getKey());
-        } else {
-          ids.addAll(subtransforms);
+        // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested descendents), due to runners
+        // either rewriting them in terms of runner-specific transforms, or SDKs constructing them in terms of other
+        // underlying transforms (see discussion on BEAM-5441).
 
 Review comment:
   done


Issue Time Tracking
---

Worklog Id: (was: 146445)
Time Spent: 2h 10m  (was: 2h)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146429&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146429 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 16:05
Start Date: 21/Sep/18 16:05
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219548811

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,27 @@ private QueryablePipeline(Collection<String> transformIds, Components components
   @VisibleForTesting
   static Collection<String> getPrimitiveTransformIds(RunnerApi.Components components) {
     Collection<String> ids = new LinkedHashSet<>();
+
     for (Map.Entry<String, PTransform> transformEntry : components.getTransformsMap().entrySet()) {
       PTransform transform = transformEntry.getValue();
       boolean isPrimitive = isPrimitiveTransform(transform);
       if (isPrimitive) {
-        List<String> subtransforms = transform.getSubtransformsList();
-        if (subtransforms.isEmpty()) {
-          ids.add(transformEntry.getKey());
-        } else {
-          ids.addAll(subtransforms);
+        // Sometimes "primitive" transforms have sub-transforms (and even deeper-nested descendents), due to runners
+        // either rewriting them in terms of runner-specific transforms, or SDKs constructing them in terms of other
+        // underlying transforms (see discussion on BEAM-5441).
 
 Review comment:
   "see discussion on BEAM-5441" => "see https://issues.apache.org/jira/browse/BEAM-5441"


Issue Time Tracking
---

Worklog Id: (was: 146429)
Time Spent: 2h  (was: 1h 50m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146426 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 16:01
Start Date: 21/Sep/18 16:01
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request #6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219547575

 ##
 File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,24 @@ private QueryablePipeline(Collection<String> transformIds, Components components
   @VisibleForTesting
   static Collection<String> getPrimitiveTransformIds(RunnerApi.Components components) {
     Collection<String> ids = new LinkedHashSet<>();
+
     for (Map.Entry<String, PTransform> transformEntry : components.getTransformsMap().entrySet()) {
       PTransform transform = transformEntry.getValue();
       boolean isPrimitive = isPrimitiveTransform(transform);
       if (isPrimitive) {
-        List<String> subtransforms = transform.getSubtransformsList();
-        if (subtransforms.isEmpty()) {
-          ids.add(transformEntry.getKey());
-        } else {
-          ids.addAll(subtransforms);
+        // for a "primitive" transform, we want to keep only "leaf" sub-transform descendents, if any (in the common
 
 Review comment:
   @robertwb
   
   > _a runner_ might do substitution and implement a primitive in terms of 
other primitives, _but leave the original in the graph_...
   
   (mostly to check my own understanding): here it's the Python SDK, not a 
runner, that:
   1. implements `Read` (normally a primitive) in terms of other transforms 
(including a `Reshuffle`, which in turn wraps a `ParDo` primitive),
   2. leaves the original in the graph
   
   The _runner_ substitution case (`ReferenceRunner`) does not leave the 
original in the graph, and the error we were getting there had to do with a 
PCollection being left with no "primitive" transform that produced it.
   
   @tweise good idea, I've updated the comment and force-pushed it here
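The diff comment above keeps only the "leaf" sub-transform descendants of a transform that has a primitive URN. The Java sketch below shows that traversal under stated assumptions. It is not the code from PR #6461. It models the transform hierarchy as a plain `Map` from a hypothetical transform id to the ids of its direct subtransforms, which stands in for the `subtransforms` field shown in the error message. It then walks that map breadth-first with an `ArrayDeque` and keeps only the nodes that have no children. The sketch assumes the hierarchy is a tree, so it does not check for cycles.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LeafSubtransforms {

  /**
   * Returns the transforms reachable from rootId through subtransform edges that have
   * no subtransforms of their own. A root with no subtransforms is its own only leaf.
   * Hypothetical helper; assumes the hierarchy is a tree (no cycles).
   */
  static Set<String> leafDescendants(String rootId, Map<String, List<String>> subtransforms) {
    Set<String> leaves = new LinkedHashSet<>();
    Deque<String> toVisit = new ArrayDeque<>();
    toVisit.add(rootId);
    while (!toVisit.isEmpty()) {
      String id = toVisit.poll();
      List<String> children = subtransforms.getOrDefault(id, List.of());
      if (children.isEmpty()) {
        leaves.add(id); // no children: keep this transform as a leaf
      } else {
        toVisit.addAll(children); // has children: visit its descendants instead
      }
    }
    return leaves;
  }

  public static void main(String[] args) {
    // Hypothetical hierarchy loosely modeled on the names in the error message;
    // "ReadImpl" and "GroupByKey" are illustrative placeholders.
    Map<String, List<String>> subtransforms = Map.of(
        "Read", List.of("ReadImpl", "Reshuffle"),
        "Reshuffle", List.of("AddRandomKeys", "ReshufflePerKey", "RemoveRandomKeys"),
        "ReshufflePerKey", List.of("GroupByKey"));
    // prints [ReadImpl, AddRandomKeys, RemoveRandomKeys, GroupByKey]
    System.out.println(leafDescendants("Read", subtransforms));
  }
}
```

Because the sketch keeps the root's own id only when it has no children, an SDK-expanded transform with a primitive URN, like `Read` in this thread, contributes its leaves and not itself. The explicit queue also avoids recursion-depth limits on deep hierarchies.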


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 146426)
Time Spent: 1h 40m  (was: 1.5h)

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>Reporter: Maximilian Michels
>Assignee: Ryan Williams
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.8.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The Python SDK wordcount with the PortableRunner throws the following 
> exception:
> {noformat}
> java.lang.IllegalArgumentException: A PCollectionNode should have exactly one 
> producing PTransformNode, PCollectionNode{id=ref_PCollection_PCollection_26, 
> PCollection=unique_name: 
> "60write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys.None"
> coder_id: "ref_Coder_FastPrimitivesCoder_2"
> is_bounded: BOUNDED
> windowing_strategy_id: "ref_Windowing_Windowing_1"
> } has 
> [PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle_32,
>  transform=spec {
>   urn: "beam:transform:generic_composite:v1"
>   payload: ""
> }
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/AddRandomKeys_33"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/ReshufflePerKey_34"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41"
> inputs {
>   key: "0"
>   value: "ref_PCollection_PCollection_19"
> }
> outputs {
>   key: "None"
>   value: "ref_PCollection_PCollection_26"
> }
> unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle"
> }, 
> PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41,
>  transform=spec {
>   urn: "urn:beam:transform:pardo:v1"
>   payload: "\n\317\006\n\255\006\n 
> 

[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146427=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146427
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 16:01
Start Date: 21/Sep/18 16:01
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219547575
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,24 @@ private QueryablePipeline(Collection 
transformIds, Components components
   @VisibleForTesting
   static Collection getPrimitiveTransformIds(RunnerApi.Components 
components) {
 Collection ids = new LinkedHashSet<>();
+
 for (Map.Entry transformEntry : 
components.getTransformsMap().entrySet()) {
   PTransform transform = transformEntry.getValue();
   boolean isPrimitive = isPrimitiveTransform(transform);
   if (isPrimitive) {
-List subtransforms = transform.getSubtransformsList();
-if (subtransforms.isEmpty()) {
-  ids.add(transformEntry.getKey());
-} else {
-  ids.addAll(subtransforms);
+// for a "primitive" transform, we want to keep only "leaf" 
sub-transform descendents, if any (in the common
 
 Review comment:
   @robertwb
   
   > _a runner_ might do substitution and implement a primitive in terms of 
other primitives, _but leave the original in the graph_...
   
   (mostly to check my own understanding): here it's the Python SDK, not a 
runner, that:
   1. implements `Read` (normally a primitive) in terms of other transforms 
(including a `Reshuffle`, which in turn wraps a `ParDo` primitive),
   2. leaves the original (`Read`) in the graph
   
   The _runner_ substitution case (`ReferenceRunner`) does not leave the 
original in the graph, and the error we were getting there had to do with a 
PCollection being left with no "primitive" transform that produced it.
   
   @tweise good idea, I've updated the comment and force-pushed it here


Issue Time Tracking
---

Worklog Id: (was: 146427)
Time Spent: 1h 50m  (was: 1h 40m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146423=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146423
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 15:46
Start Date: 21/Sep/18 15:46
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6461: 
[BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219543194
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,24 @@ private QueryablePipeline(Collection 
transformIds, Components components
   @VisibleForTesting
   static Collection getPrimitiveTransformIds(RunnerApi.Components 
components) {
 Collection ids = new LinkedHashSet<>();
+
 for (Map.Entry transformEntry : 
components.getTransformsMap().entrySet()) {
   PTransform transform = transformEntry.getValue();
   boolean isPrimitive = isPrimitiveTransform(transform);
   if (isPrimitive) {
-List subtransforms = transform.getSubtransformsList();
-if (subtransforms.isEmpty()) {
-  ids.add(transformEntry.getKey());
-} else {
-  ids.addAll(subtransforms);
+// for a "primitive" transform, we want to keep only "leaf" 
sub-transform descendents, if any (in the common
 
 Review comment:
   @ryan-williams it may be good to refer to that JIRA in the comment for 
context.


Issue Time Tracking
---

Worklog Id: (was: 146423)
Time Spent: 1.5h  (was: 1h 20m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146418=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146418
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 15:41
Start Date: 21/Sep/18 15:41
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #6461: 
[BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219541510
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,24 @@ private QueryablePipeline(Collection 
transformIds, Components components
   @VisibleForTesting
   static Collection getPrimitiveTransformIds(RunnerApi.Components 
components) {
 Collection ids = new LinkedHashSet<>();
+
 for (Map.Entry transformEntry : 
components.getTransformsMap().entrySet()) {
   PTransform transform = transformEntry.getValue();
   boolean isPrimitive = isPrimitiveTransform(transform);
   if (isPrimitive) {
-List subtransforms = transform.getSubtransformsList();
-if (subtransforms.isEmpty()) {
-  ids.add(transformEntry.getKey());
-} else {
-  ids.addAll(subtransforms);
+// for a "primitive" transform, we want to keep only "leaf" 
sub-transform descendents, if any (in the common
 
 Review comment:
   Thanks for the pointer. So a runner might do substitution and implement a 
primitive in terms of other primitives, but leave the original in the graph... 
   
   My take is that the graph should be correct (by some stricter definition) 
when it's submitted, but a runner can do things that make it "less" correct, and 
we should perhaps not apply checks that strict after the fact. Treating 
transforms with primitive URNs but sub-transforms as composites makes sense 
here. 
   
   FWIW, it's worth getting this in to unblock things.
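The suggestion above is to treat a transform as a composite when it has a primitive URN but also has sub-transforms. That rule is a two-part predicate, shown in the minimal Java sketch below. It is not Beam's `isPrimitiveTransform`. The `Transform` holder class is hypothetical. The primitive-URN set is illustrative only and contains just the ParDo URN that appears in the error message. A real runner would consult its own, larger set.

```java
import java.util.List;
import java.util.Set;

public class PrimitiveCheck {

  // Illustrative only: the ParDo URN appears in the error message above.
  static final Set<String> PRIMITIVE_URNS = Set.of("urn:beam:transform:pardo:v1");

  /** Hypothetical stand-in for a transform proto: a URN plus child transform ids. */
  static final class Transform {
    final String urn;
    final List<String> subtransforms;

    Transform(String urn, List<String> subtransforms) {
      this.urn = urn;
      this.subtransforms = subtransforms;
    }
  }

  /** Primitive only if the URN is primitive AND the transform has no sub-transforms. */
  static boolean isEffectivePrimitive(Transform t) {
    return PRIMITIVE_URNS.contains(t.urn) && t.subtransforms.isEmpty();
  }

  public static void main(String[] args) {
    Transform leafParDo = new Transform("urn:beam:transform:pardo:v1", List.of());
    Transform primitiveUrnWithChildren =
        new Transform("urn:beam:transform:pardo:v1", List.of("child"));
    Transform composite =
        new Transform("beam:transform:generic_composite:v1", List.of("a", "b"));
    System.out.println(isEffectivePrimitive(leafParDo)); // true
    System.out.println(isEffectivePrimitive(primitiveUrnWithChildren)); // false: treated as a composite
    System.out.println(isEffectivePrimitive(composite)); // false
  }
}
```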


Issue Time Tracking
---

Worklog Id: (was: 146418)
Time Spent: 1h 20m  (was: 1h 10m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146414=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146414
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 15:28
Start Date: 21/Sep/18 15:28
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#6461: [BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219537254
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,24 @@ private QueryablePipeline(Collection 
transformIds, Components components
   @VisibleForTesting
   static Collection getPrimitiveTransformIds(RunnerApi.Components 
components) {
 Collection ids = new LinkedHashSet<>();
+
 for (Map.Entry transformEntry : 
components.getTransformsMap().entrySet()) {
   PTransform transform = transformEntry.getValue();
   boolean isPrimitive = isPrimitiveTransform(transform);
   if (isPrimitive) {
-List subtransforms = transform.getSubtransformsList();
-if (subtransforms.isEmpty()) {
-  ids.add(transformEntry.getKey());
-} else {
-  ids.addAll(subtransforms);
+// for a "primitive" transform, we want to keep only "leaf" 
sub-transform descendents, if any (in the common
 
 Review comment:
   [This write-up on 
BEAM-5441](https://issues.apache.org/jira/browse/BEAM-5441?focusedCommentId=16622983=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16622983)
 gives the best context:
   
   > DirectRunner [replaces a 
GBK](https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L264-L266)
 (normally a primitive transform) with [a wrapper for two runner-specific 
transforms ("group by key only" and "group also by 
window")](https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectGroupByKey.java#L63-L70),
 and the expectation is that the wrapped transforms are considered 
"primitives", but the wrapper is not
   >
   > … in the wordcount case, the `Reshuffle` gets added as a primitive 
transform (being a child of the otherwise-primitive `Read`) and, separately, 
its child `RemoveRandomKeys` (URN `urn:beam:transform:pardo:v1`) gets marked 
primitive, so there are two "primitive" transforms that have the same output 
PCollection, failing [the 
assertion](https://github.com/apache/beam/blob/9d0b03ad736797c33fbd6a4eb13155fa91367779/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L176-L182).
   
   Basically I've been collecting cases where "primitive" transforms end up 
with descendants, and trying to do the right thing here as each comes to light!
   
   Definitely interested in guidance from folks like yourself that have a more 
complete "big picture" understanding of whether "primitives" should never have 
children (let alone deeper descendants; this was the original definition of 
"primitive" pre-#6328, but seemed to have false-positives and -negatives), or 
how to handle the cases where they do (if not how I've prototyped here).
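The failing assertion discussed above can be shown with a small, self-contained sketch. It builds a map from each PCollection to the transforms treated as primitive that produce it. It then rejects any PCollection that has more than one producer. The class and method names are hypothetical, and this is not the code behind `QueryablePipeline.buildNetwork`. It only mirrors the shape of the "exactly one producing PTransformNode" check. The ids in `main` are shortened from the error message, where both `Reshuffle_32` and `RemoveRandomKeys_41` output `PCollection_26`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProducerCheck {

  /**
   * Maps each output PCollection id to the single transform that produces it.
   * Throws IllegalArgumentException if two or more of the given transforms
   * (assumed to be the ones treated as primitive) produce the same PCollection.
   */
  static Map<String, String> producersByPCollection(Map<String, List<String>> outputsByTransform) {
    Map<String, List<String>> candidates = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : outputsByTransform.entrySet()) {
      for (String pcollection : entry.getValue()) {
        candidates.computeIfAbsent(pcollection, k -> new ArrayList<>()).add(entry.getKey());
      }
    }
    Map<String, String> producers = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : candidates.entrySet()) {
      List<String> producing = entry.getValue();
      if (producing.size() > 1) {
        throw new IllegalArgumentException(
            "PCollection " + entry.getKey() + " should have exactly one producer, has " + producing);
      }
      producers.put(entry.getKey(), producing.get(0));
    }
    return producers;
  }

  public static void main(String[] args) {
    // Both transforms are (wrongly) treated as primitive and share one output.
    Map<String, List<String>> outputs = new LinkedHashMap<>();
    outputs.put("Reshuffle_32", List.of("PCollection_26"));
    outputs.put("RemoveRandomKeys_41", List.of("PCollection_26"));
    try {
      producersByPCollection(outputs);
    } catch (IllegalArgumentException e) {
      // prints: PCollection PCollection_26 should have exactly one producer, has [Reshuffle_32, RemoveRandomKeys_41]
      System.out.println(e.getMessage());
    }
  }
}
```

If `Reshuffle_32` is dropped from the map, for example because it is treated as a composite as in the fix under discussion, the check passes. The thread also mentions the opposite failure, where a PCollection is left with no primitive producer. That case would need a separate check against the set of consumed PCollections.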


Issue Time Tracking
---

Worklog Id: (was: 146414)
Time Spent: 1h 10m  (was: 1h)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146411=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146411
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 15:22
Start Date: 21/Sep/18 15:22
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6461: [BEAM-5441] extend 
primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#issuecomment-423571589
 
 
   @mxm @angoenka @ryan-williams I just tried this PR on top of master and 
verified that this fixes the issue with running wordcount. I see that a single 
container is started for the pipeline and that the pipeline succeeds, with the 
local file system, as expected.


Issue Time Tracking
---

Worklog Id: (was: 146411)
Time Spent: 1h  (was: 50m)

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>Reporter: Maximilian Michels
>Assignee: Ryan Williams
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.8.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The Python SDK wordcount with the PortableRunner throws the following 
> exception:
> {noformat}
> java.lang.IllegalArgumentException: A PCollectionNode should have exactly one 
> producing PTransformNode, PCollectionNode{id=ref_PCollection_PCollection_26, 
> PCollection=unique_name: 
> "60write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys.None"
> coder_id: "ref_Coder_FastPrimitivesCoder_2"
> is_bounded: BOUNDED
> windowing_strategy_id: "ref_Windowing_Windowing_1"
> } has 
> [PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle_32,
>  transform=spec {
>   urn: "beam:transform:generic_composite:v1"
>   payload: ""
> }
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/AddRandomKeys_33"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/ReshufflePerKey_34"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41"
> inputs {
>   key: "0"
>   value: "ref_PCollection_PCollection_19"
> }
> outputs {
>   key: "None"
>   value: "ref_PCollection_PCollection_26"
> }
> unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle"
> }, 
> PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41,
>  transform=spec {
>   urn: "urn:beam:transform:pardo:v1"
>   payload: "\n\317\006\n\255\006\n 
> beam:dofn:pickled_python_info:v1\032\210\006eNrFkttP1EAUxtsFZe1yUcEL4v3aRbdFUEFFQEHUEGpSnPhkxml32Gnstv3a6YZNbKIxJf7ZTvdB2ER9NZNMcs6cb3LO73zfaqbPEuYLTj3OupZMWZTtx2k3s/w45cYmC0PmhfxjypKEp1vxdmRAa36HXqBmkrEkjX2eZRjx20EYWrS6DeqnnElO9/PIl0GsFKPm0HsYszaV/YQbOEHqm3Gbf1ABTpYYc1E3d3R1arvTG2Tip6Z91bQfutbRtT2cckoYTaIfoFFinPRtkvE0s7vswN7iPbuaoCV5Ju0ej3p2GHh20pcijhatZTsLJG+pSb+wDs/sYzO3Fq0Va8Fq895CK+mrUot3OscL7CModgXFSvqYIPXVkHW9NlvD5G5jlGiYIrX9CKdLnGlKnHUx7VPq5UEog4hSo8MlkzI1MDNEIugmcSppN27noaJxjsz9Yxs4X+KCi4ukTpXcl5Ri9hCXXMyJSedPC/C5CnBZjJriN9W9z6SukLZ1bXYPV5wd/RBXFVKJayWu/w+kuQzCCukNMbm7XhNTTYXvpotbYkb8HUclwu0Sd1zcFQrCPRemguAUaJLGwFpUBJHMMD9sb/UwyKveFFEm4zQz3r2v3Pe2Shu4r7z9oECrgGWSRhAluRx8l8F2yHicy6PEgpMf4qGXSSy6WCrxyMXjEk8KLJtiXlQfrRR4WuCZKWxHDKqfe6o7lnayhPtUuWVVLOUSL1ysDXe9PpBvFHhZ4NWRfNMjI5VsS6zl3ie8LrDtOBJvrF+Bv0km\022\035ref_Environment_Environment_1"
> }
> inputs {
>   key: "0"
>   value: "ref_PCollection_PCollection_25"
> }
> outputs {
>   key: "None"
>   value: "ref_PCollection_PCollection_26"
> }
> unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys"
> }]
> at org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:416)
> at org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:176)
> at org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:119)
> at org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:82)
> at 
> 

[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146268
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 07:46
Start Date: 21/Sep/18 07:46
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #6461: 
[BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#discussion_r219409800
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 ##
 @@ -123,15 +125,24 @@ private QueryablePipeline(Collection 
transformIds, Components components
   @VisibleForTesting
   static Collection getPrimitiveTransformIds(RunnerApi.Components 
components) {
 Collection ids = new LinkedHashSet<>();
+
 for (Map.Entry transformEntry : 
components.getTransformsMap().entrySet()) {
   PTransform transform = transformEntry.getValue();
   boolean isPrimitive = isPrimitiveTransform(transform);
   if (isPrimitive) {
-List subtransforms = transform.getSubtransformsList();
-if (subtransforms.isEmpty()) {
-  ids.add(transformEntry.getKey());
-} else {
-  ids.addAll(subtransforms);
+// for a "primitive" transform, we want to keep only "leaf" 
sub-transform descendents, if any (in the common
 
 Review comment:
   Why would a primitive transform ever have leaves? 


Issue Time Tracking
---

Worklog Id: (was: 146268)
Time Spent: 50m  (was: 40m)


[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146221=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146221
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 05:00
Start Date: 21/Sep/18 05:00
Worklog Time Spent: 10m 
  Work Description: ryan-williams edited a comment on issue #6461: 
[BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#issuecomment-423406984
 
 
   when I run locally:
   
   ```
   python -m apache_beam.examples.wordcount --input=/etc/profile \
     --output=/tmp/py-wordcount-direct --experiments=beam_fn_api \
     --runner=PortableRunner --sdk_location=container --job_endpoint=localhost:8099
   ```
   I see failures like:
   
   ```
   java.lang.Exception: The user defined 'open()' method caused an exception: java.lang.IllegalStateException: The passed environment does not contain a DockerPayload.
       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
       at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
       at java.lang.Thread.run(Thread.java:745)
   ```
   
   ([more 
output](https://gist.github.com/ryan-williams/653861032f4c6e12e6e1b4220f753348))
   
   I have no leads on what could be causing that (and haven't looked yet), but wanted 
to get this out for folks to look at.
   
   **update**: this was basically due to not running from within `sdks/python` 
(or alternatively, not having installed my local Python SDK so that it would be 
used by the `python` command above)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 146221)
Time Spent: 40m  (was: 0.5h)

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>Reporter: Maximilian Michels
>Assignee: Ryan Williams
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.8.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146217=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146217
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 04:39
Start Date: 21/Sep/18 04:39
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on issue #6461: [BEAM-5441] 
extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#issuecomment-423411079
 
 
   It seems like `:beam-sdks-python:testPython3` is broken at `HEAD`?




Issue Time Tracking
---

Worklog Id: (was: 146217)
Time Spent: 0.5h  (was: 20m)

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>Reporter: Maximilian Michels
>Assignee: Ryan Williams
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.8.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The Python SDK wordcount with the PortableRunner throws the following 
> exception:
> {noformat}
> java.lang.IllegalArgumentException: A PCollectionNode should have exactly one 
> producing PTransformNode, PCollectionNode{id=ref_PCollection_PCollection_26, 
> PCollection=unique_name: 
> "60write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys.None"
> coder_id: "ref_Coder_FastPrimitivesCoder_2"
> is_bounded: BOUNDED
> windowing_strategy_id: "ref_Windowing_Windowing_1"
> } has 
> [PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle_32,
>  transform=spec {
>   urn: "beam:transform:generic_composite:v1"
>   payload: ""
> }
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/AddRandomKeys_33"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/ReshufflePerKey_34"
> subtransforms: 
> "ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41"
> inputs {
>   key: "0"
>   value: "ref_PCollection_PCollection_19"
> }
> outputs {
>   key: "None"
>   value: "ref_PCollection_PCollection_26"
> }
> unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle"
> }, 
> PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41,
>  transform=spec {
>   urn: "urn:beam:transform:pardo:v1"
>   payload: "\n\317\006\n\255\006\n 
> beam:dofn:pickled_python_info:v1\032\210\006eNrFkttP1EAUxtsFZe1yUcEL4v3aRbdFUEFFQEHUEGpSnPhkxml32Gnstv3a6YZNbKIxJf7ZTvdB2ER9NZNMcs6cb3LO73zfaqbPEuYLTj3OupZMWZTtx2k3s/w45cYmC0PmhfxjypKEp1vxdmRAa36HXqBmkrEkjX2eZRjx20EYWrS6DeqnnElO9/PIl0GsFKPm0HsYszaV/YQbOEHqm3Gbf1ABTpYYc1E3d3R1arvTG2Tip6Z91bQfutbRtT2cckoYTaIfoFFinPRtkvE0s7vswN7iPbuaoCV5Ju0ej3p2GHh20pcijhatZTsLJG+pSb+wDs/sYzO3Fq0Va8Fq895CK+mrUot3OscL7CModgXFSvqYIPXVkHW9NlvD5G5jlGiYIrX9CKdLnGlKnHUx7VPq5UEog4hSo8MlkzI1MDNEIugmcSppN27noaJxjsz9Yxs4X+KCi4ukTpXcl5Ri9hCXXMyJSedPC/C5CnBZjJriN9W9z6SukLZ1bXYPV5wd/RBXFVKJayWu/w+kuQzCCukNMbm7XhNTTYXvpotbYkb8HUclwu0Sd1zcFQrCPRemguAUaJLGwFpUBJHMMD9sb/UwyKveFFEm4zQz3r2v3Pe2Shu4r7z9oECrgGWSRhAluRx8l8F2yHicy6PEgpMf4qGXSSy6WCrxyMXjEk8KLJtiXlQfrRR4WuCZKWxHDKqfe6o7lnayhPtUuWVVLOUSL1ysDXe9PpBvFHhZ4NWRfNMjI5VsS6zl3ie8LrDtOBJvrF+Bv0km\022\035ref_Environment_Environment_1"
> }
> inputs {
>   key: "0"
>   value: "ref_PCollection_PCollection_25"
> }
> outputs {
>   key: "None"
>   value: "ref_PCollection_PCollection_26"
> }
> unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys"
> }]
> at org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:416)
> at org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:176)
> at org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:119)
> at org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:82)
> at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:67)
> at org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:89)
> at 
> 
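The exception says that `ref_PCollection_PCollection_26` has two producers: the composite `Reshuffle_32` (URN `beam:transform:generic_composite:v1`) and its own leaf subtransform `RemoveRandomKeys_41`. The following is a minimal sketch of that "exactly one producer" invariant, assuming a simplified, hypothetical `Transform` record in place of Beam's `RunnerApi` protos; it is not Beam's `QueryablePipeline` code. It illustrates why counting a composite as a primitive alongside its leaves trips the check. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the "exactly one producing transform per
 * PCollection" check. Not Beam's actual types or implementation.
 */
public class ProducerCheck {

  /** Stand-in for a PTransform: its id, direct subtransform ids, and output PCollection ids. */
  record Transform(String id, List<String> subtransforms, List<String> outputs) {}

  /** Maps each output PCollection id to the ids of the given transforms that produce it. */
  static Map<String, List<String>> producersOf(List<Transform> transforms) {
    Map<String, List<String>> producers = new LinkedHashMap<>();
    for (Transform t : transforms) {
      for (String pc : t.outputs()) {
        producers.computeIfAbsent(pc, k -> new ArrayList<>()).add(t.id());
      }
    }
    return producers;
  }

  /** Returns the PCollection ids that do not have exactly one producer among the given transforms. */
  static List<String> violations(List<Transform> transforms) {
    List<String> bad = new ArrayList<>();
    producersOf(transforms).forEach((pc, ps) -> {
      if (ps.size() != 1) {
        bad.add(pc);
      }
    });
    return bad;
  }

  public static void main(String[] args) {
    // The composite and its leaf both list PCollection_26 as an output (ids shortened from the trace).
    Transform reshuffle = new Transform("Reshuffle_32",
        List.of("AddRandomKeys_33", "ReshufflePerKey_34", "RemoveRandomKeys_41"),
        List.of("PCollection_26"));
    Transform removeKeys = new Transform("RemoveRandomKeys_41", List.of(), List.of("PCollection_26"));

    // Treating the composite as a primitive too yields two producers for PCollection_26.
    System.out.println(violations(List.of(reshuffle, removeKeys))); // [PCollection_26]
    // Keeping only the leaf leaves a single producer.
    System.out.println(violations(List.of(removeKeys))); // []
  }
}
```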

[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146210=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146210
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 04:05
Start Date: 21/Sep/18 04:05
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on issue #6461: [BEAM-5441] 
extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461#issuecomment-423406984
 
 
   when I run locally:
   
   ```
   python -m apache_beam.examples.wordcount --input=/etc/profile \
     --output=/tmp/py-wordcount-direct --experiments=beam_fn_api \
     --runner=PortableRunner --sdk_location=container --job_endpoint=localhost:8099
   ```
   I see failures like:
   
   ```
   java.lang.Exception: The user defined 'open()' method caused an exception: java.lang.IllegalStateException: The passed environment does not contain a DockerPayload.
   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
   at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
   at java.lang.Thread.run(Thread.java:745)
   ```
   
   ([more output](https://gist.github.com/ryan-williams/653861032f4c6e12e6e1b4220f753348))
   
   I have no leads on what could be causing that and haven't looked yet, but wanted to get this out for folks to look at.




Issue Time Tracking
---

Worklog Id: (was: 146210)
Time Spent: 20m  (was: 10m)

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>Reporter: Maximilian Michels
>Assignee: Ryan Williams
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>

[jira] [Work logged] (BEAM-5441) Portable Wordcount fails in GreedyPipelineFuser

2018-09-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5441?focusedWorklogId=146208=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146208
 ]

ASF GitHub Bot logged work on BEAM-5441:


Author: ASF GitHub Bot
Created on: 21/Sep/18 04:02
Start Date: 21/Sep/18 04:02
Worklog Time Spent: 10m 
  Work Description: ryan-williams opened a new pull request #6461: 
[BEAM-5441] extend primitive transform check to find subtransform leaves
URL: https://github.com/apache/beam/pull/6461
 
 
   WIP: an attempt at fixing portable Python wordcount, which was broken by #6328.
   
   R: @angoenka
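The PR title describes extending the primitive-transform check to find subtransform leaves. One way to sketch that idea is an explicit-stack traversal with `ArrayDeque` that keeps only transforms with no subtransforms. This assumes the transform hierarchy is a tree given as a hypothetical map from transform id to direct subtransform ids; it is not Beam's `RunnerApi.Components` API and not the code merged in #6461.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: expand transform ids down to their leaf subtransforms
 * (those with no subtransforms of their own), so a composite is never counted
 * alongside the leaves it contains. Assumes the hierarchy has no cycles.
 */
public class LeafExpansion {

  /**
   * @param subtransforms map from transform id to its direct subtransform ids
   *     (a missing or empty entry means the transform is a leaf)
   * @param roots transform ids to start from
   * @return leaf ids in depth-first, left-to-right order
   */
  static List<String> leaves(Map<String, List<String>> subtransforms, List<String> roots) {
    List<String> result = new ArrayList<>();
    Deque<String> stack = new ArrayDeque<>();
    // Push roots in reverse so they are popped in their original order.
    for (int i = roots.size() - 1; i >= 0; i--) {
      stack.push(roots.get(i));
    }
    while (!stack.isEmpty()) {
      String id = stack.pop();
      List<String> children = subtransforms.getOrDefault(id, List.of());
      if (children.isEmpty()) {
        result.add(id); // a leaf: treat it as a primitive
      } else {
        // A composite: replace it with its children, preserving their order.
        for (int i = children.size() - 1; i >= 0; i--) {
          stack.push(children.get(i));
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> subs = Map.of(
        "Reshuffle_32", List.of("AddRandomKeys_33", "ReshufflePerKey_34", "RemoveRandomKeys_41"),
        // Hypothetical nested composite, for illustration only.
        "ReshufflePerKey_34", List.of("HypotheticalGroupByKey"));
    System.out.println(leaves(subs, List.of("Reshuffle_32")));
    // prints [AddRandomKeys_33, HypotheticalGroupByKey, RemoveRandomKeys_41]
  }
}
```

Using an explicit `Deque` instead of recursion keeps deeply nested composites from growing the call stack. `HypotheticalGroupByKey` is an invented id, not one taken from the trace.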
   




Issue Time Tracking
---

Worklog Id: (was: 146208)
Time Spent: 10m
Remaining Estimate: 0h

> Portable Wordcount fails in GreedyPipelineFuser
> ---
>
> Key: BEAM-5441
> URL: https://issues.apache.org/jira/browse/BEAM-5441
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-python, sdk-java-core
>Affects Versions: 2.8.0
>Reporter: Maximilian Michels
>Assignee: Ryan Williams
>Priority: Major
>  Labels: portability, portability-flink
> Fix For: 2.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The Python SDK wordcount with the PortableRunner throws the following 
> exception:
> {noformat}
> java.lang.IllegalArgumentException: A PCollectionNode should have exactly one 
> producing PTransformNode, PCollectionNode{id=ref_PCollection_PCollection_26, 
> PCollection=unique_name: 
> "60write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys.None"
> coder_id: "ref_Coder_FastPrimitivesCoder_2"
> is_bounded: BOUNDED
> windowing_strategy_id: "ref_Windowing_Windowing_1"