Thanks Luke! No more questions so far :)

Regards,
Frantisek

On Tue, Nov 26, 2019 at 12:31 AM Luke Cwik <[email protected]> wrote:

>
>
> On Mon, Nov 25, 2019 at 1:59 AM Reza Rokni <[email protected]> wrote:
>
>> Reshuffle does a GBK if I recall correctly, so I guess that was not a
>> solution for your use-case :-(
>>
>> On Mon, 25 Nov 2019 at 17:33, Frantisek Csajka <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the late reply, I was not near a computer during the weekend.
>>> Many thanks for all the advices and suggestions!
>>>
>>> [email protected], [email protected]:
>>> Thanks for detailed answers! The separation of pipelines using P/S works
>>> fine, although it would be nice if it could be done in one pipeline (less
>>> resources needed). Streaming engine worked well for as in a similar
>>> scenario but as the amount of data can be large, the streaming engine can
>>> be expensive for us :/
>>>
>>> [email protected]
>>> Never worked with SDF, but it sound interesting, thanks! Is there any
>>> good docs with examples to start with?
>>>
>>
> The Beam Python SDK is the furthest along in this matter and supports
> bounded SplittableDoFns and unbounded is being worked on right now (maybe a
> month or so away). The Beam Java and Go SDKs have some code related to
> supporting SDFs but they are not mature and will change in the next few
> months quite a bit to follow what is being done in Beam Python SDK at which
> point they will become available for general use.
>
> This https://s.apache.org/beam-breaking-fusion best describes the
> motivation and the problem space.
>
>
>
>> [email protected]:
>>> Yes we were considering breaking the (possible) fusion but have not
>>> tried exactly what you are suggesting, we were experimenting with Reshuffle
>>> transform. Will try your suggestion with explicit GBK. Thanks!
>>> Also, Reshuffle is deprecated, so, is GBK the preferred way to break the
>>> fusion or will there be some fusion breaking transform in the future?
>>>
>>> Thank you all again, it helped a lot,
>>> Frantisek
>>>
>>> On Sun, Nov 24, 2019 at 2:58 AM Reza Rokni <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> One thing that you could explore, as the fanout is large:
>>>>
>>>> The Dataflow runner will optimise various transforms using Fusion where
>>>> possible (link
>>>> <https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#fusion-optimization>).
>>>> There are cases , like transforms with high fanout, where this may not be
>>>> desirable. You can manually break fusion by putting in a shuffle step (
>>>> also on that doc link). This would come at the cost of an extra shuffle of
>>>> course, so you will need to experiment to see if this meets your needs.
>>>>
>>>> In your fanout transform , generate a KV where the Key is some Hash
>>>> that partitions the elements,  use something like O(10k) for the key space.
>>>> Then do a GBK and then ungroup.
>>>>
>>>> Cheers
>>>> Reza
>>>>
>>>>
>>>>
>>>> On Sat, 23 Nov 2019, 12:25 Kenneth Knowles, <[email protected]> wrote:
>>>>
>>>>> +Lukasz Cwik <[email protected]> +Chamikara Jayalath
>>>>> <[email protected]>
>>>>>
>>>>> It sounds like your high-fanout transform that listens for new files
>>>>> on Pubsub would be a good fit for Splittable DoFn (SDF). It seems like a
>>>>> fairly common use case that could be a useful general contribution to 
>>>>> Beam.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Nov 22, 2019 at 7:50 AM Jeff Klukas <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> We also had throughput issues in writing to BQ in a streaming
>>>>>> pipeline and we mitigated by provisioning a large quantity of SSD storage
>>>>>> to improve I/O throughput to disk for checkpoints.
>>>>>>
>>>>>> I also Erik's suggestion to look into Streaming Engine. We are
>>>>>> currently looking into migrating our streaming use cases to use streaming
>>>>>> engine after we had success with improved BQ write throughput on batch
>>>>>> workloads by using Shuffle service (the batch mode analogue to the
>>>>>> Streaming Engine).
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 22, 2019 at 9:01 AM Erik Lincoln <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi Frantisek,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Some advice from making a similar pipeline and struggling with
>>>>>>> throughput and latency:
>>>>>>>
>>>>>>>    1. Break up your pipeline into multiple pipelines. Dataflow only
>>>>>>>    auto-scales based on input throughput. If you’re microbatching 
>>>>>>> events in
>>>>>>>    files, the job will only autoscale to meet the volume of files, not 
>>>>>>> the
>>>>>>>    volume of events added to the pipeline from the files.
>>>>>>>       1. Better flow is:
>>>>>>>
>>>>>>>                                                                i.
>>>>>>> Pipeline 1: Receive GCS notifications, read files, and then output
>>>>>>> file contents as Pubsub messages either per event or in microbatches 
>>>>>>> from
>>>>>>> the file
>>>>>>>
>>>>>>>                                                              ii.
>>>>>>> Pipeline 2: Receive events from Pubsub, do your transforms, then
>>>>>>> write to BQ
>>>>>>>
>>>>>>>    1. Use the Streaming Engine service (if you can run in a region
>>>>>>>    supporting it):
>>>>>>>    
>>>>>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>>>>>    2. BQ streaming can be slower than a load job if you have a very
>>>>>>>    high volume (millions of events a minute). If your event volume is 
>>>>>>> high,
>>>>>>>    you might need to consider further microbatching loads into BQ from 
>>>>>>> GCS.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>>
>>>>>>> Erik
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From:* Frantisek Csajka <[email protected]>
>>>>>>> *Sent:* Friday, November 22, 2019 5:35 AM
>>>>>>> *To:* [email protected]
>>>>>>> *Subject:* Fanout and OOMs on Dataflow while streaming to BQ
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi Beamers,
>>>>>>>
>>>>>>> We are facing OutOfMemory errors with a streaming pipeline on
>>>>>>> Dataflow. We are unable to get rid of them, not even with bigger worker
>>>>>>> instances. Any advice will be appreciated.
>>>>>>>
>>>>>>> The scenario is the following.
>>>>>>> - Files are being written to a bucket in GCS. A notification is set
>>>>>>> up on the bucket, so for each file there is a Pub/Sub message
>>>>>>> - Notifications are consumed by our pipeline
>>>>>>> - Files from notifications are read by the pipeline
>>>>>>> - Each file contains several events (there is a quite big fanout)
>>>>>>> - Events are written to BigQuery (streaming)
>>>>>>>
>>>>>>> Everything works fine if there are only few notifications, but if
>>>>>>> the files are incoming at high rate or if there is a large backlog in 
>>>>>>> the
>>>>>>> notification subscription, events get stuck in BQ write and later OOM is
>>>>>>> thrown.
>>>>>>>
>>>>>>> Having a larger worker does not work because if the backlog is
>>>>>>> large, larger instance(s) will throw OOM as well, just later...
>>>>>>>
>>>>>>> As far as we can see, there is a checkpointing/reshuffle in BQ write
>>>>>>> and thats where the elements got stuck. It looks like the pubsub is
>>>>>>> consuming too many elements and due to fanout it causes OOM when 
>>>>>>> grouping
>>>>>>> in reshuffle.
>>>>>>> Is there any way to backpressure the pubsub read? Is it possible to
>>>>>>> have smaller window in Reshuffle? How does the Reshuffle actually work?
>>>>>>> Any advice?
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Frantisek
>>>>>>> CONFIDENTIALITY NOTICE: This e-mail and any files transmitted with
>>>>>>> it are intended solely for the use of the individual or entity to whom 
>>>>>>> they
>>>>>>> are addressed and may contain confidential and privileged information
>>>>>>> protected by law. If you received this e-mail in error, any review, use,
>>>>>>> dissemination, distribution, or copying of the e-mail is strictly
>>>>>>> prohibited. Please notify the sender immediately by return e-mail and
>>>>>>> delete all copies from your system.
>>>>>>>
>>>>>>
>>
>> --
>>
>> This email may be confidential and privileged. If you received this
>> communication by mistake, please don't forward it to anyone else, please
>> erase all copies and attachments, and please let me know that it has gone
>> to the wrong person.
>>
>> The above terms reflect a potential business arrangement, are provided
>> solely as a basis for further discussion, and are not intended to be and do
>> not constitute a legally binding obligation. No legally binding obligations
>> will be created, implied, or inferred until an agreement in final form is
>> executed in writing by all parties involved.
>>
>

Reply via email to