Hi All,
To come back to this thread: the issue has been picked up in the following ticket
[1] and has been fixed in what will be the 2.1 release.

Kind regards,
Hans

[1] https://issues.apache.org/jira/browse/HOP-3983
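
For readers skimming the thread: the join Fabian describes further down (agent records kept only when their site_id matches a valid site) can be sketched roughly as below. This is only an illustration of the intended result, not Hop's or Beam's implementation. The function name, the sample rows, and the split of fields between the two inputs are assumptions; only the field names come from the log output quoted below. It uses a key lookup rather than a sorted merge, so the inputs do not need to be sorted.

```python
from collections import defaultdict


def inner_join_on_site_id(agents, sites):
    """Keep only agents whose site_id matches a site (hypothetical helper)."""
    # Index the sites by key so the inputs do not have to be sorted.
    sites_by_id = defaultdict(list)
    for site in sites:
        sites_by_id[site["site_id"]].append(site)

    joined = []
    for agent in agents:
        # Agents without a matching site are dropped (inner join).
        for site in sites_by_id.get(agent["site_id"], []):
            joined.append({
                "id": agent["id"],
                "site_id": agent["site_id"],
                "site_name": site["site_name"],
                "telephone": site["telephone"],
                "agent_name": agent["agent_name"],
            })
    return joined


# Made-up sample rows; the ids and phone numbers are illustrative only.
agents = [
    {"id": 1, "site_id": 10, "agent_name": "Agent A"},
    {"id": 2, "site_id": 99, "agent_name": "Agent B"},  # no such site
]
sites = [
    {"site_id": 10, "site_name": "Mbabane", "telephone": "000"},
]
result = inner_join_on_site_id(agents, sites)
```

In a correct result, site_id stays an Integer in every output row and site_name carries the String. The errors quoted below show a String such as "Mbabane" arriving where the row metadata expects the Integer site_id.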


On Mon, 5 Sept 2022 at 12:04, <[email protected]> wrote:

> I think Merge Join is not working with Beam. I spent many hours trying to
> make it work without success.
> For this reason I decided to go back to Kettle - will be back when it's
> solved.
>
> Regards,
>
> Mike
>
>
> *Sent:* Friday, September 02, 2022 at 3:14 PM
> *From:* "Fabian Peters" <[email protected]>
> *To:* [email protected], "Matt Casters" <[email protected]>
> *Subject:* Re: Merge join issue on Beam [was: Join rows transform loops
> forever on Beam]
> Hi Matt,
>
> Thanks for your quick reply!
>
> The "Write to log" transform actually works fine for me with the local,
> Beam-Direct and Dataflow runners.
>
> I removed it from my test case and replaced the last one with a Beam
> Output transform. The issue stays the same, see stack trace below…
>
> cheers
>
> Fabian
>
>
> 2022/09/02 15:05:06 - Hop - Pipeline opened.
> 2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 15:05:06 - Hop - Started the pipeline execution.
> 2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name
> 'join-spike'
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
> 2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
> 2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) :
> Select values, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid
> agents, gets data from Select values
> 2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam
> Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 15:05:14 - join-spike - ERROR:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner
> Direct
> 2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error
> converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.RuntimeException: Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 15:05:14 - join-spike - ... 2 more
> 2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException:
> Error converting Hop data to string lines
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.hop.core.exception.HopException:
> 2022/09/02 15:05:14 - join-spike - Error getting String from field site_id
> Integer on index 1 in input: [id Integer], [site_id Integer], [site_name
> String], [telephone String], [agent_name String], native value found:
> Mbabane
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [Mbabane] does not
> correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 15:05:14 - join-spike - at
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 15:05:14 - join-spike - Caused by:
> org.apache.hop.core.exception.HopValueException:
> 2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type
> error: the data type of java.lang.String object [Mbabane] does not
> correspond to value meta [Integer]
> 2022/09/02 15:05:14 - join-spike -
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 15:05:14 - join-spike - at
> org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
> 2022/09/02 15:05:14 - join-spike - ... 13 more
>
>
>
> On 02.09.2022 at 13:19, Matt Casters <[email protected]> wrote:
>
> Hi Fabian,
>
> If you remove the "Write to log" transforms the pipeline will work.  We
> typically don't use that transform as the log ends up on a Spark/Flink node
> somewhere where you can't see the information anyway.  It's the main reason
> why I'm working on HOP-4024.
>
> Also, just as a reference, there is an example in the samples project under
> beam/pipelines called complex.hpl which contains a merge join.
>
> Best of luck!
> Matt
>
>
> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <[email protected]> wrote:
>
>> Hi once more,
>>
>> I feel a little bit like I've started my slow descent into madness. What
>> I assumed to be a configuration error now looks like it isn't. I've put
>> together a minimal test case project
>> <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour
>> I'm seeing in my project. I'd be very grateful if someone could take a look
>> and tell me whether I'm taking a wrong turn somewhere or whether there
>> really is a bug.
>>
>> What I'm trying to achieve: There are records of one entity (agent) that
>> come with a site_id field. Some site_id values do not correspond to a valid
>> site. The agent records are to be joined with valid records of the second
>> entity (site), so that only agent records with an existing site_id get to
>> proceed.
>>
>> The setup works fine with the local runner (even though the inputs are
>> not sorted). With the Beam-Direct runner, the fields are getting mixed up:
>> in this case, the values from site_name end up in the site_id column.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 12:35:33 - Hop - Pipeline opened.
>> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
>> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
>> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name
>> 'join-spike'
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
>> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log valid sites, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log all agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge
>> join
>> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) :
>> Log valid agents, gets data from 1 previous transform(s), targets=0, infos=0
>> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam
>> Pipeline Engine with run configuration 'Beam-Direct'
>> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
>> 2022/09/02 12:35:33 - join-spike - ERROR:
>> org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner
>> Direct
>> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error
>> executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.RuntimeException: Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 12:35:33 - join-spike -  ... 2 more
>> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException:
>> Error executing TransformFn
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
>> 2022/09/02 12:35:33 - join-spike - Caused by:
>> org.apache.hop.core.exception.HopException:
>> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a
>> single threaded pipeline
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data
>> type error: the data type of java.lang.String object [San Jose] does
>> not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
>> 2022/09/02 12:35:33 - join-spike -  at 
>> org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 12:35:33 - join-spike -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 12:35:33 - join-spike - Caused by:
>> org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data
>> type error: the data type of java.lang.String object [San Jose] does
>> not correspond to value meta [Integer]
>> 2022/09/02 12:35:33 - join-spike -
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
>> 2022/09/02 12:35:33 - join-spike -  at
>> org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
>> 2022/09/02 12:35:33 - join-spike -
>> at 
>> org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
>> 2022/09/02 12:35:33 - join-spike -  ... 15 more
>>
>>
>>
>>
>> On 02.09.2022 at 09:48, Fabian Peters <[email protected]> wrote:
>>
>> Hi Hans,
>>
>> That stack trace was due to user error and I'm glad it was. ;)
>> There was one "id" field too many coming into the merge join…
>>
>> Sorry for the noise!
>>
>> cheers
>>
>> Fabian
>>
>>
>> On 02.09.2022 at 09:31, Fabian Peters <[email protected]> wrote:
>>
>> Hi Hans,
>>
>> Thanks! I probably read that at some point, but the "Notice" modal
>> popping up when closing the "Merge join" dialogue probably convinced me
>> otherwise: "If the incoming data is not sorted on the specified keys, the
>> output results may not be correct. We recommend sorting the incoming data
>> within the pipeline."
>>
>> I'm testing it in my pipeline now and am getting a stack trace (see
>> below). The "site_id" field is from an "Avro decode" transform and is a
>> plain Integer. Using the local runner and writing to Postgres this works
>> fine.
>>
>> cheers
>>
>> Fabian
>>
>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>> 2022/09/02 09:20:20 - General - ERROR:
>> org.apache.hop.core.exception.HopException:
>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner
>> Direct
>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error
>> converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by:
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>> 2022/09/02 09:20:20 - General -  ... 2 more
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException:
>> Error converting HopRow to BigQuery TableRow
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General - Caused by:
>> org.apache.hop.core.exception.HopValueException:
>> 2022/09/02 09:20:20 - General - Unexpected conversion error while
>> converting value [site_id Integer] to an Integer
>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to
>> class java.lang.Long (java.lang.String and java.lang.Long are in
>> module java.base of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General -
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> 2022/09/02 09:20:20 - General -
>> at 
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>> 2022/09/02 09:20:20 - General -
>> at 
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>> 2022/09/02 09:20:20 - General -
>> at 
>> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>> 2022/09/02 09:20:20 - General -
>> at 
>> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> 2022/09/02 09:20:20 - General -  at
>> java.base/java.lang.Thread.run(Thread.java:829)
>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException:
>> class java.lang.String cannot be cast to class
>> java.lang.Long (java.lang.String and java.lang.Long are in module java.base
>> of loader 'bootstrap')
>> 2022/09/02 09:20:20 - General -  at
>> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>> 2022/09/02 09:20:20 - General -  ... 17 more
>>
>>
>> On 02.09.2022 at 08:58, Hans Van Akelyen <
>> [email protected]> wrote:
>>
>> Hi Fabian,
>>
>> Merge join does not require your data to be sorted when executing on
>> Beam:
>>
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>
>> Cheers,
>> Hans
>>
>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <[email protected]> wrote:
>>
>>> Good morning Matt,
>>>
>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so
>>> the Merge Join transform is not an option. I guess I'll have to use
>>> temporary BigQuery tables to handle this. Those pipelines are all bounded,
>>> so this is an option. Or is there an easy option to sort things when
>>> running on Beam?
>>>
>>> I'll create a Jira ticket, no problem.
>>>
>>> cheers
>>>
>>> Fabian
>>>
>>>
>>> On 01.09.2022 at 19:11, Matt Casters <[email protected]> wrote:
>>>
>>> Hi Fabian,
>>>
>>> Joining rows is indeed the exception in Beam.  I would suggest you use
>>> the Merge Join
>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html>
>>> transform.
>>> For unbounded pipelines (never ending) that transform will be handled
>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>> correctly.
>>> If you don't mind, please create a JIRA case so we can create a similar
>>> handler for the Cartesian product use-case.
>>> The code usually is non-trivial in the massively parallel world but quite
>>> doable ;-)
>>>
>>> All the best,
>>> Matt
>>>
>>>
>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've hit the next problem, this time something I thought I had tested
>>>> on Beam before: A pipeline containing a "Join rows (cartesian product)"
>>>> transform with input from two sources loops forever when run via
>>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>>
>>>> While running it on Beam-Direct I've attached a debugger and can see
>>>> that it is stuck in the while loop at JoinRows.java:486
>>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>> I've tried using a GCS temp directory and swapped the "Main transform to
>>>> read from" but none of those helped.
>>>>
>>>> Is this transform incompatible with Beam? If so, what could I use
>>>> instead?
>>>>
>>>> cheers
>>>>
>>>> Fabian
>>>>
>>>> <PastedGraphic-8.png>
>>>>
>>>
>>>
>>> --
>>> Neo4j Chief Solutions Architect
>>> *✉   *[email protected]
>>>
>>>
>>>
>>>
>>>
>
> --
> Neo4j Chief Solutions Architect
> *✉   *[email protected]
>
>
>
>
>
>
>
