Thank you for the feedback, @[email protected]; I'm going to give that a try.

Thanks and Regards,
Arvind Clement

From: Luke Cwik <[email protected]>
Sent: 25 March 2022 23:15
To: user <[email protected]>
Subject: EXTERNAL: Re: Flink Portable Runner Error: Cannot union inputs of different types

https://issues.apache.org/jira/browse/BEAM-6523 would be the fix that you're looking for.

An alternative workaround would be to ensure that the PCollections that you're flattening together have the same output type hints. This way they should get the same encodings, and this will sidestep the transcoding issue.

On Mon, Mar 7, 2022 at 12:07 PM Arvind CLEMENT <[email protected]> wrote:

Hi All,

We are running an Apache Beam Python batch pipeline, with code running on both Dataflow and Flink. When we run the code on Flink, we get the error below. We are using Apache Beam 2.34.0, Flink 1.12.x and Python 3.6.8. Any help with the error would be awesome.

P.S. After some digging we came across this issue: https://issues.apache.org/jira/browse/BEAM-6523. It does look like this would be our fix. Any idea whether this has been implemented already?

The error
--------------------------------------------------------------------------------
[2022-02-23 21:24:06,844] [root] [DEBUG]: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types.
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
    at org.apache.flink.api.java.operators.UnionOperator.<init>(UnionOperator.java:48)
    at org.apache.flink.api.java.DataSet.union(DataSet.java:1242)
    at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateFlatten(FlinkBatchPortablePipelineTranslator.java:440)
    at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:272)
    at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:118)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:115)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
    at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2022-02-23 21:24:06,845] [root] [ERROR]: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:06,996] [apache_beam.runners.portability.portable_runner] [INFO]: Job state changed to FAILED
Traceback (most recent call last):
  File "BeamInputPreparation_debug.py", line 37, in <module>
    main()
  File "BeamInputPreparation_debug.py", line 33, in main
    pipeline.run(sys.argv)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/nola2x/apache_beam/pipeline/beam_input_prep_pipeline_debug.py", line 544, in run
    | "PersistSettings/Save" >> beam.ParDo(WriteByKeyDoFn(OUTPUT), backup=True, path_sep=PATH_SEP)
  File "/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3/venv/lib/python3.6/site-packages/apache_beam/runners/portability/portable_runner.py", line 600, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline nola2_input_prep_f5c0c22e-c8b6-43d1-a889-fde8ad8decbf failed in state FAILED: org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types.
Input1=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,LengthPrefixCoder(ByteArrayCoder)),GlobalWindow$Coder)}, input2=CoderTypeInformation{coder=WindowedValue$FullWindowedValueCoder(LengthPrefixCoder(ByteArrayCoder),GlobalWindow$Coder)}
[2022-02-23 21:24:07,003] [root] [DEBUG]: Sending SIGINT to job_server
(venv) root@ravencr-jobmanager-677cbd59-h6rt7:/ravencr/NOLA2_PYTHON_PIPELINE_2.3.b1361.3#
command terminated with exit code 137
--------------------------------------------------------------------------------

PUBLIC
-----------------------------------------
SAVE PAPER - THINK BEFORE YOU PRINT!

This E-mail is confidential. It may also be legally privileged. If you are not the addressee you may not copy, forward, disclose or use any part of it. If you have received this message in error, please delete it and all copies from your system and notify the sender immediately by return E-mail. Internet communications cannot be guaranteed to be timely secure, error or virus-free. The sender does not accept liability for any errors or omissions.

******************************************************************
This message originated from the Internet. Its originator may or may not be who they claim to be and the information contained in the message and any attachments may or may not be accurate.
******************************************************************
