Hi Hans,

Thanks! I probably read that at some point, but the "Notice" modal that pops up 
when closing the "Merge join" dialog convinced me otherwise: "If the 
incoming data is not sorted on the specified keys, the output results may not 
be correct. We recommend sorting the incoming data within the pipeline."

I'm testing it in my pipeline now and am getting a stack trace (see below). The 
"site_id" field is from an "Avro decode" transform and is a plain Integer. 
Using the local runner and writing to Postgres this works fine.
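
The innermost cause in the trace suggests that the row object for "site_id" 
is actually a java.lang.String, although the field metadata declares it as 
Integer (which Hop stores as java.lang.Long). A minimal sketch of that failure 
mode, independent of Hop and Beam; the class and method names are hypothetical 
and the value "42" is made up for illustration:

```java
public class SiteIdCastSketch {

    // Mirrors the failing pattern: the metadata says "Integer",
    // so the value is blindly cast to Long.
    static Long castToLong(Object value) {
        return (Long) value; // throws ClassCastException if value is a String
    }

    // Defensive alternative (an assumption, not what Hop does):
    // inspect the runtime type before converting.
    static Long toLong(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof Long) {
            return (Long) value;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            return Long.parseLong(((String) value).trim());
        }
        throw new IllegalArgumentException(
            "Unsupported type: " + value.getClass().getName());
    }

    public static void main(String[] args) {
        // Hypothetical row value: declared as Integer, but really a String.
        Object siteId = "42";
        try {
            castToLong(siteId);
        } catch (ClassCastException e) {
            System.out.println("Cast failed: " + e.getMessage());
        }
        System.out.println("Converted: " + toLong(siteId));
    }
}
```

If that is what happens here, the String would have to be introduced somewhere 
between the "Avro decode" output and the BigQuery output when running on Beam, 
since the same pipeline works with the local runner.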

cheers

Fabian

2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
2022/09/02 09:20:20 - General - ERROR: 
org.apache.hop.core.exception.HopException: 
2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting 
HopRow to BigQuery TableRow
2022/09/02 09:20:20 - General - 
2022/09/02 09:20:20 - General -         at 
org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
2022/09/02 09:20:20 - General -         at 
org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
2022/09/02 09:20:20 - General -         at 
java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 09:20:20 - General - Caused by: 
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
2022/09/02 09:20:20 - General -         at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
2022/09/02 09:20:20 - General -         at 
org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
2022/09/02 09:20:20 - General -         ... 2 more
2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error 
converting HopRow to BigQuery TableRow
2022/09/02 09:20:20 - General -         at 
org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
2022/09/02 09:20:20 - General -         at 
org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
2022/09/02 09:20:20 - General - Caused by: 
org.apache.hop.core.exception.HopValueException: 
2022/09/02 09:20:20 - General - Unexpected conversion error while converting 
value [site_id Integer] to an Integer
2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to class 
java.lang.Long (java.lang.String and java.lang.Long are in module java.base of 
loader 'bootstrap')
2022/09/02 09:20:20 - General - 
2022/09/02 09:20:20 - General -         at 
org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
2022/09/02 09:20:20 - General -         at 
org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
2022/09/02 09:20:20 - General -         at 
org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
2022/09/02 09:20:20 - General -         at 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
2022/09/02 09:20:20 - General -         at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022/09/02 09:20:20 - General -         at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022/09/02 09:20:20 - General -         at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022/09/02 09:20:20 - General -         at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022/09/02 09:20:20 - General -         at 
java.base/java.lang.Thread.run(Thread.java:829)
2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: class 
java.lang.String cannot be cast to class java.lang.Long (java.lang.String and 
java.lang.Long are in module java.base of loader 'bootstrap')
2022/09/02 09:20:20 - General -         at 
org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
2022/09/02 09:20:20 - General -         ... 17 more

> On 02.09.2022 at 08:58, Hans Van Akelyen <[email protected]> wrote:
> 
> Hi Fabian,
> 
> Merge join does not require your data to be sorted when executing on Beam: 
> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
> 
> Cheers,
> Hans
> 
> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <[email protected] 
> <mailto:[email protected]>> wrote:
> Good morning Matt,
> 
> Thanks for your quick reply! Unfortunately the inputs are not sorted, so the 
> Merge Join transform is not an option. I guess I'll have to use temporary 
> BigQuery tables to handle this. Those pipelines are all bounded, so this is 
> an option. Or is there an easy option to sort things when running on Beam?
> 
> I'll create a Jira ticket, no problem.
> 
> cheers
> 
> Fabian
> 
>> On 01.09.2022 at 19:11, Matt Casters <[email protected] 
>> <mailto:[email protected]>> wrote:
>> 
>> Hi Fabian,
>> 
>> Joining rows is indeed the exception in Beam.  I would suggest you use the 
>> Merge Join 
>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html> 
>> transform.
>> For unbounded pipelines (never ending) that transform will be handled 
>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>  correctly. 
>> If you don't mind, please create a JIRA case so we can create a similar 
>> handler for the Cartesian product use-case.
>> The code is usually non-trivial in the massively parallel world but quite 
>> doable ;-)
>> 
>> All the best,
>> Matt
>> 
>> 
>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <[email protected] 
>> <mailto:[email protected]>> wrote:
>> Hi all,
>> 
>> I've hit the next problem, this time something I thought I had tested on 
>> Beam before: A pipeline containing a "Join rows (cartesian product)" 
>> transform with input from two sources loops forever when run via 
>> Beam-Direct or Dataflow. It works fine using the local runner.
>> 
>> While running it on Beam-Direct I've attached a debugger and can see that it 
>> is stuck in the while loop at JoinRows.java:486 
>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>  I've tried using a GCS temp directory and swapped the "Main transform to 
>> read from" but none of those helped.
>> 
>> Is this transform incompatible with Beam? If so, what could I use instead?
>> 
>> cheers
>> 
>> Fabian
>> 
>> <PastedGraphic-8.png>
> 
>> 
>> 
>> -- 
>> Neo4j Chief Solutions Architect
>> ✉   [email protected] <mailto:[email protected]>
>> 
>> 
>> 
> 
