Hi Matt,

Thanks for your quick reply!

The "Write to log" transform actually works fine for me with the local, 
Beam-Direct and Dataflow runners.

I removed them from my test case and replaced the last one with a Beam Output 
transform. The issue remains the same; see the stack trace below…

cheers

Fabian


2022/09/02 15:05:06 - Hop - Pipeline opened.
2022/09/02 15:05:06 - Hop - Launching pipeline [join-spike]...
2022/09/02 15:05:06 - Hop - Started the pipeline execution.
2022/09/02 15:05:14 - General - Created Apache Beam pipeline with name 
'join-spike'
2022/09/02 15:05:14 - General - Handled transform (INPUT) : Agents
2022/09/02 15:05:14 - General - Handled transform (INPUT) : Sites
2022/09/02 15:05:14 - General - Handled Merge Join (TRANSFORM) : Merge join
2022/09/02 15:05:14 - General - Handled generic transform (TRANSFORM) : Select 
values, gets data from 1 previous transform(s), targets=0, infos=0
2022/09/02 15:05:14 - General - Handled transform (OUTPUT) : Write valid 
agents, gets data from Select values
2022/09/02 15:05:14 - join-spike - Executing this pipeline using the Beam 
Pipeline Engine with run configuration 'Beam-Direct'
2022/09/02 15:05:14 - join-spike - ERROR: Error starting the Beam pipeline
2022/09/02 15:05:14 - join-spike - ERROR: 
org.apache.hop.core.exception.HopException: 
2022/09/02 15:05:14 - join-spike - Error executing pipeline with runner Direct
2022/09/02 15:05:14 - join-spike - java.lang.RuntimeException: Error converting 
Hop data to string lines
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
2022/09/02 15:05:14 - join-spike -      at 
java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 15:05:14 - join-spike - Caused by: 
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.RuntimeException: Error converting Hop data to string lines
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
2022/09/02 15:05:14 - join-spike -      ... 2 more
2022/09/02 15:05:14 - join-spike - Caused by: java.lang.RuntimeException: Error 
converting Hop data to string lines
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:148)
2022/09/02 15:05:14 - join-spike - Caused by: 
org.apache.hop.core.exception.HopException: 
2022/09/02 15:05:14 - join-spike - Error getting String from field site_id 
Integer on index 1 in input: [id Integer], [site_id Integer], [site_name 
String], [telephone String], [agent_name String], native value found: Mbabane
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type 
error: the data type of java.lang.String object [Mbabane] does not correspond 
to value meta [Integer]
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:135)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.beam.core.fn.HopToStringFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:187)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
2022/09/02 15:05:14 - join-spike -      at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022/09/02 15:05:14 - join-spike -      at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022/09/02 15:05:14 - join-spike -      at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022/09/02 15:05:14 - join-spike -      at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022/09/02 15:05:14 - join-spike -      at 
java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 15:05:14 - join-spike - Caused by: 
org.apache.hop.core.exception.HopValueException: 
2022/09/02 15:05:14 - join-spike - site_id Integer : There was a data type 
error: the data type of java.lang.String object [Mbabane] does not correspond 
to value meta [Integer]
2022/09/02 15:05:14 - join-spike - 
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
2022/09/02 15:05:14 - join-spike -      at 
org.apache.hop.beam.core.fn.HopToStringFn.processElement(HopToStringFn.java:111)
2022/09/02 15:05:14 - join-spike -      ... 13 more
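Note on reading this trace: Hop passes each row as a positional Object[] and interprets it through the row metadata, so "native value found: Mbabane" means the slot declared as site_id Integer (index 1) holds a String, apparently the site_name value. The sketch below is a hypothetical, simplified model of that check: FieldMeta and firstMismatch are invented for illustration and are not Hop's IRowMeta API, and all sample values except "Mbabane" are made up. It assumes, consistent with the ClassCastException further down this thread, that Hop stores Integer values as java.lang.Long.

```java
import java.util.List;

// Hypothetical, simplified model of a positional row. This is NOT Hop's IRowMeta
// API: FieldMeta and firstMismatch are invented to illustrate the error message.
final class RowTypeCheck {

  // One column of the row metadata: a name plus the Java type its values must have.
  static final class FieldMeta {
    final String name;
    final Class<?> type;

    FieldMeta(String name, Class<?> type) {
      this.name = name;
      this.type = type;
    }
  }

  // Column layout reported in the error: [id Integer], [site_id Integer],
  // [site_name String], [telephone String], [agent_name String].
  // Assumption: Hop Integer values are stored as java.lang.Long.
  static final List<FieldMeta> AGENT_META = List.of(
      new FieldMeta("id", Long.class),
      new FieldMeta("site_id", Long.class),
      new FieldMeta("site_name", String.class),
      new FieldMeta("telephone", String.class),
      new FieldMeta("agent_name", String.class));

  // Returns null if every non-null value matches its declared type; otherwise
  // describes the first value whose Java type disagrees with the metadata.
  static String firstMismatch(List<FieldMeta> meta, Object[] row) {
    for (int i = 0; i < meta.size(); i++) {
      FieldMeta field = meta.get(i);
      Object value = row[i];
      if (value != null && !field.type.isInstance(value)) {
        return field.name + " on index " + i + ": expected "
            + field.type.getSimpleName() + ", native value found: " + value;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // Made-up sample values; only "Mbabane" comes from the log above.
    Object[] ordered = {1L, 7L, "Mbabane", "555-0100", "agent-1"};
    // Same values with site_name and site_id swapped, as if a transform emitted
    // its output fields in a different order than its metadata declares.
    Object[] shifted = {1L, "Mbabane", 7L, "555-0100", "agent-1"};

    System.out.println(firstMismatch(AGENT_META, ordered)); // null
    System.out.println(firstMismatch(AGENT_META, shifted));
    // site_id on index 1: expected Long, native value found: Mbabane
  }
}
```

The check only compares each value's Java type with the declared type at the same position, which is why a field-order mix-up surfaces as a type error rather than as a "wrong column" error.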


> On 02.09.2022 at 13:19, Matt Casters <[email protected]> wrote:
> 
> Hi Fabian,
> 
> If you remove the "Write to log" transforms, the pipeline will work. We 
> typically don't use that transform, as the log ends up on a Spark/Flink node 
> somewhere you can't see the information anyway. It's the main reason 
> why I'm working on HOP-4024.
> 
> Also, just as a reference, there is an example in the samples project under 
> beam/pipelines called complex.hpl, which contains a merge join.
> 
> Best of luck!
> Matt
> 
> 
> On Fri, Sep 2, 2022 at 12:56 PM Fabian Peters <[email protected] 
> <mailto:[email protected]>> wrote:
> Hi once more,
> 
> I feel a little bit like I've started my slow descent into madness. What I 
> assumed was a configuration error now looks like it isn't. I've put 
> together a minimal test case project 
> <https://github.com/fbarthez/BeamJoinSpike>, showing the same behaviour I'm 
> seeing in my project. I'd be very grateful if someone could take a look and 
> tell me whether I'm taking a wrong turn somewhere or whether there really is 
> a bug.
> 
> What I'm trying to achieve: There are records of one entity (agent) that come 
> with a site_id field. Some site_id values do not correspond to a valid site. 
> The agent records are to be joined with valid records of the second entity 
> (site), so that only agent records with an existing site_id get to proceed.
> 
> The setup works fine with the local runner (even though the inputs are not 
> sorted). With the Beam-Direct runner, the fields get mixed up: in 
> this case, the values from site_name end up in the site_id column.
> 
> cheers
> 
> Fabian
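A plain-Java sketch of the result Fabian describes above (an inner join on site_id that drops agents without a valid site). It is not Hop or Beam code; the Agent and Site classes and the validAgents method are hypothetical, and the sample rows are made up apart from the site names, which appear in the logs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the intended join semantics (not Hop or Beam code):
// keep only agents whose site_id matches an existing site.
final class AgentSiteJoin {

  static final class Agent {
    final long id;
    final long siteId;
    final String name;

    Agent(long id, long siteId, String name) {
      this.id = id;
      this.siteId = siteId;
      this.name = name;
    }
  }

  static final class Site {
    final long id;
    final String name;

    Site(long id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  // Inner join on agent.siteId = site.id: agents with an unknown site_id are dropped.
  static List<String> validAgents(List<Agent> agents, List<Site> sites) {
    Map<Long, String> siteNames = new HashMap<>();
    for (Site s : sites) {
      siteNames.put(s.id, s.name);
    }
    List<String> out = new ArrayList<>();
    for (Agent a : agents) {
      String siteName = siteNames.get(a.siteId);
      if (siteName != null) {
        out.add(a.name + "@" + siteName);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Agent> agents = List.of(
        new Agent(1, 10, "agent-1"),
        new Agent(2, 99, "agent-2"), // site 99 does not exist
        new Agent(3, 20, "agent-3"));
    List<Site> sites = List.of(new Site(10, "Mbabane"), new Site(20, "San Jose"));
    System.out.println(validAgents(agents, sites)); // [agent-1@Mbabane, agent-3@San Jose]
  }
}
```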
> 
> 2022/09/02 12:35:33 - Hop - Pipeline opened.
> 2022/09/02 12:35:33 - Hop - Launching pipeline [join-spike]...
> 2022/09/02 12:35:33 - Hop - Started the pipeline execution.
> 2022/09/02 12:35:33 - General - Created Apache Beam pipeline with name 
> 'join-spike'
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Sites
> 2022/09/02 12:35:33 - General - Handled transform (INPUT) : Agents
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log 
> valid sites, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log 
> all agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - General - Handled Merge Join (TRANSFORM) : Merge join
> 2022/09/02 12:35:33 - General - Handled generic transform (TRANSFORM) : Log 
> valid agents, gets data from 1 previous transform(s), targets=0, infos=0
> 2022/09/02 12:35:33 - join-spike - Executing this pipeline using the Beam 
> Pipeline Engine with run configuration 'Beam-Direct'
> 2022/09/02 12:35:33 - join-spike - ERROR: Error starting the Beam pipeline
> 2022/09/02 12:35:33 - join-spike - ERROR: 
> org.apache.hop.core.exception.HopException: 
> 2022/09/02 12:35:33 - join-spike - Error executing pipeline with runner Direct
> 2022/09/02 12:35:33 - join-spike - java.lang.RuntimeException: Error 
> executing TransformFn
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 12:35:33 - join-spike -    at 
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by: 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.RuntimeException: Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 12:35:33 - join-spike -    ... 2 more
> 2022/09/02 12:35:33 - join-spike - Caused by: java.lang.RuntimeException: 
> Error executing TransformFn
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:571)
> 2022/09/02 12:35:33 - join-spike - Caused by: 
> org.apache.hop.core.exception.HopException: 
> 2022/09/02 12:35:33 - join-spike - Error performing an iteration in a single 
> threaded pipeline
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type 
> error: the data type of java.lang.String object [San Jose] does not 
> correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:390)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.emptyRowBuffer(TransformTransform.java:614)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn.processElement(TransformTransform.java:567)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.beam.core.transform.TransformTransform$TransformFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 12:35:33 - join-spike -    at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 12:35:33 - join-spike -    at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 12:35:33 - join-spike -    at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 12:35:33 - join-spike -    at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 12:35:33 - join-spike -    at 
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 12:35:33 - join-spike - Caused by: 
> org.apache.hop.core.exception.HopValueException: 
> 2022/09/02 12:35:33 - join-spike - site_id Integer : There was a data type 
> error: the data type of java.lang.String object [San Jose] does not 
> correspond to value meta [Integer]
> 2022/09/02 12:35:33 - join-spike - 
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.core.row.value.ValueMetaBase.getString(ValueMetaBase.java:1944)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.core.row.RowMeta.getString(RowMeta.java:301)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.pipeline.transforms.writetolog.WriteToLog.processRow(WriteToLog.java:107)
> 2022/09/02 12:35:33 - join-spike -    at 
> org.apache.hop.pipeline.SingleThreadedPipelineExecutor.oneIteration(SingleThreadedPipelineExecutor.java:365)
> 2022/09/02 12:35:33 - join-spike -    ... 15 more
> 
> 
> 
>> On 02.09.2022 at 09:48, Fabian Peters <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Hi Hans,
>> 
>> That stack trace was due to user error and I'm glad it was. ;)
>> There was one "id" field too many coming into the merge join…
>> 
>> Sorry for the noise!
>> 
>> cheers
>> 
>> Fabian
>> 
>>> On 02.09.2022 at 09:31, Fabian Peters <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> 
>>> Hi Hans,
>>> 
>>> Thanks! I probably read that at some point, but the "Notice" modal that pops 
>>> up when closing the "Merge join" dialogue must have convinced me otherwise: 
>>> "If the incoming data is not sorted on the specified keys, the output 
>>> results may not be correct. We recommend sorting the incoming data within 
>>> the pipeline."
>>> 
>>> I'm testing it in my pipeline now and am getting a stack trace (see below). 
>>> The "site_id" field comes from an "Avro decode" transform and is a plain 
>>> Integer. Using the local runner and writing to Postgres, this works fine.
>>> 
>>> cheers
>>> 
>>> Fabian
>>> 
>>> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
>>> 2022/09/02 09:20:20 - General - ERROR: 
>>> org.apache.hop.core.exception.HopException: 
>>> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
>>> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error 
>>> converting HopRow to BigQuery TableRow
>>> 2022/09/02 09:20:20 - General - 
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
>>> 2022/09/02 09:20:20 - General -     at 
>>> java.base/java.lang.Thread.run(Thread.java:829)
>>> 2022/09/02 09:20:20 - General - Caused by: 
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
>>> java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
>>> 2022/09/02 09:20:20 - General -     ... 2 more
>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: 
>>> Error converting HopRow to BigQuery TableRow
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>> 2022/09/02 09:20:20 - General - Caused by: 
>>> org.apache.hop.core.exception.HopValueException: 
>>> 2022/09/02 09:20:20 - General - Unexpected conversion error while 
>>> converting value [site_id Integer] to an Integer
>>> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to 
>>> class java.lang.Long (java.lang.String and java.lang.Long are in module 
>>> java.base of loader 'bootstrap')
>>> 2022/09/02 09:20:20 - General - 
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown
>>>  Source)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
>>> 2022/09/02 09:20:20 - General -     at 
>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> 2022/09/02 09:20:20 - General -     at 
>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>> 2022/09/02 09:20:20 - General -     at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> 2022/09/02 09:20:20 - General -     at 
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> 2022/09/02 09:20:20 - General -     at 
>>> java.base/java.lang.Thread.run(Thread.java:829)
>>> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: 
>>> class java.lang.String cannot be cast to class java.lang.Long 
>>> (java.lang.String and java.lang.Long are in module java.base of loader 
>>> 'bootstrap')
>>> 2022/09/02 09:20:20 - General -     at 
>>> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
>>> 2022/09/02 09:20:20 - General -     ... 17 more
>>> 
>>>> On 02.09.2022 at 08:58, Hans Van Akelyen 
>>>> <[email protected] <mailto:[email protected]>> wrote:
>>>> 
>>>> Hi Fabian,
>>>> 
>>>> Merge join does not require your data to be sorted when executing on Beam 
>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>>>> 
>>>> Cheers,
>>>> Hans
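Background on why sorting stops mattering: Beam's general join building block, CoGroupByKey, collects every element of each input per key instead of merging two sorted streams. Assuming Hop's Beam handler for Merge Join relies on that kind of grouping (not verified here), the idea can be simulated in plain Java. GroupedJoin and its {key, value} row layout are hypothetical, and this is not the Beam API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a key-grouping (CoGroupByKey-style) inner join.
// Not Beam or Hop code: it only illustrates why sorted input is not needed.
final class GroupedJoin {

  // For one key, holds every left value and every right value seen for it.
  static final class Group {
    final List<String> left = new ArrayList<>();
    final List<String> right = new ArrayList<>();
  }

  // Each input row is {key, value}; rows may arrive in any order.
  static List<String> innerJoin(List<String[]> leftRows, List<String[]> rightRows) {
    Map<String, Group> groups = new LinkedHashMap<>();
    for (String[] r : leftRows) {
      groups.computeIfAbsent(r[0], k -> new Group()).left.add(r[1]);
    }
    for (String[] r : rightRows) {
      groups.computeIfAbsent(r[0], k -> new Group()).right.add(r[1]);
    }

    List<String> out = new ArrayList<>();
    for (Map.Entry<String, Group> e : groups.entrySet()) {
      // Inner join: a key with no values on either side produces no output.
      for (String l : e.getValue().left) {
        for (String r : e.getValue().right) {
          out.add(e.getKey() + ":" + l + "+" + r);
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String[]> agents = List.of(
        new String[] {"20", "agent-3"},
        new String[] {"10", "agent-1"},
        new String[] {"99", "agent-2"});
    List<String[]> sites = List.of(new String[] {"10", "Mbabane"}, new String[] {"20", "San Jose"});
    System.out.println(innerJoin(agents, sites)); // [20:agent-3+San Jose, 10:agent-1+Mbabane]
  }
}
```

The agent rows are deliberately out of order; each agent still pairs with the right site, and key 99, which has no site, produces nothing.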
>>>> 
>>>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>> Good morning Matt,
>>>> 
>>>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so 
>>>> the Merge Join transform is not an option. I guess I'll have to use 
>>>> temporary BigQuery tables to handle this. Those pipelines are all bounded, 
>>>> so this approach is feasible. Or is there an easy way to sort data when 
>>>> running on Beam?
>>>> 
>>>> I'll create a Jira ticket, no problem.
>>>> 
>>>> cheers
>>>> 
>>>> Fabian
>>>> 
>>>>> On 01.09.2022 at 19:11, Matt Casters <[email protected] 
>>>>> <mailto:[email protected]>> wrote:
>>>>> 
>>>>> Hi Fabian,
>>>>> 
>>>>> Joining rows is indeed the exception in Beam. I would suggest you use 
>>>>> the Merge Join 
>>>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html> 
>>>>> transform.
>>>>> For unbounded (never-ending) pipelines, that transform will be handled 
>>>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>>>> correctly.
>>>>> If you don't mind, please create a JIRA case so we can create a similar 
>>>>> handler for the Cartesian product use case.
>>>>> The code is usually non-trivial in the massively parallel world, but quite 
>>>>> doable ;-)
>>>>> 
>>>>> All the best,
>>>>> Matt
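For the Cartesian product handler Matt mentions, one common pattern in parallel engines is to broadcast the smaller input to every worker (in Beam, typically via a side input) and let each worker pair its share of the larger input with all of it. This is an assumption for illustration, not a description of an existing Hop handler; BroadcastCrossJoin and its methods are hypothetical, and the "workers" are simulated sequentially:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a broadcast cross join (Cartesian product).
// Hypothetical sketch, not Hop's or Beam's implementation: the large input is
// split into partitions, and each simulated worker receives the complete small input.
final class BroadcastCrossJoin {

  // Work done by one worker: pair every row of its partition with every broadcast row.
  static List<String> crossPartition(List<String> partition, List<String> broadcast) {
    List<String> out = new ArrayList<>();
    for (String a : partition) {
      for (String b : broadcast) {
        out.add(a + "|" + b);
      }
    }
    return out;
  }

  // Splits the large input round-robin into numPartitions chunks, processes each
  // chunk on its own, and concatenates the results.
  static List<String> crossJoin(List<String> large, List<String> small, int numPartitions) {
    List<List<String>> partitions = new ArrayList<>();
    for (int p = 0; p < numPartitions; p++) {
      partitions.add(new ArrayList<>());
    }
    for (int i = 0; i < large.size(); i++) {
      partitions.get(i % numPartitions).add(large.get(i));
    }

    List<String> result = new ArrayList<>();
    for (List<String> partition : partitions) {
      // The small side is fully available before any pairing starts.
      result.addAll(crossPartition(partition, small));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> result = crossJoin(List.of("a1", "a2", "a3"), List.of("s1", "s2"), 2);
    System.out.println(result.size()); // 6
    System.out.println(result);
  }
}
```

Broadcasting only makes sense when one side is small enough to fit in each worker's memory.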
>>>>> 
>>>>> 
>>>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <[email protected] 
>>>>> <mailto:[email protected]>> wrote:
>>>>> Hi all,
>>>>> 
>>>>> I've hit the next problem, this time with something I thought I had tested on 
>>>>> Beam before: a pipeline containing a "Join rows (cartesian product)" 
>>>>> transform with input from two sources loops forever when run via 
>>>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>>>> 
>>>>> While running it on Beam-Direct, I attached a debugger and could see that 
>>>>> it is stuck in the while loop at JoinRows.java:486 
>>>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>>> I've tried using a GCS temp directory and swapping the "Main transform to 
>>>>> read from", but neither helped.
>>>>> 
>>>>> Is this transform incompatible with Beam? If so, what could I use instead?
>>>>> 
>>>>> cheers
>>>>> 
>>>>> Fabian
>>>>> 
>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Neo4j Chief Solutions Architect
>>>>> ✉   [email protected] <mailto:[email protected]>
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
> 
> 
> 
> 
> 
