Thanks Luke! No more questions so far :) Regards, Frantisek
On Tue, Nov 26, 2019 at 12:31 AM Luke Cwik <[email protected]> wrote:
>
> On Mon, Nov 25, 2019 at 1:59 AM Reza Rokni <[email protected]> wrote:
>>
>> Reshuffle does a GBK if I recall correctly, so I guess that was not a
>> solution for your use case :-(
>>
>> On Mon, 25 Nov 2019 at 17:33, Frantisek Csajka <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> Sorry for the late reply, I was not near a computer during the weekend.
>>> Many thanks for all the advice and suggestions!
>>>
>>> [email protected], [email protected]:
>>> Thanks for the detailed answers! Separating the pipelines using Pub/Sub
>>> works fine, although it would be nice if it could be done in one pipeline
>>> (fewer resources needed). Streaming Engine worked well for us in a
>>> similar scenario, but since the amount of data can be large, Streaming
>>> Engine can get expensive for us :/
>>>
>>> [email protected]:
>>> I have never worked with SDF, but it sounds interesting, thanks! Are
>>> there any good docs with examples to start with?
>>
> The Beam Python SDK is the furthest along in this matter: it supports
> bounded SplittableDoFns, and unbounded support is being worked on right now
> (maybe a month or so away). The Beam Java and Go SDKs have some code
> related to supporting SDFs, but it is not mature and will change quite a
> bit over the next few months to follow what is being done in the Beam
> Python SDK, at which point it will become available for general use.
>
> This https://s.apache.org/beam-breaking-fusion best describes the
> motivation and the problem space.
>
>>> [email protected]:
>>> Yes, we were considering breaking the (possible) fusion, but we have not
>>> tried exactly what you are suggesting; we were experimenting with the
>>> Reshuffle transform. We will try your suggestion with an explicit GBK.
>>> Thanks!
>>> Also, since Reshuffle is deprecated, is GBK the preferred way to break
>>> fusion, or will there be some fusion-breaking transform in the future?
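For what it's worth, the fusion-breaking pattern discussed in this thread — pair each element with a hash key drawn from a bounded key space, GroupByKey, then ungroup — can be illustrated outside Beam. This is only a plain-Python sketch of the key assignment and round-trip, not the runner-side materialization; the names `NUM_SHARDS`, `shard_key`, and `break_fusion` are illustrative, not Beam API:

```python
import hashlib
from collections import defaultdict

NUM_SHARDS = 10_000  # the suggested O(10k) key space

def shard_key(element: str) -> int:
    """Deterministically map an element to one of NUM_SHARDS keys,
    like the KV-assigning step before a fusion-breaking GBK."""
    digest = hashlib.md5(element.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_SHARDS

def break_fusion(elements):
    """Simulate KV -> GroupByKey -> ungroup. In Beam, the grouping forces
    a materialization boundary; here it simply round-trips the data."""
    groups = defaultdict(list)
    for e in elements:                 # 1. pair each element with a bounded key
        groups[shard_key(e)].append(e)
    for _key, values in groups.items():  # 2. "GBK" ...
        yield from values                # 3. ... then drop the keys (ungroup)

events = [f"event-{i}" for i in range(50)]
assert sorted(break_fusion(events)) == sorted(events)  # contents preserved, order not
```

The grouping preserves contents but not order, which matches GBK semantics: downstream steps must not rely on element order across the shuffle boundary.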
>>>
>>> Thank you all again, it helped a lot,
>>> Frantisek
>>>
>>> On Sun, Nov 24, 2019 at 2:58 AM Reza Rokni <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> One thing that you could explore, as the fanout is large:
>>>>
>>>> The Dataflow runner will optimise various transforms using fusion
>>>> where possible (link
>>>> <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization>).
>>>> There are cases, like transforms with high fanout, where this may not
>>>> be desirable. You can manually break fusion by putting in a shuffle
>>>> step (also covered in that doc link). This comes at the cost of an
>>>> extra shuffle, of course, so you will need to experiment to see if it
>>>> meets your needs.
>>>>
>>>> In your fanout transform, generate a KV where the key is some hash
>>>> that partitions the elements; use something like O(10k) for the key
>>>> space. Then do a GBK and then ungroup.
>>>>
>>>> Cheers
>>>> Reza
>>>>
>>>> On Sat, 23 Nov 2019, 12:25 Kenneth Knowles, <[email protected]> wrote:
>>>>>
>>>>> +Lukasz Cwik <[email protected]> +Chamikara Jayalath <[email protected]>
>>>>>
>>>>> It sounds like your high-fanout transform that listens for new files
>>>>> on Pub/Sub would be a good fit for Splittable DoFn (SDF). It seems
>>>>> like a fairly common use case that could be a useful general
>>>>> contribution to Beam.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Nov 22, 2019 at 7:50 AM Jeff Klukas <[email protected]> wrote:
>>>>>>
>>>>>> We also had throughput issues writing to BQ in a streaming pipeline,
>>>>>> and we mitigated them by provisioning a large quantity of SSD
>>>>>> storage to improve I/O throughput to disk for checkpoints.
>>>>>>
>>>>>> I also like Erik's suggestion to look into Streaming Engine. We are
>>>>>> currently looking into migrating our streaming use cases to
>>>>>> Streaming Engine after we had success with improved BQ write
>>>>>> throughput on batch workloads using the Shuffle service (the
>>>>>> batch-mode analogue to Streaming Engine).
>>>>>>
>>>>>> On Fri, Nov 22, 2019 at 9:01 AM Erik Lincoln <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi Frantisek,
>>>>>>>
>>>>>>> Some advice from building a similar pipeline and struggling with
>>>>>>> throughput and latency:
>>>>>>>
>>>>>>> 1. Break up your pipeline into multiple pipelines. Dataflow only
>>>>>>> auto-scales based on input throughput. If you're microbatching
>>>>>>> events in files, the job will only autoscale to meet the volume of
>>>>>>> files, not the volume of events added to the pipeline from the
>>>>>>> files. A better flow is:
>>>>>>>    i.  Pipeline 1: Receive GCS notifications, read the files, and
>>>>>>>        output the file contents as Pub/Sub messages, either per
>>>>>>>        event or in microbatches from the file.
>>>>>>>    ii. Pipeline 2: Receive events from Pub/Sub, do your
>>>>>>>        transforms, then write to BQ.
>>>>>>> 2. Use the Streaming Engine service (if you can run in a region
>>>>>>> supporting it):
>>>>>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>>>>> 3. BQ streaming inserts can be slower than a load job if you have
>>>>>>> a very high volume (millions of events a minute). If your event
>>>>>>> volume is high, you might need to consider further microbatching
>>>>>>> loads into BQ from GCS.
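The last point above — batching rows before loading into BQ rather than streaming each one — comes down to a bounded batcher in front of the load step. A stdlib-only sketch; `max_batch=500` is an illustrative number, and real limits depend on row size and BigQuery quotas:

```python
from itertools import islice
from typing import Iterable, Iterator, List

def microbatches(events: Iterable[str], max_batch: int = 500) -> Iterator[List[str]]:
    """Group a stream of events into bounded batches, as you would before
    issuing one load job (or one insert request) per batch instead of one
    call per row."""
    it = iter(events)
    while True:
        batch = list(islice(it, max_batch))  # take up to max_batch events
        if not batch:
            return                           # stream exhausted
        yield batch

# 1200 events become batches of 500, 500, and 200.
batches = list(microbatches((f"row-{i}" for i in range(1200)), max_batch=500))
```

In a real pipeline the same effect is usually achieved with windowing/triggering rather than a hand-rolled batcher, but the size-bounding idea is the same.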
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Erik
>>>>>>>
>>>>>>> *From:* Frantisek Csajka <[email protected]>
>>>>>>> *Sent:* Friday, November 22, 2019 5:35 AM
>>>>>>> *To:* [email protected]
>>>>>>> *Subject:* Fanout and OOMs on Dataflow while streaming to BQ
>>>>>>>
>>>>>>> Hi Beamers,
>>>>>>>
>>>>>>> We are facing OutOfMemory errors with a streaming pipeline on
>>>>>>> Dataflow. We are unable to get rid of them, not even with bigger
>>>>>>> worker instances. Any advice will be appreciated.
>>>>>>>
>>>>>>> The scenario is the following:
>>>>>>> - Files are being written to a bucket in GCS. A notification is set
>>>>>>> up on the bucket, so for each file there is a Pub/Sub message.
>>>>>>> - Notifications are consumed by our pipeline.
>>>>>>> - Files from notifications are read by the pipeline.
>>>>>>> - Each file contains several events (there is quite a big fanout).
>>>>>>> - Events are written to BigQuery (streaming).
>>>>>>>
>>>>>>> Everything works fine if there are only a few notifications, but if
>>>>>>> the files are incoming at a high rate or if there is a large backlog
>>>>>>> in the notification subscription, events get stuck in the BQ write
>>>>>>> and later an OOM is thrown.
>>>>>>>
>>>>>>> Using a larger worker does not help, because if the backlog is
>>>>>>> large, larger instance(s) will throw an OOM as well, just later...
>>>>>>>
>>>>>>> As far as we can see, there is a checkpointing/reshuffle in the BQ
>>>>>>> write, and that's where the elements get stuck. It looks like
>>>>>>> Pub/Sub is consuming too many elements, and due to the fanout this
>>>>>>> causes an OOM when grouping in the reshuffle.
>>>>>>> Is there any way to backpressure the Pub/Sub read? Is it possible
>>>>>>> to have a smaller window in the Reshuffle? How does Reshuffle
>>>>>>> actually work? Any advice?
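To make the fanout at the heart of the problem concrete: the expansion step — one file notification becoming many events — amounts to a flat-map. A minimal sketch, assuming (purely for illustration) that the files are newline-delimited JSON:

```python
import json

def events_from_file(file_contents: str):
    """High-fanout step: one file expands into many events.
    The newline-delimited JSON format here is an assumption for
    illustration; the actual file format is not stated in the thread."""
    for line in file_contents.splitlines():
        if line.strip():
            yield json.loads(line)

contents = "\n".join(json.dumps({"id": i}) for i in range(3))
assert list(events_from_file(contents)) == [{"id": 0}, {"id": 1}, {"id": 2}]
```

Each Pub/Sub notification is small, but every one of these expansions multiplies the in-flight element count, which is why a backlog of notifications translates into memory pressure at the downstream grouping.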
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Frantisek
