Hi once more,

I feel a little bit like I've started my slow descent into madness. What I assumed to be a configuration error now looks like it isn't one. I've put together a minimal test case project <https://github.com/fbarthez/BeamJoinSpike> that shows the same behaviour I'm seeing in my project. I'd be very grateful if someone could take a look and tell me whether I'm taking a wrong turn somewhere or whether there really is a bug.
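
To make the expected result concrete, here is a minimal Python sketch of the join semantics I'm after, independent of Hop and Beam. All class names, the function join_valid_agents and the sample rows are made up for illustration and are not taken from the test project; the lookup is a plain dictionary-based inner join, not Hop's Merge join implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Site:
    site_id: int
    site_name: str


@dataclass(frozen=True)
class Agent:
    agent_id: int
    site_id: int


def join_valid_agents(agents, sites):
    """Inner join on site_id; the order of both inputs does not matter."""
    # Hypothetical helper: index the valid sites by their key.
    sites_by_id = {site.site_id: site for site in sites}
    joined = []
    for agent in agents:
        site = sites_by_id.get(agent.site_id)
        if site is not None:
            # Keep only agents whose site_id refers to an existing site.
            joined.append((agent.agent_id, agent.site_id, site.site_name))
    return joined


# Illustrative sample rows (assumed, unsorted on purpose).
sites = [Site(2, "San Jose"), Site(1, "Berlin")]
agents = [Agent(10, 1), Agent(11, 99), Agent(12, 2)]
valid_agents = join_valid_agents(agents, sites)
```

In this sketch site_id stays an integer in every output row, and the agent with the unknown site_id 99 is dropped regardless of input order.
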
What I'm trying to achieve: There are records of one entity (agent) that come with a site_id field. Some site_id values do not correspond to a valid site. The agent records are to be joined with valid records of the second entity (site), so that only agent records with an existing site_id get to proceed.

The setup works fine with the local runner (even though the inputs are not sorted). With the Beam-Direct runner, the fields are getting mixed up: in this case the values from site_name end up in the site_id column.

cheers

Fabian

2022/09/02 12:35:33 - Hop - Pipeline opened.
2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
2022/09/02 12:35:33 - Hop - Started the pipeline execution.
2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 'join-spike'
2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
2022/09/02 12:35:33 - join-spike - ERROR: org.apache.hop.core.exception.HopException:
2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error executing TransformFn
2022/09/02 12:35:33 - join-spike -
2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
2022/09/02 12:35:33 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
2022/09/02 12:35:33 - join-spike - ... 2 more
2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException:
2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
2022/09/02 12:35:33 - join-spike -
2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
2022/09/02 12:35:33 - join-spike -
2022/09/02 12:35:33 - join-spike -
2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022/09/02 12:35:33 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException:
2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
2022/09/02 12:35:33 - join-spike -
2022/09/02 12:35:33 - join-spike - at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
2022/09/02 12:35:33 - join-spike - ... 15 more

> On 02.09.2022 at 09:48, Fabian Peters <[email protected]> wrote:
>
> Hi Hans,
>
> That stack trace was due to user error and I'm glad it was. ;)
> There was one "id" field too many coming into the merge join…
>
> Sorry for the noise!
>
> cheers
>
> Fabian
>
>> On 02.09.2022 at 09:31, Fabian Peters <[email protected]> wrote:
>>
>> Hi Hans,
>>
>> Thanks! I probably read that at some point, but the "Notice" modal popping
>> up when closing the "Merge join" dialogue probably convinced me otherwise:
>> "If the incoming data is not sorted on the specified keys, the output
>> results may not be correct. We recommend sorting the incoming data within
>> the pipeline."
>>
>> I'm testing it in my pipeline now and am getting a stack trace (see below).
>> The "site_id" field is from an "Avro decode" transform and is a plain
>> Integer. Using the local runner and writing to Postgres this works fine.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException:
>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 09:20:20 - General - ... 2 more
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>> 2022/09/02 09:20:20 - General - ... 17 more
>>
>>> On 02.09.2022 at 08:58, Hans Van Akelyen <[email protected]> wrote:
>>>
>>> Hi Fabian,
>>>
>>> Merge join does not require your data to be sorted when executing on Beam
>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>>
>>> Cheers,
>>> Hans
>>>
>>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <[email protected]> wrote:
>>> Good morning Matt,
>>>
>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so
>>> the Merge Join transform is not an option. I guess I'll have to use
>>> temporary BigQuery tables to handle this. Those pipelines are all bounded,
>>> so this is an option. Or is there an easy option to sort things when
>>> running on Beam?
>>>
>>> I'll create a Jira ticket, no problem.
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>>> On 01.09.2022 at 19:11, Matt Casters <[email protected]> wrote:
>>>>
>>>> Hi Fabian,
>>>>
>>>> Joining rows is indeed the exception in Beam. I would suggest you use the
>>>> Merge Join
>>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>>>> transform.
>>>> For unbounded pipelines (never ending) that transform will be handled
>>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>>> correctly.
>>>> If you don't mind, please create a JIRA case so we can create a similar
>>>> handler for the Cartesian product use-case.
>>>> The code usually is non-trivial in the massively parallel world but quite
>>>> doable ;-)
>>>>
>>>> All the best,
>>>> Matt
>>>>
>>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <[email protected]> wrote:
>>>> Hi all,
>>>>
>>>> I've hit the next problem, this time something I thought I had tested on
>>>> Beam before: A pipeline containing a "Join rows (cartesian product)"
>>>> transform with input from two sources loops forever when run via
>>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>>
>>>> While running it on Beam-Direct I've attached a debugger and can see that
>>>> it is stuck in the while loop at JoinRows.java:486
>>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>> I've tried using a GCS temp directory and swapped the "Main transform to
>>>> read from" but none of those helped.
>>>>
>>>> Is this transform incompatible with Beam? If so, what could I use instead?
>>>>
>>>> cheers
>>>>
>>>> Fabian
>>>>
>>>> --
>>>> Neo4j Chief Solutions Architect
>>>> ✉ [email protected]
