Maximilian Michels created BEAM-5441:
----------------------------------------

             Summary: Portable Wordcount fails in GreedyPipelineFuser
                 Key: BEAM-5441
                 URL: https://issues.apache.org/jira/browse/BEAM-5441
             Project: Beam
          Issue Type: New Feature
          Components: examples-python, sdk-java-core
    Affects Versions: 2.8.0
            Reporter: Maximilian Michels
            Assignee: Ahmet Altay
             Fix For: 2.8.0


The Python SDK wordcount with the PortableRunner throws the following exception:

{noformat}
java.lang.IllegalArgumentException: A PCollectionNode should have exactly one 
producing PTransformNode, PCollectionNode{id=ref_PCollection_PCollection_26, 
PCollection=unique_name: 
"60write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys.None"
coder_id: "ref_Coder_FastPrimitivesCoder_2"
is_bounded: BOUNDED
windowing_strategy_id: "ref_Windowing_Windowing_1"
} has 
[PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle_32,
 transform=spec {
  urn: "beam:transform:generic_composite:v1"
  payload: "<Reshuffle(PTransform) label=[Reshuffle]>"
}
subtransforms: 
"ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/AddRandomKeys_33"
subtransforms: 
"ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/ReshufflePerKey_34"
subtransforms: 
"ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41"
inputs {
  key: "0"
  value: "ref_PCollection_PCollection_19"
}
outputs {
  key: "None"
  value: "ref_PCollection_PCollection_26"
}
unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle"
}, 
PTransformNode{id=ref_AppliedPTransform_write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys_41,
 transform=spec {
  urn: "urn:beam:transform:pardo:v1"
  payload: "\n\317\006\n\255\006\n 
beam:dofn:pickled_python_info:v1\032\210\006eNrFkttP1EAUxtsFZe1yUcEL4v3aRbdFUEFFQEHUEGpSnPhkxml32Gnstv3a6YZNbKIxJf7ZTvdB2ER9NZNMcs6cb3LO73zfaqbPEuYLTj3OupZMWZTtx2k3s/w45cYmC0PmhfxjypKEp1vxdmRAa36HXqBmkrEkjX2eZRjx20EYWrS6DeqnnElO9/PIl0GsFKPm0HsYszaV/YQbOEHqm3Gbf1ABTpYYc1E3d3R1arvTG2Tip6Z91bQfutbRtT2cckoYTaIfoFFinPRtkvE0s7vswN7iPbuaoCV5Ju0ej3p2GHh20pcijhatZTsLJG+pSb+wDs/sYzO3Fq0Va8Fq895CK+mrUot3OscL7CModgXFSvqYIPXVkHW9NlvD5G5jlGiYIrX9CKdLnGlKnHUx7VPq5UEog4hSo8MlkzI1MDNEIugmcSppN27noaJxjsz9Yxs4X+KCi4ukTpXcl5Ri9hCXXMyJSedPC/C5CnBZjJriN9W9z6SukLZ1bXYPV5wd/RBXFVKJayWu/w+kuQzCCukNMbm7XhNTTYXvpotbYkb8HUclwu0Sd1zcFQrCPRemguAUaJLGwFpUBJHMMD9sb/UwyKveFFEm4zQz3r2v3Pe2Shu4r7z9oECrgGWSRhAluRx8l8F2yHicy6PEgpMf4qGXSSy6WCrxyMXjEk8KLJtiXlQfrRR4WuCZKWxHDKqfe6o7lnayhPtUuWVVLOUSL1ysDXe9PpBvFHhZ4NWRfNMjI5VsS6zl3ie8LrDtOBJvrF+Bv0km\022\035ref_Environment_Environment_1"
}
inputs {
  key: "0"
  value: "ref_PCollection_PCollection_25"
}
outputs {
  key: "None"
  value: "ref_PCollection_PCollection_26"
}
unique_name: "write/Write/WriteImpl/DoOnce/Read/Reshuffle/RemoveRandomKeys"
}]
        at 
org.apache.beam.repackaged.beam_runners_core_construction_java.com.google.common.base.Preconditions.checkArgument(Preconditions.java:416)
        at 
org.apache.beam.runners.core.construction.graph.QueryablePipeline.buildNetwork(QueryablePipeline.java:176)
        at 
org.apache.beam.runners.core.construction.graph.QueryablePipeline.<init>(QueryablePipeline.java:119)
        at 
org.apache.beam.runners.core.construction.graph.QueryablePipeline.forPrimitivesIn(QueryablePipeline.java:82)
        at 
org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.<init>(GreedyPipelineFuser.java:67)
        at 
org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser.fuse(GreedyPipelineFuser.java:89)
        at 
org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:96)
        at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
        at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
        at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

Looks like it was caused by https://github.com/apache/beam/pull/6328



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to