Hi! Eugene you were right. What was happening was that I was oversharding for two different reasons. a) I had a bug on dynamic destination table function of bigquery and it was creating one unique table per row. b) Even though I had fixed this temporarly at a static table, the direct runner + Reshuffle.viaRandomKey() was forcing bundles of size 1. On DataflowRunner the Reshuffle.viaRandomKey() didn't had the same behaviour and it seemed that without it, it was not working so well.
Also thanks for pointing out withOutputParallelization, I discovered that the internal implementation of output parallelization was through exactly the same class (Reparallelize) that was used from Dataflow template. Cheers! On Thu, Jan 2, 2020 at 10:37 PM Eugene Kirpichov <[email protected]> wrote: > > > On Thu, Jan 2, 2020 at 11:52 AM Chamikara Jayalath <[email protected]> > wrote: > >> >> >> On Thu, Jan 2, 2020 at 10:10 AM Konstantinos P. <[email protected]> >> wrote: >> >>> Hi! >>> >>> I have setup a beam pipeline to read from a postGreSQL server and write >>> to BigQuery table and it takes for ever even for size of 20k records. While >>> investingating the process, I found that the pipeline created one file per >>> record and then a new job is requested for each file(!) which makes the >>> pipeline unscalable. >>> >> >> Is this a batch or streaming job ? Also which runner ? >> >> >>> >>> Searching on the Internet I found a very similar question on SO but >>> without any answer: >>> https://stackoverflow.com/questions/49276833/slow-bigquery-load-job-when-data-comes-from-apache-beam-jdbcio >>> The other user has made an experiment by replacing JdbcIO with TextIO and >>> (s)he couldn't replicate the behavior so it seems that the problem here is >>> the combination of JdbcIO + BigQueryIO and maybe even postgres has >>> something to do also. >>> >> >> TextIO can shard better (initial splitting while reading as well as >> dynamic work rebalancing if you are using a runner that supports that). >> JDBCIO has a known limitation that it has to execute the initial read >> (query) from a single worker. >> > I think the issue Konstantinos is experiencing is too many shards, rather > than too few shards. > I suspect that this is because his runner of choice creates 1 shard for > each key of the internal Reshuffle.viaRandomKey() in JdbcIO. I believe this > is the case for quite a few runners unfortunately. > Perhaps this can be turned off by withOutputParallelization(false)? Then > there'll be no parallelism, but based on the data amount it seems like > there's nothing to parallelize anyway. > > >> >> For some runners, you can break fusion between the read transform and >> whatever that comes after that by adding a Reshuffle.viaRandomKey() >> transform [1]. That might help your overall pipeline to execute with a >> better performance. >> >> [1] >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L64 >> >> Thanks, >> Cham >> >> >>> >>> I then tried to manually debug the execution using direct runner and >>> local debugger and I found out that the problem is with bundling as it >>> creates bundles of size 1. I suppose this has to do with >>> "Dependent-parallelism between transforms" of the execution model >>> https://beam.apache.org/documentation/runtime/model/#dependent-parallellism >>> . >>> >>> I have tried many things to override this but no luck including: >>> * Adding an intermediate mapping step >>> * Removing dynamic destination table >>> * Adding a shuffle operation using the Reparallelize<> class from GCP >>> example template Jdbc -> Bigquery >>> https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/5f245fb1e2223ea42d5093863816a41391a548dd/src/main/java/com/google/cloud/teleport/io/DynamicJdbcIO.java#L405 >>> >>> Is this supposed to happen? It looks like a bug to me that the default >>> behavior makes bundles of size 1. Also I feel that BigQueryIO should try to >>> batch outputs in bigger jobs to increase efficiency. >>> >>> Is there a way to override bundling somehow? >>> Any other suggestion to override this? >>> >>> >>> Cheers! >>> Konstantinos >>> >>
