Hi,

One thing that you could explore, as the fanout is large:

The Dataflow runner will optimise various transforms using fusion where possible (link <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization>). There are cases, like transforms with high fanout, where this may not be desirable. You can manually break fusion by putting in a shuffle step (also described at that doc link). This comes at the cost of an extra shuffle, of course, so you will need to experiment to see if it meets your needs.

In your fanout transform, generate KVs where the key is a hash that partitions the elements; use something like O(10k) for the key space. Then do a GroupByKey (GBK) and then ungroup.

Cheers

Reza

On Sat, 23 Nov 2019, 12:25 Kenneth Knowles wrote:

> +Lukasz Cwik +Chamikara Jayalath
>
> It sounds like your high-fanout transform that listens for new files on
> Pubsub would be a good fit for Splittable DoFn (SDF). It seems like a
> fairly common use case that could be a useful general contribution to Beam.
>
> Kenn
>
> On Fri, Nov 22, 2019 at 7:50 AM Jeff Klukas wrote:
>
>> We also had throughput issues in writing to BQ in a streaming pipeline
>> and we mitigated them by provisioning a large quantity of SSD storage to
>> improve I/O throughput to disk for checkpoints.
>>
>> I also second Erik's suggestion to look into Streaming Engine. We are
>> currently looking into migrating our streaming use cases to Streaming
>> Engine after we had success with improved BQ write throughput on batch
>> workloads by using the Shuffle service (the batch mode analogue to
>> Streaming Engine).
>>
>> On Fri, Nov 22, 2019 at 9:01 AM Erik Lincoln wrote:
>>
>>> Hi Frantisek,
>>>
>>> Some advice from making a similar pipeline and struggling with
>>> throughput and latency:
>>>
>>> 1. Break up your pipeline into multiple pipelines. Dataflow only
>>>    auto-scales based on input throughput.
>>>    If you’re microbatching events in files, the job will only autoscale
>>>    to meet the volume of files, not the volume of events added to the
>>>    pipeline from the files. A better flow is:
>>>
>>>      i. Pipeline 1: Receive GCS notifications, read files, and then
>>>         output file contents as Pubsub messages, either per event or in
>>>         microbatches from the file
>>>     ii. Pipeline 2: Receive events from Pubsub, do your transforms, then
>>>         write to BQ
>>>
>>> 2. Use the Streaming Engine service (if you can run in a region
>>>    supporting it):
>>>    https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>> 3. BQ streaming can be slower than a load job if you have a very
>>>    high volume (millions of events a minute). If your event volume is
>>>    high, you might need to consider further microbatching loads into BQ
>>>    from GCS.
>>>
>>> Hope this helps,
>>>
>>> Erik
>>>
>>> *From:* Frantisek Csajka
>>> *Sent:* Friday, November 22, 2019 5:35 AM
>>> *To:* [email protected]
>>> *Subject:* Fanout and OOMs on Dataflow while streaming to BQ
>>>
>>> Hi Beamers,
>>>
>>> We are facing OutOfMemory errors with a streaming pipeline on Dataflow.
>>> We are unable to get rid of them, not even with bigger worker instances.
>>> Any advice will be appreciated.
>>>
>>> The scenario is the following.
>>> - Files are being written to a bucket in GCS. A notification is set up
>>>   on the bucket, so for each file there is a Pub/Sub message
>>> - Notifications are consumed by our pipeline
>>> - Files from notifications are read by the pipeline
>>> - Each file contains several events (there is quite a big fanout)
>>> - Events are written to BigQuery (streaming)
>>>
>>> Everything works fine if there are only a few notifications, but if the
>>> files are incoming at a high rate or if there is a large backlog in the
>>> notification subscription, events get stuck in the BQ write and later an
>>> OOM is thrown.
>>>
>>> Having a larger worker does not work because if the backlog is large,
>>> larger instance(s) will throw OOM as well, just later...
>>>
>>> As far as we can see, there is a checkpointing/reshuffle in the BQ write
>>> and that's where the elements get stuck. It looks like the Pubsub read is
>>> consuming too many elements and, due to the fanout, this causes an OOM
>>> when grouping in the reshuffle.
>>> Is there any way to backpressure the Pubsub read? Is it possible to have
>>> a smaller window in Reshuffle? How does the Reshuffle actually work?
>>> Any advice?
>>>
>>> Thanks in advance,
>>> Frantisek
>>> CONFIDENTIALITY NOTICE: This e-mail and any files transmitted with it
>>> are intended solely for the use of the individual or entity to whom they
>>> are addressed and may contain confidential and privileged information
>>> protected by law. If you received this e-mail in error, any review, use,
>>> dissemination, distribution, or copying of the e-mail is strictly
>>> prohibited. Please notify the sender immediately by return e-mail and
>>> delete all copies from your system.
>>>
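
For reference, the fusion-breaking step suggested in the reply at the top of this thread (key each element by a hash, GroupByKey, then ungroup) might look roughly like the following with the Beam Python SDK. This is only a sketch under assumptions, not code from the thread: `shard_key`, `break_fusion`, and `NUM_SHARDS` are hypothetical names, elements are assumed to be `bytes`, and CRC32 is just one convenient stable hash. On an unbounded (streaming) PCollection, a plain GroupByKey also needs a non-global window or a trigger, which is not shown here.

```python
import zlib

# "O(10k)" key space as suggested in the thread; an assumption to tune.
NUM_SHARDS = 10_000


def shard_key(element: bytes, num_shards: int = NUM_SHARDS) -> int:
    """Map an element's bytes to a stable shard id in [0, num_shards)."""
    # zlib.crc32 gives the same value on every worker, unlike Python's
    # built-in hash() for bytes/str, which is salted per process.
    return zlib.crc32(element) % num_shards


def break_fusion(events, num_shards: int = NUM_SHARDS):
    """Apply key -> GroupByKey -> ungroup to a PCollection of bytes.

    Hypothetical helper: meant to sit right after the high-fanout step so
    the runner cannot fuse that step with what follows.
    """
    import apache_beam as beam  # lazy import; needs apache-beam installed

    return (
        events
        # Pair every element with its shard id.
        | "AddShardKey" >> beam.Map(lambda e: (shard_key(e, num_shards), e))
        # The shuffle introduced here is what breaks fusion.
        | "GroupByShard" >> beam.GroupByKey()
        # Drop the key and emit the grouped elements one by one again.
        | "Ungroup" >> beam.FlatMap(lambda kv: kv[1])
    )
```

Beam also ships a built-in `Reshuffle` transform that packages a similar key, group, ungroup pattern, so it may be worth trying that before hand-rolling the keys.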
