Hi Hans,

That stack trace was due to user error and I'm glad it was. ;)
There was one "id" field too many coming into the merge join…

Sorry for the noise!

cheers

Fabian

> On 02.09.2022 at 09:31, Fabian Peters <[email protected]> wrote:
> 
> Hi Hans,
> 
> Thanks! I probably read that at some point, but the "Notice" modal popping up 
> when closing the "Merge join" dialogue probably convinced me otherwise: "If 
> the incoming data is not sorted on the specified keys, the output results may 
> not be correct. We recommend sorting the incoming data within the pipeline."
> 
> I'm testing it in my pipeline now and am getting a stack trace (see below). 
> The "site_id" field is from an "Avro decode" transform and is a plain 
> Integer. Using the local runner and writing to Postgres this works fine.
> 
> cheers
> 
> Fabian
> 
> 2022/09/02 09:20:20 - General - ERROR: Error starting the Beam pipeline
> 2022/09/02 09:20:20 - General - ERROR: 
> org.apache.hop.core.exception.HopException: 
> 2022/09/02 09:20:20 - General - Error executing pipeline with runner Direct
> 2022/09/02 09:20:20 - General - java.lang.RuntimeException: Error converting 
> HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General - 
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:258)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.beam.engines.BeamPipelineEngine.lambda$startThreads$0(BeamPipelineEngine.java:305)
> 2022/09/02 09:20:20 - General -       at 
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: 
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
> java.lang.RuntimeException: Error converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.beam.engines.BeamPipelineEngine.executePipeline(BeamPipelineEngine.java:246)
> 2022/09/02 09:20:20 - General -       ... 2 more
> 2022/09/02 09:20:20 - General - Caused by: java.lang.RuntimeException: Error 
> converting HopRow to BigQuery TableRow
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:126)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General - Caused by: 
> org.apache.hop.core.exception.HopValueException: 
> 2022/09/02 09:20:20 - General - Unexpected conversion error while converting 
> value [site_id Integer] to an Integer
> 2022/09/02 09:20:20 - General - class java.lang.String cannot be cast to 
> class java.lang.Long (java.lang.String and java.lang.Long are in module 
> java.base of loader 'bootstrap')
> 2022/09/02 09:20:20 - General - 
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2164)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:96)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.beam.core.fn.HopToBQTableRowFn.apply(HopToBQTableRowFn.java:37)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter.write(TableRowWriter.java:41)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.processElement(WriteBundlesToFiles.java:232)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:88)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
> 2022/09/02 09:20:20 - General -       at 
> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
> 2022/09/02 09:20:20 - General -       at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> 2022/09/02 09:20:20 - General -       at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> 2022/09/02 09:20:20 - General -       at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 2022/09/02 09:20:20 - General -       at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 2022/09/02 09:20:20 - General -       at 
> java.base/java.lang.Thread.run(Thread.java:829)
> 2022/09/02 09:20:20 - General - Caused by: java.lang.ClassCastException: 
> class java.lang.String cannot be cast to class java.lang.Long 
> (java.lang.String and java.lang.Long are in module java.base of loader 
> 'bootstrap')
> 2022/09/02 09:20:20 - General -       at 
> org.apache.hop.core.row.value.ValueMetaBase.getInteger(ValueMetaBase.java:2077)
> 2022/09/02 09:20:20 - General -       ... 17 more
> 
>> On 02.09.2022 at 08:58, Hans Van Akelyen <[email protected]> wrote:
>> 
>> Hi Fabian,
>> 
>> Merge join does not require your data to be sorted when executing on Beam:
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html#_universal_transforms
>> 
>> Cheers,
>> Hans
>> 
>> On Fri, 2 Sep 2022 at 08:34, Fabian Peters <[email protected]> wrote:
>> Good morning Matt,
>> 
>> Thanks for your quick reply! Unfortunately the inputs are not sorted, so the 
>> Merge Join transform is not an option. I guess I'll have to use temporary 
>> BigQuery tables to handle this. Those pipelines are all bounded, so this is 
>> an option. Or is there an easy option to sort things when running on Beam?
>> 
>> I'll create a Jira ticket, no problem.
>> 
>> cheers
>> 
>> Fabian
>> 
>>> On 01.09.2022 at 19:11, Matt Casters <[email protected]> wrote:
>>> 
>>> Hi Fabian,
>>> 
>>> Joining rows is indeed the exception in Beam.  I would suggest you use the 
>>> Merge Join 
>>> <https://hop.apache.org/manual/latest/pipeline/transforms/mergejoin.html> 
>>> transform.
>>> For unbounded pipelines (never ending) that transform will be handled 
>>> <https://github.com/apache/hop/blob/master/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMergeJoinTransformHandler.java>
>>>  correctly. 
>>> If you don't mind, please create a JIRA case so we can create a similar 
>>> handler for the Cartesian product use-case.
>>> The code usually is non-trivial in the massively parallel world but quite 
>>> doable ;-)
>>> 
>>> All the best,
>>> Matt
>>> 
>>> 
>>> On Thu, Sep 1, 2022 at 6:37 PM Fabian Peters <[email protected]> wrote:
>>> Hi all,
>>> 
>>> I've hit the next problem, this time something I thought I had tested on 
>>> Beam before: A pipeline containing a "Join rows (cartesian product)" 
>>> transform with input from two sources loops forever when run via 
>>> Beam-Direct or Dataflow. It works fine using the local runner.
>>> 
>>> While running it on Beam-Direct I've attached a debugger and can see that 
>>> it is stuck in the while loop at JoinRows.java:486 
>>> <https://github.com/apache/hop/blob/758c07c360c26c0447251f0a29df81557864ad11/plugins/transforms/joinrows/src/main/java/org/apache/hop/pipeline/transforms/joinrows/JoinRows.java#L487>.
>>>  I've tried using a GCS temp directory and swapped the "Main transform to 
>>> read from" but none of those helped.
>>> 
>>> Is this transform incompatible with Beam? If so, what could I use instead?
>>> 
>>> cheers
>>> 
>>> Fabian
>>> 
>>> <PastedGraphic-8.png>
>> 
>>> 
>>> 
>>> -- 
>>> Neo4j Chief Solutions Architect
>>> ✉   [email protected] <mailto:[email protected]>
>>> 
>>> 
>>> 
>> 
> 
