[ https://issues.apache.org/jira/browse/BEAM-10016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117378#comment-17117378 ]

Luke Cwik edited comment on BEAM-10016 at 5/27/20, 4:19 AM:
------------------------------------------------------------

The issue is that the GreedyPipelineFuser (and related classes) doesn't take 
into account the change in encoding from the flatten's input to the flatten's 
output in certain scenarios where the flatten isn't being merged into an 
existing stage.

Normally one could fix this by copying the coder from the flatten's output 
PCollection to all of the input PCollections, but this doesn't hold for 
cross-language pipelines, because we could have
{code:java}
ParDo(Java) -> PC(big endian int coder)    \
                                             -> Flatten(Python) -> PC(varint coder)
ParDo(Go)   -> PC(little endian int coder) /
{code}
In this case the Python SDK would know all three coders (big endian int, 
little endian int, and varint), but the Java and Go SDKs would only know the 
big endian int coder and the little endian int coder, respectively.
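To make the mismatch concrete, here is a small Python sketch (illustrative only, not Beam SDK code; the varint helper is a hand-rolled base-128 encoder) showing that the same integer has three different byte encodings under the three coders:

```python
import struct

def encode_varint(n: int) -> bytes:
    # Base-128 varint encoding (illustrative; assumes a non-negative int).
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)

value = 300
big_endian = struct.pack(">i", value)     # 4 bytes, most significant first
little_endian = struct.pack("<i", value)  # 4 bytes, least significant first
varint = encode_varint(value)             # 2 bytes, base-128

# All three encodings differ, so bytes produced under one coder cannot
# simply be passed through a flatten whose output expects another coder.
assert len({big_endian, little_endian, varint}) == 3
```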

The solution in the above example is to make the Python SDK perform the 
transcoding by having it execute the flatten. Only flattens where the 
input and output coders match can be executed by a runner, since no 
transcoding is necessary.
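A minimal sketch of what that SDK-executed flatten has to do (again illustrative, not the actual harness code; coders are stand-ins): decode each element with its input PCollection's coder and re-encode it with the output coder. Only a process that knows every coder involved can perform this step.

```python
import struct

def encode_varint(n: int) -> bytes:
    # Base-128 varint encoding (illustrative; assumes a non-negative int).
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)

def transcoding_flatten(inputs):
    """inputs: iterable of (raw_bytes, decode_fn) pairs, one per source
    PCollection. Decode each element with its own coder, then re-encode
    with the flatten output's (varint) coder."""
    for raw, decode in inputs:
        yield encode_varint(decode(raw))

# One element from the Java side (big endian) and one from the Go side
# (little endian), both representing the integer 300.
java_elem = (struct.pack(">i", 300), lambda b: struct.unpack(">i", b)[0])
go_elem = (struct.pack("<i", 300), lambda b: struct.unpack("<i", b)[0])

merged = list(transcoding_flatten([java_elem, go_elem]))
# Both elements now carry the same varint encoding of 300.
```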

An alternative would be to require that flattens have the same input and 
output coders, but that would be a backwards-incompatible change. Another 
option would be to insert identity ParDos within SDKs to ensure that the 
input/output coders match whenever there is a Flatten.



> Flink postcommits failing testFlattenWithDifferentInputAndOutputCoders2
> -----------------------------------------------------------------------
>
>                 Key: BEAM-10016
>                 URL: https://issues.apache.org/jira/browse/BEAM-10016
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Kyle Weaver
>            Assignee: Maximilian Michels
>            Priority: P2
>
> Both beam_PostCommit_Java_PVR_Flink_Batch and 
> beam_PostCommit_Java_PVR_Flink_Streaming are failing newly added test 
> org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2.
> SEVERE: Error in task code:  CHAIN MapPartition (MapPartition at [6]{Values, 
> FlatMapElements, PAssert$0}) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map 
> (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: 
> PAssert$0/GroupGlobally/GatherAllOutputs/GroupByKey) -> Map (Key Extractor) 
> (2/2) java.lang.ClassCastException: org.apache.beam.sdk.values.KV cannot be 
> cast to [B
>       at 
> org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
>       at 
> org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
>       at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
>       at 
> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
>       at 
> org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
>       at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:667)
>       at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:271)
>       at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:203)
>       at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>       at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
