On Thu, Jan 2, 2020 at 11:52 AM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Thu, Jan 2, 2020 at 10:10 AM Konstantinos P. <[email protected]>
> wrote:
>
>> Hi!
>>
>> I have set up a Beam pipeline to read from a PostgreSQL server and write
>> to a BigQuery table, and it takes forever even for ~20k records. While
>> investigating the process, I found that the pipeline created one file per
>> record and then requested a new load job for each file (!), which makes
>> the pipeline unscalable.
>>
>
> Is this a batch or streaming job ? Also which runner ?
>
>
>>
>> Searching on the Internet I found a very similar question on SO but
>> without any answer:
>> https://stackoverflow.com/questions/49276833/slow-bigquery-load-job-when-data-comes-from-apache-beam-jdbcio
>> The other user experimented with replacing JdbcIO with TextIO and could
>> not replicate the behavior, so it seems the problem here is the
>> combination of JdbcIO + BigQueryIO, and perhaps Postgres plays a role as
>> well.
>>
>
> TextIO can shard better (initial splitting while reading, as well as
> dynamic work rebalancing if you are using a runner that supports it).
> JdbcIO has a known limitation: it has to execute the initial read (the
> query) from a single worker.
>
I think the issue Konstantinos is experiencing is too many shards, rather
than too few.
I suspect that this is because his runner of choice creates one shard for
each key of the internal Reshuffle.viaRandomKey() in JdbcIO. I believe this
is the case for quite a few runners, unfortunately.
Perhaps this can be turned off with withOutputParallelization(false)? Then
there will be no parallelism, but given the data size it seems there is
nothing to parallelize anyway.
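For what it's worth, a rough sketch of what I mean (the driver, JDBC URL,
query, and row mapper below are all placeholders, and
withOutputParallelization requires a reasonably recent Beam release):

```java
// Hedged sketch: disabling output parallelization removes JdbcIO's internal
// Reshuffle.viaRandomKey(), so the read output is not re-sharded per record.
PCollection<TableRow> rows = pipeline.apply("ReadFromPostgres",
    JdbcIO.<TableRow>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://host:5432/mydb"))
        .withQuery("SELECT id, name FROM users")  // placeholder query
        .withRowMapper((ResultSet rs) -> new TableRow()
            .set("id", rs.getLong("id"))
            .set("name", rs.getString("name")))
        .withCoder(TableRowJsonCoder.of())
        .withOutputParallelization(false));  // skip the internal reshuffle
```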


>
> For some runners, you can break fusion between the read transform and
> whatever that comes after that by adding a Reshuffle.viaRandomKey()
> transform [1]. That might help your overall pipeline to execute with a
> better performance.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L64
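For the archives, a sketch of that suggestion (readFromJdbc and the table
name are placeholders, not real helpers):

```java
// Hedged sketch: Reshuffle.viaRandomKey() breaks fusion between the JDBC
// read and the BigQuery write, letting the runner redistribute the records.
PCollection<TableRow> rows = readFromJdbc(pipeline);  // hypothetical helper
rows
    .apply("BreakFusion", Reshuffle.viaRandomKey())
    .apply("WriteToBQ", BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table"));  // placeholder table
```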
>
> Thanks,
> Cham
>
>
>>
>> I then tried to manually debug the execution using the direct runner and
>> a local debugger, and I found that the problem is with bundling, as it
>> creates bundles of size 1. I suppose this has to do with the
>> "Dependent-parallelism between transforms" section of the execution model:
>> https://beam.apache.org/documentation/runtime/model/#dependent-parallellism
>>
>> I have tried many things to override this, with no luck, including:
>>  * Adding an intermediate mapping step
>>  * Removing the dynamic destination table
>>  * Adding a shuffle using the Reparallelize<> class from the GCP
>> Jdbc-to-BigQuery example template:
>> https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/5f245fb1e2223ea42d5093863816a41391a548dd/src/main/java/com/google/cloud/teleport/io/DynamicJdbcIO.java#L405
>>
>> Is this supposed to happen? It looks like a bug to me that the default
>> behavior creates bundles of size 1. I also feel that BigQueryIO should
>> batch outputs into bigger load jobs for efficiency.
>>
>> Is there a way to override bundling somehow?
>> Any other suggestion to override this?
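Not sure this fully helps, but since bundle sizing is up to the runner and
cannot be overridden directly, one workaround is to batch records explicitly
with GroupIntoBatches ahead of the expensive step. A sketch, where the key
function, batch size, and WriteBatchFn are all illustrative:

```java
// Hedged sketch: group records into explicit batches so a downstream DoFn
// processes up to 500 elements at a time, regardless of runner bundling.
rows
    .apply("KeyByShard", WithKeys.of((TableRow r) -> r.hashCode() % 10)
        .withKeyType(TypeDescriptors.integers()))
    .apply("Batch", GroupIntoBatches.ofSize(500))
    // Output is KV<Integer, Iterable<TableRow>>; each iterable is one batch.
    .apply("WriteBatches", ParDo.of(new WriteBatchFn()));  // hypothetical DoFn
```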
>>
>>
>> Cheers!
>> Konstantinos
>>
>
