On Thu, Jan 2, 2020 at 10:10 AM Konstantinos P. <[email protected]> wrote:
> Hi!
>
> I have set up a Beam pipeline to read from a PostgreSQL server and write to
> a BigQuery table, and it takes forever even for a size of 20k records. While
> investigating the process, I found that the pipeline created one file per
> record and then a new load job is requested for each file(!), which makes
> the pipeline unscalable.

Is this a batch or streaming job? Also, which runner?

> Searching on the Internet I found a very similar question on SO but
> without any answer:
> https://stackoverflow.com/questions/49276833/slow-bigquery-load-job-when-data-comes-from-apache-beam-jdbcio
> The other user ran an experiment replacing JdbcIO with TextIO and couldn't
> replicate the behavior, so it seems that the problem here is the combination
> of JdbcIO + BigQueryIO, and maybe even Postgres has something to do with it.

TextIO can shard better (initial splitting while reading, as well as dynamic
work rebalancing if you are using a runner that supports that). JdbcIO has a
known limitation: it has to execute the initial read (query) from a single
worker. For some runners, you can break fusion between the read transform and
whatever comes after it by adding a Reshuffle.viaRandomKey() transform [1].
That might help your overall pipeline execute with better performance. A rough
sketch is included at the end of this message.

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L64

Thanks,
Cham

> I then tried to manually debug the execution using the direct runner and a
> local debugger, and I found out that the problem is with bundling, as it
> creates bundles of size 1. I suppose this has to do with
> "Dependent-parallelism between transforms" in the execution model:
> https://beam.apache.org/documentation/runtime/model/#dependent-parallellism
>
> I have tried many things to override this, with no luck, including:
> * Adding an intermediate mapping step
> * Removing the dynamic destination table
> * Adding a shuffle operation using the Reparallelize<> class from the GCP
>   example template Jdbc -> BigQuery:
>   https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/5f245fb1e2223ea42d5093863816a41391a548dd/src/main/java/com/google/cloud/teleport/io/DynamicJdbcIO.java#L405
>
> Is this supposed to happen? It looks like a bug to me that the default
> behavior makes bundles of size 1. Also, I feel that BigQueryIO should try to
> batch outputs into bigger load jobs to increase efficiency.
>
> Is there a way to override bundling somehow?
> Any other suggestions to work around this?
>
> Cheers!
> Konstantinos
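Below is a minimal sketch of the suggestion above: a JdbcIO read followed by
Reshuffle.viaRandomKey() to break fusion before the BigQuery write. This is
not code from the original pipeline; the driver class, connection string,
credentials, query, column names, and table reference are all placeholders.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class JdbcToBigQuery {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // JdbcIO executes the query on a single worker, so without further steps
    // everything downstream can stay fused onto that worker.
    PCollection<TableRow> rows =
        p.apply("ReadFromPostgres",
                JdbcIO.<TableRow>read()
                    .withDataSourceConfiguration(
                        JdbcIO.DataSourceConfiguration.create(
                                "org.postgresql.Driver",
                                "jdbc:postgresql://host:5432/db")   // placeholder connection
                            .withUsername("user")                   // placeholder credentials
                            .withPassword("password"))
                    .withQuery("SELECT id, name FROM my_table")     // placeholder query
                    .withRowMapper(resultSet ->
                        new TableRow()
                            .set("id", resultSet.getLong("id"))
                            .set("name", resultSet.getString("name")))
                    .withCoder(TableRowJsonCoder.of()))
            // Break fusion: redistribute the read output across workers so the
            // BigQuery write (and anything in between) can run in parallel.
            .apply("BreakFusion", Reshuffle.viaRandomKey());

    rows.apply("WriteToBigQuery",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")                   // placeholder table
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run().waitUntilFinish();
  }
}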
