Thank you for the feedback, Luke. I am going to give that a try.

Thanks and Regards
Arvind Clement

From: Luke Cwik <[email protected]>
Sent: 25 March 2022 23:15
To: user <[email protected]>
Subject: EXTERNAL: Re: Flink Portable Runner Error: Cannot union inputs of different types

https://issues.apache.org/jira/browse/BEAM-6523 would be the fix that you're 
looking for.

An alternative workaround would be to ensure that the PCollections that you're 
flattening together have the same output type hints. This way they should get 
the same encodings, which sidesteps the transcoding issue.

On Mon, Mar 7, 2022 at 12:07 PM Arvind CLEMENT <[email protected]> wrote:
Hi All,

We are running an Apache Beam Python batch pipeline and have code running on 
Dataflow and Flink. When we run the code on Flink we get the error below. We 
are using Apache Beam 2.34.0, Flink 1.12.x and Python 3.6.8. Any help with the 
error would be awesome.

P.S. After some digging we came across this issue: 
https://issues.apache.org/jira/browse/BEAM-6523
It does look like this would be our fix. Any idea whether it has been 
implemented already?

The error
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[2022-02-23 21:24:06,844] [root] [DEBUG]: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. 
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, 
input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
        at org.apache.flink.api.java.operators.UnionOperator.<init>(UnionOperator.java:48)
        at org.apache.flink.api.java.DataSet.union(DataSet.java:1242)
        at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateFlatten(FlinkBatchPortablePipelineTranslator.java:440)
        at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:272)
        at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:118)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:115)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
        at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

[2022-02-23 21:24:06,845] [root] [ERROR]: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. 
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, 
input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:06,996] [apache_beam.runners.portability.portable_runner] 
[INFO]: Job state changed to FAILED
Traceback (most recent call last):
  File "BeamInputPreparation_debug.py", line 37, in <module>
    main()
  File "BeamInputPreparation_debug.py", line 33, in main
    pipeline.run(sys.argv)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/nola2x/apache_beam/pipeline/beam_input_prep_pipeline_debug.py", line 544, in run
    | "PersistSettings/Save" >> beam.ParDo(WriteByKeyDoFn(OUTPUT), backup=True, path_sep=PATH_SEP)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/venv/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 600, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline nola2_input_prep_f5c0c22e-c8b6-43d1-a889-fde8ad8decbf failed in state FAILED: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. 
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, 
input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:07,003] [root] [DEBUG]: Sending SIGINT to job_server
(venv) root@ravencr-jobmanager-677cbd59-h6rt7:/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3#
command terminated with exit code 137


------------------------------------------------------------------------------------------------------------------------------------------



PUBLIC
-----------------------------------------
SAVE PAPER - THINK BEFORE YOU PRINT!

This E-mail is confidential.

It may also be legally privileged. If you are not the addressee you may not 
copy,
forward, disclose or use any part of it. If you have received this message in 
error,
please delete it and all copies from your system and notify the sender 
immediately by
return E-mail.

Internet communications cannot be guaranteed to be timely secure, error or 
virus-free.
The sender does not accept liability for any errors or omissions.
******************************************************************
This message originated from the Internet. Its originator may or
may not be who they claim to be and the information contained in
the message and any attachments may or may not be accurate.
******************************************************************

