On Thu, Jan 2, 2020 at 11:52 AM Chamikara Jayalath <[email protected]> wrote:
> On Thu, Jan 2, 2020 at 10:10 AM Konstantinos P. <[email protected]> wrote:
>
>> Hi!
>>
>> I have set up a Beam pipeline to read from a PostgreSQL server and write
>> to a BigQuery table, and it takes forever even for a size of 20k records.
>> While investigating the process, I found that the pipeline created one
>> file per record and then a new load job is requested for each file(!),
>> which makes the pipeline unscalable.
>
> Is this a batch or streaming job? Also, which runner?
>
>> Searching on the Internet, I found a very similar question on SO, but
>> without any answer:
>> https://stackoverflow.com/questions/49276833/slow-bigquery-load-job-when-data-comes-from-apache-beam-jdbcio
>> The other user ran an experiment replacing JdbcIO with TextIO and
>> could not replicate the behavior, so it seems that the problem here is
>> the combination of JdbcIO + BigQueryIO, and maybe Postgres has
>> something to do with it as well.
>
> TextIO can shard better (initial splitting while reading, as well as
> dynamic work rebalancing if you are using a runner that supports that).
> JdbcIO has a known limitation that it has to execute the initial read
> (query) from a single worker.

I think the issue Konstantinos is experiencing is too many shards, rather than too few. I suspect this is because his runner of choice creates one shard for each key of the internal Reshuffle.viaRandomKey() in JdbcIO. I believe this is the case for quite a few runners, unfortunately. Perhaps this can be turned off with withOutputParallelization(false)? Then there will be no parallelism, but based on the data volume it seems there is nothing to parallelize anyway.

> For some runners, you can break fusion between the read transform and
> whatever comes after it by adding a Reshuffle.viaRandomKey() transform
> [1]. That might help your overall pipeline execute with better
> performance.
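To make the suggestion concrete, here is a minimal sketch of reading via JdbcIO with output parallelization disabled, then writing to BigQuery. This assumes the Beam Java SDK with the JdbcIO and BigQueryIO modules on the classpath; the driver class, connection string, query, column names, and table name are placeholders, not taken from Konstantinos's pipeline:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class JdbcToBigQuery {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<TableRow> rows =
        p.apply(
            JdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver", "jdbc:postgresql://host/db"))
                .withQuery("SELECT id, name FROM source_table")
                .withRowMapper(
                    rs -> new TableRow().set("id", rs.getLong(1)).set("name", rs.getString(2)))
                .withCoder(TableRowJsonCoder.of())
                // Skip JdbcIO's internal Reshuffle.viaRandomKey(), which on some
                // runners ends up producing one shard (and hence one tiny bundle)
                // per record. The trade-off: the read output is not parallelized.
                .withOutputParallelization(false));

    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```

For 20k records the loss of read-side parallelism should not matter, and the BigQuery load side should then see far fewer, larger files.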
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L64
>
> Thanks,
> Cham
>
>> I then tried to manually debug the execution using the direct runner and
>> a local debugger, and I found that the problem is with bundling, as it
>> creates bundles of size 1. I suppose this has to do with
>> "dependent parallelism between transforms" of the execution model:
>> https://beam.apache.org/documentation/runtime/model/#dependent-parallellism
>>
>> I have tried many things to override this, with no luck, including:
>> * Adding an intermediate mapping step
>> * Removing the dynamic destination table
>> * Adding a shuffle operation using the Reparallelize<> class from the
>>   GCP example template Jdbc -> BigQuery:
>>   https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/5f245fb1e2223ea42d5093863816a41391a548dd/src/main/java/com/google/cloud/teleport/io/DynamicJdbcIO.java#L405
>>
>> Is this supposed to happen? It looks like a bug to me that the default
>> behavior makes bundles of size 1. Also, I feel that BigQueryIO should
>> try to batch outputs into bigger load jobs to increase efficiency.
>>
>> Is there a way to override bundling somehow?
>> Any other suggestions to get around this?
>>
>> Cheers!
>> Konstantinos
