On Mon, Nov 25, 2019 at 1:59 AM Reza Rokni <[email protected]> wrote:

> Reshuffle does a GBK if I recall correctly, so I guess that was not a
> solution for your use case :-(
>
> On Mon, 25 Nov 2019 at 17:33, Frantisek Csajka <[email protected]> wrote:
>
>> Hi,
>>
>> Sorry for the late reply, I was not near a computer during the weekend.
>> Many thanks for all the advice and suggestions!
>>
>> [email protected], [email protected]:
>> Thanks for the detailed answers! Separating the pipelines using Pub/Sub
>> works fine, although it would be nice if it could be done in one pipeline
>> (fewer resources needed). Streaming Engine worked well for us in a similar
>> scenario, but as the amount of data can be large, Streaming Engine can be
>> expensive for us :/
>>
>> [email protected]:
>> I have never worked with SDF, but it sounds interesting, thanks! Are there
>> any good docs with examples to start with?
>>

The Beam Python SDK is the furthest along in this regard: it supports bounded SplittableDoFns, and unbounded support is being worked on right now (maybe a month or so away). The Beam Java and Go SDKs have some code related to supporting SDFs, but they are not mature and will change quite a bit in the next few months to follow what is being done in the Beam Python SDK, at which point they will become available for general use.
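[Editor's note: for readers unfamiliar with SDFs, the core idea is that the work for one large element (e.g. one file) is described by a "restriction" that the runner can split and checkpoint mid-element. The sketch below is a stdlib-only model of the claim/split mechanics; it is NOT the actual Beam API, and the class and method names are illustrative.]

```python
# Illustrative model of the restriction-tracker idea behind Splittable DoFn.
# Not Beam code: it only shows why a runner can checkpoint an SDF mid-element --
# the tracker knows which part of the work is claimed vs. still remaining.

class OffsetRangeTracker:
    """Tracks progress through a half-open offset range [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = start - 1

    def try_claim(self, offset):
        """Claim the next offset; returns False once the range is exhausted."""
        if offset >= self.stop:
            return False
        self.last_claimed = offset
        return True

    def try_split(self):
        """Split off the unclaimed remainder so the runner can defer it."""
        split_point = self.last_claimed + 1
        if split_point >= self.stop:
            return None  # nothing left to split off
        residual = (split_point, self.stop)
        self.stop = split_point  # primary keeps only the claimed prefix
        return residual


def read_records(records, tracker):
    """A DoFn-like body: emit records only for offsets it can claim."""
    out = []
    for i, rec in enumerate(records):
        if not tracker.try_claim(i):
            break
        out.append(rec)
    return out
```

After claiming offsets 0..2 of a [0, 10) range, `try_split()` hands the residual `(3, 10)` back to the runner as deferrable work; this is what lets a runner rebalance a high-fanout element instead of buffering it all on one worker.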
This doc, https://s.apache.org/beam-breaking-fusion, best describes the motivation and the problem space.

> [email protected]:
>> Yes, we were considering breaking the (possible) fusion, but we have not
>> tried exactly what you are suggesting; we were experimenting with the
>> Reshuffle transform. We will try your suggestion with an explicit GBK.
>> Thanks!
>> Also, Reshuffle is deprecated, so is GBK the preferred way to break
>> fusion, or will there be some fusion-breaking transform in the future?
>>
>> Thank you all again, it helped a lot,
>> Frantisek
>>
>> On Sun, Nov 24, 2019 at 2:58 AM Reza Rokni <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> One thing that you could explore, as the fanout is large:
>>>
>>> The Dataflow runner will optimise various transforms using fusion where
>>> possible (link
>>> <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization>).
>>> There are cases, like transforms with high fanout, where this may not be
>>> desirable. You can manually break fusion by putting in a shuffle step
>>> (also on that doc link). This comes at the cost of an extra shuffle, of
>>> course, so you will need to experiment to see whether it meets your needs.
>>>
>>> In your fanout transform, generate a KV where the key is some hash that
>>> partitions the elements; use something like O(10k) for the key space.
>>> Then do a GBK and then ungroup.
>>>
>>> Cheers
>>> Reza
>>>
>>> On Sat, 23 Nov 2019, 12:25 Kenneth Knowles, <[email protected]> wrote:
>>>
>>>> +Lukasz Cwik <[email protected]> +Chamikara Jayalath
>>>> <[email protected]>
>>>>
>>>> It sounds like your high-fanout transform that listens for new files on
>>>> Pub/Sub would be a good fit for Splittable DoFn (SDF). It seems like a
>>>> fairly common use case that could be a useful general contribution to
>>>> Beam.
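[Editor's note: the key-assignment step of Reza's suggestion can be sketched in plain Python. A deterministic hash is used here because Python's builtin `hash()` is salted per process; in a real pipeline this function would be the body of a Map/DoFn placed before an explicit GroupByKey, and the 10,000-shard count is just the O(10k) figure from the thread.]

```python
import hashlib

NUM_SHARDS = 10_000  # ~O(10k) key space, as suggested in the thread

def with_shard_key(element):
    """Assign a deterministic shard key so a GroupByKey spreads the fanout.

    In a Beam pipeline this would sit between the high-fanout transform and
    an explicit GroupByKey; after the GBK, a FlatMap over the grouped values
    "ungroups" the elements again. The GBK forces a shuffle, breaking fusion.
    """
    digest = hashlib.sha1(element.encode("utf-8")).hexdigest()
    shard = int(digest, 16) % NUM_SHARDS
    return (shard, element)

def ungroup(keyed_group):
    """Drop the shard key after the GBK, re-emitting the raw elements."""
    _, values = keyed_group
    return list(values)
```

Because the key is derived from the element itself, retries produce the same key, and with ~10k keys the grouped bundles stay small enough to redistribute across workers.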
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Nov 22, 2019 at 7:50 AM Jeff Klukas <[email protected]> wrote:
>>>>
>>>>> We also had throughput issues writing to BQ in a streaming pipeline,
>>>>> and we mitigated them by provisioning a large quantity of SSD storage
>>>>> to improve I/O throughput to disk for checkpoints.
>>>>>
>>>>> I also second Erik's suggestion to look into Streaming Engine. We are
>>>>> currently looking into migrating our streaming use cases to Streaming
>>>>> Engine, after we had success with improved BQ write throughput on batch
>>>>> workloads by using the Shuffle service (the batch-mode analogue of
>>>>> Streaming Engine).
>>>>>
>>>>> On Fri, Nov 22, 2019 at 9:01 AM Erik Lincoln <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi Frantisek,
>>>>>>
>>>>>> Some advice from building a similar pipeline and struggling with
>>>>>> throughput and latency:
>>>>>>
>>>>>> 1. Break up your pipeline into multiple pipelines. Dataflow only
>>>>>>    auto-scales based on input throughput. If you're microbatching
>>>>>>    events in files, the job will only autoscale to meet the volume of
>>>>>>    files, not the volume of events added to the pipeline from the
>>>>>>    files. A better flow is:
>>>>>>    i.  Pipeline 1: Receive GCS notifications, read the files, and then
>>>>>>        output the file contents as Pub/Sub messages, either per event
>>>>>>        or in microbatches from the file.
>>>>>>    ii. Pipeline 2: Receive events from Pub/Sub, do your transforms,
>>>>>>        then write to BQ.
>>>>>> 2. Use the Streaming Engine service (if you can run in a region
>>>>>>    supporting it):
>>>>>>    https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>>>> 3. BQ streaming can be slower than a load job if you have a very high
>>>>>>    volume (millions of events a minute). If your event volume is high,
>>>>>>    you might need to consider further microbatching loads into BQ from
>>>>>>    GCS.
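[Editor's note: to make Pipeline 1 of the split above concrete: GCS bucket notifications arrive as Pub/Sub messages whose payload is a JSON object that includes `bucket` and `name` fields. Below is a stdlib-only sketch of just the notification-parsing step; the surrounding Beam plumbing (ReadFromPubSub, file reading, WriteToPubSub) is assumed and not shown.]

```python
import json

def gcs_path_from_notification(payload: bytes) -> str:
    """Extract a gs:// path from a GCS Pub/Sub notification payload.

    GCS object-change notifications carry a JSON body with "bucket" and
    "name" fields. Pipeline 1 would map each notification to the file it
    refers to, read that file, and re-publish its events to Pub/Sub so
    that Pipeline 2's autoscaling tracks event volume, not file volume.
    """
    attrs = json.loads(payload.decode("utf-8"))
    return "gs://{}/{}".format(attrs["bucket"], attrs["name"])
```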
>>>>>>
>>>>>> Hope this helps,
>>>>>>
>>>>>> Erik
>>>>>>
>>>>>> *From:* Frantisek Csajka <[email protected]>
>>>>>> *Sent:* Friday, November 22, 2019 5:35 AM
>>>>>> *To:* [email protected]
>>>>>> *Subject:* Fanout and OOMs on Dataflow while streaming to BQ
>>>>>>
>>>>>> Hi Beamers,
>>>>>>
>>>>>> We are facing OutOfMemory errors with a streaming pipeline on
>>>>>> Dataflow. We are unable to get rid of them, not even with bigger
>>>>>> worker instances. Any advice will be appreciated.
>>>>>>
>>>>>> The scenario is the following:
>>>>>> - Files are being written to a bucket in GCS. A notification is set
>>>>>>   up on the bucket, so for each file there is a Pub/Sub message.
>>>>>> - The notifications are consumed by our pipeline.
>>>>>> - The files from the notifications are read by the pipeline.
>>>>>> - Each file contains several events (so there is quite a big fanout).
>>>>>> - The events are written to BigQuery (streaming).
>>>>>>
>>>>>> Everything works fine if there are only a few notifications, but if
>>>>>> the files arrive at a high rate, or if there is a large backlog in
>>>>>> the notification subscription, events get stuck in the BQ write and
>>>>>> later an OOM is thrown.
>>>>>>
>>>>>> Using a larger worker does not help, because if the backlog is large,
>>>>>> the larger instance(s) will throw an OOM as well, just later...
>>>>>>
>>>>>> As far as we can see, there is a checkpointing/reshuffle in the BQ
>>>>>> write, and that's where the elements get stuck. It looks like Pub/Sub
>>>>>> is consuming too many elements, and due to the fanout this causes an
>>>>>> OOM when grouping in the reshuffle.
>>>>>> Is there any way to backpressure the Pub/Sub read? Is it possible to
>>>>>> have a smaller window in Reshuffle? How does Reshuffle actually work?
>>>>>> Any advice?
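[Editor's note on the "how does Reshuffle actually work" question, which Reza answers at the top of the thread: conceptually, Reshuffle tags each element with a random key, groups by that key (a GBK, which on Dataflow is a materialization point that breaks fusion), then strips the keys again. The stdlib-only model below is illustrative, not Beam code; it also shows why a large fanned-out backlog hurts: every element must be buffered at the grouping step before being re-emitted.]

```python
import random
from collections import defaultdict

def reshuffle(elements, num_keys=1000):
    """Conceptual model of Reshuffle: random key -> group by key -> drop key.

    The grouping dict below stands in for the GBK: the whole backlog is
    buffered here before anything is re-emitted, which is where the
    thread's OOM occurs when the fanout is large.
    """
    groups = defaultdict(list)
    for element in elements:
        groups[random.randrange(num_keys)].append(element)  # random key + GBK
    return [e for bucket in groups.values() for e in bucket]  # ungroup
```

The output is the same multiset of elements, just redistributed; in Beam the real transform additionally preserves timestamps and windowing.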
>>>>>>
>>>>>> Thanks in advance,
>>>>>> Frantisek
