Hi Matt,

Sure, will do. Get well soon!

Fabian

> On 02.09.2022 at 15:21, Matt Casters <[email protected]> wrote:
>
> Could you create a JIRA case for this? Bug reports are really more at home
> there.
> I do know that the Beam Join operator behaves a bit differently, so there's
> probably an edge case triggered somewhere.
> In principle this is a 5-minute thing to look at and fix, but I'm struggling
> with a bit of COVID and I just can't look at it right now.
>
> All the best,
> Matt
>
> On Fri, Sep 2, 2022 at 3:14 PM Fabian Peters <[email protected]> wrote:
> Hi Matt,
>
> Thanks for your quick reply!
>
> The "Write to log" transform actually works fine for me with the local,
> Beam-Direct and Dataflow runners.
>
> I removed it from my test case and replaced the last one with a Beam Output
> transform. The issue stays the same; see the stack trace below…
>
> cheers
>
> Fabian
>
>
> 2022/09/02 15:05:06 - Hop - Pipeline opened.
> 2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 15:05:06 - Hop - Started the pipeline execution.
> 2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name 'join-spike'
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
> 2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) : Select values, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid agents, gets data from Select values
> 2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 15:05:14 - join-spike - ERROR: org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner Direct
> 2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 15:05:14 - join-spike - ... 2 more
> 2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error getting String from field site_id Integer on index 1 in input: [id Integer], [site_id Integer], [site_name String], [telephone String], [agent_name String], native value found: Mbabane
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 15:05:14 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 15:05:14 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException:
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [Mbabane] does not correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 15:05:14 - join-spike - at org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
> 2022/09/02 15:05:14 - join-spike - ... 13 more
>
>
>> On 02.09.2022 at 13:19, Matt Casters <[email protected]> wrote:
>>
>> Hi Fabian,
>>
>> If you remove the "Write to log" transforms, the pipeline will work. We
>> typically don't use that transform, as the log ends up on a Spark/Flink node
>> somewhere where you can't see the information anyway. It's the main reason
>> why I'm working on HOP-4024.
>>
>> Also, just as a reference, there is an example in the samples project under
>> beam/pipelines called complex.hpl which contains a merge join.
>>
>> Best of luck!
>> Matt
>>
>>
>> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <[email protected]> wrote:
>> Hi once more,
>>
>> I feel a little bit like I've started my slow descent into madness. What I
>> took to be a configuration error now looks like it's not. I've put
>> together a minimal test case project
>> <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour I'm
>> seeing in my project. I'd be very grateful if someone could take a look and
>> tell me whether I'm taking a wrong turn somewhere or whether there really is
>> a bug.
>>
>> What I'm trying to achieve: There are records of one entity (agent) that
>> come with a site_id field. Some site_id values do not correspond to a valid
>> site. The agent records are to be joined with valid records of the second
>> entity (site), so that only agent records with an existing site_id get to
>> proceed.
>>
>> The setup works fine with the local runner (even though the inputs are not
>> sorted). With the Beam-Direct runner, the fields get mixed up; in this
>> case, the values from site_name end up in the site_id column.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 12:35:33 - Hop - Pipeline opened.
>> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
>> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
>> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 'join-spike'
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Beam-Direct'
>> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
>> 2022/09/02 12:35:33 - join-spike - ERROR: org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
>> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 12:35:33 - join-spike - ... 2 more
>> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single threaded pipeline
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 12:35:33 - join-spike - at java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by: org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type error: the data type of java.lang.String object [San Jose] does not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
>> 2022/09/02 12:35:33 - join-spike - at org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
>> 2022/09/02 12:35:33 - join-spike - ... 15 more
>>
>>
>>
>>> On 02.09.2022 at 09:48, Fabian Peters <[email protected]> wrote:
>>>
>>> Hi Hans,
>>>
>>> That stack trace was due to user error, and I'm glad it was. ;)
>>> There was one "id" field too many coming into the merge join…
>>>
>>> Sorry for the noise!
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>>> On 02.09.2022 at 09:31, Fabian Peters <[email protected]> wrote:
>>>>
>>>> Hi Hans,
>>>>
>>>> Thanks! I probably read that at some point, but the "Notice" modal popping
>>>> up when closing the "Merge join" dialogue probably convinced me otherwise:
>>>> "If the incoming data is not sorted on the specified keys, the output
>>>> results may not be correct. We recommend sorting the incoming data within
>>>> the pipeline."
>>>>
>>>> I'm testing it in my pipeline now and am getting a stack trace (see
>>>> below). The "site_id" field is from an "Avro decode" transform and is a
>>>> plain Integer. Using the local runner and writing to Postgres, this works
>>>> fine.
>>>>
>>>> cheers
>>>>
>>>> Fabian
>>>>
>>>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>>>> 2022/09/02 09:20:20 - General - ERROR: org.apache.hop.core.exception.HopException:
>>>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>>>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>>> 2022/09/02 09:20:20 - General -
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>>>> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
>>>> 2022/09/02 09:20:20 - General - Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>>>> 2022/09/02 09:20:20 - General - ... 2 more
>>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>>> 2022/09/02 09:20:20 - General - Caused by: org.apache.hop.core.exception.HopValueException:
>>>> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting value [site_id Integer] to an Integer
>>>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>>>> 2022/09/02 09:20:20 - General -
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>>>> 2022/09/02 09:20:20 - General - at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>>>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> 2022/09/02 09:20:20 - General - at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> 2022/09/02 09:20:20 - General - at java.base/java.lang.Thread.run(Thread.java:829)
>>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Long (java.lang.String and java.lang.Long are in module java.base of loader 'bootstrap')
>>>> 2022/09/02 09:20:20 - General - at org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>>>> 2022/09/02 09:20:20 - General - ... 17 more
>>>>
>>>>> On 02.09.2022 at 08:58, Hans Van Akelyen <[email protected]> wrote:
>>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Merge join does not require your data to be sorted when executing on Beam:
>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>>>>
>>>>> Cheers,
>>>>> Hans
>>>>>
>>>>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <[email protected]> wrote:
>>>>> Good morning Matt,
>>>>>
>>>>> Thanks for your quick reply! Unfortunately, the inputs are not sorted, so
>>>>> the Merge Join transform is not an option. I guess I'll have to use
>>>>> temporary BigQuery tables to handle this. Those pipelines are all
>>>>> bounded, so this is an option. Or is there an easy way to sort things
>>>>> when running on Beam?
>>>>>
>>>>> I'll create a Jira ticket, no problem.
>>>>>
>>>>> cheers
>>>>>
>>>>> Fabian
>>>>>
>>>>>> On 01.09.2022 at 19:11, Matt Casters <[email protected]> wrote:
>>>>>>
>>>>>> Hi Fabian,
>>>>>>
>>>>>> Joining rows is indeed the exception in Beam. I would suggest you use
>>>>>> the Merge Join
>>>>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>>>>>> transform.
>>>>>> For unbounded (never-ending) pipelines, that transform will be handled
>>>>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>>>>> correctly.
>>>>>> If you don't mind, please create a JIRA case so we can create a similar
>>>>>> handler for the Cartesian product use case.
>>>>>> The code is usually non-trivial in the massively parallel world, but quite
>>>>>> doable ;-)
>>>>>>
>>>>>> All the best,
>>>>>> Matt
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <[email protected]> wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> I've hit the next problem, this time with something I thought I had tested on
>>>>>> Beam before: a pipeline containing a "Join rows (cartesian product)"
>>>>>> transform with input from two sources loops forever when run via
>>>>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>>>>
>>>>>> While running it on Beam-Direct, I attached a debugger and can see
>>>>>> that it is stuck in the while loop at JoinRows.java:486
>>>>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>>>> I've tried using a GCS temp directory and swapping the "Main transform
>>>>>> to read from", but neither helped.
>>>>>>
>>>>>> Is this transform incompatible with Beam? If so, what could I use
>>>>>> instead?
>>>>>>
>>>>>> cheers
>>>>>>
>>>>>> Fabian
>>>>>>
>>>>>> <PastedGraphic-8.png>
>>>>>
>>>>>>
>>>>>> --
>>>>>> Neo4j Chief Solutions Architect
>>>>>> ✉ [email protected]
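The join Fabian describes (keep only agents whose site_id matches an existing site, with unsorted inputs) is an inner join on a key. Hans points out that Merge join does not need sorted input on Beam; a plausible reason, assumed here rather than confirmed from Hop's source, is that a Beam join matches rows by grouping on the key (in the style of Beam's CoGroupByKey) instead of walking two sorted streams in step. The plain-Java sketch below (Java 16+ for records, no Hop or Beam dependencies) shows that key-based inner join. The record types `Agent`, `Site` and `ValidAgent`, the class name and the sample values are hypothetical; the output record fixes the field layout explicitly, so a site name cannot end up in the site_id slot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedInnerJoinSketch {

  // Hypothetical record types mirroring the thread's agent/site example.
  record Agent(long id, long siteId, String agentName) {}
  record Site(long siteId, String siteName) {}
  record ValidAgent(long id, long siteId, String siteName, String agentName) {}

  // Key-based (hash) inner join: index the sites by site_id, then probe with each agent.
  // Neither input needs to be sorted, because matching is done by key lookup.
  public static List<ValidAgent> innerJoin(List<Agent> agents, List<Site> sites) {
    Map<Long, List<Site>> sitesById = new HashMap<>();
    for (Site s : sites) {
      sitesById.computeIfAbsent(s.siteId(), k -> new ArrayList<>()).add(s);
    }
    List<ValidAgent> out = new ArrayList<>();
    for (Agent a : agents) {
      // Agents whose site_id has no matching site produce no output (inner join).
      for (Site s : sitesById.getOrDefault(a.siteId(), List.of())) {
        // The output layout is explicit: siteId stays a long, siteName stays a String.
        out.add(new ValidAgent(a.id(), a.siteId(), s.siteName(), a.agentName()));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Hypothetical sample data; neither list is sorted by site_id.
    List<Agent> agents = List.of(
        new Agent(3, 20, "Carol"),
        new Agent(1, 10, "Alice"),
        new Agent(2, 99, "Bob")); // site 99 does not exist
    List<Site> sites = List.of(
        new Site(20, "San Jose"),
        new Site(10, "Mbabane"));
    // Prints the Carol and Alice rows; Bob is dropped.
    for (ValidAgent v : innerJoin(agents, sites)) {
      System.out.println(v);
    }
  }
}
```

Output order follows the agent input, not any sort order, which is the point: the join is correct without sorting either side. A real Beam pipeline would distribute the grouping step across workers, but the per-key matching logic is the same idea.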

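Matt asks for a JIRA case so that a Beam handler can be written for the "Join rows (cartesian product)" use case. One common way to express a cartesian product on Beam, named here as an assumption and not as what Hop implements, is to broadcast the smaller input (for example as a side input) so that each element of the larger input is paired with the complete other side. The stdlib-only sketch below mimics that shape; the class `BroadcastCrossJoinSketch`, the method `crossJoin` and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class BroadcastCrossJoinSketch {

  // Pairs every element of the main input with every element of a small,
  // fully materialized "broadcast" list, mimicking the side-input pattern.
  public static <A, B, R> List<R> crossJoin(
      Iterable<A> mainInput, List<B> broadcastSide, BiFunction<A, B, R> combine) {
    List<R> out = new ArrayList<>();
    for (A a : mainInput) {          // main input is consumed one element at a time
      for (B b : broadcastSide) {    // the whole broadcast side is available to each element
        out.add(combine.apply(a, b));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> rows = List.of("r1", "r2", "r3");
    List<String> params = List.of("p1", "p2");
    List<String> joined = crossJoin(rows, params, (r, p) -> r + "|" + p);
    System.out.println(joined.size()); // 3 x 2 = 6 pairs
    System.out.println(joined);
  }
}
```

The design choice is that only the smaller side must fit in memory on each worker; the larger side can stay streamed.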