Re: Fanout and OOMs on Dataflow while streaming to BQ

2019-11-22 Thread Kenneth Knowles
+Lukasz Cwik  +Chamikara Jayalath 

It sounds like your high-fanout transform that listens for new files on
Pubsub would be a good fit for Splittable DoFn (SDF). It seems like a
fairly common use case that could be a useful general contribution to Beam.
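
For anyone following along, here is a rough sketch of what such a Splittable
DoFn might look like in the Python SDK. It only uses the documented SDF hooks;
the class names, the newline-delimited JSON file format, and the lack of error
handling are assumptions, and a real implementation needs care around records
that span split points and around compressed files:

import json

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker


class FileRestrictionProvider(beam.transforms.core.RestrictionProvider):
    def initial_restriction(self, file_path):
        # One restriction per file: its full byte range.
        metadata = FileSystems.match([file_path])[0].metadata_list[0]
        return OffsetRange(0, metadata.size_in_bytes)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, file_path, restriction):
        return restriction.size()


class EmitEventsFromFile(beam.DoFn):
    """Takes a file path (from a GCS Pub/Sub notification) and emits events."""

    def process(
        self,
        file_path,
        tracker=beam.DoFn.RestrictionParam(FileRestrictionProvider())):
        with FileSystems.open(file_path) as stream:
            offset = tracker.current_restriction().start
            stream.seek(offset)
            for line in stream:
                # Claim the start offset of this record; stop if the runner
                # has split the remainder of the file away from this bundle.
                if not tracker.try_claim(offset):
                    return
                offset += len(line)
                yield json.loads(line)

Pipeline 1 from the thread below would then look roughly like ReadFromPubSub,
a Map that extracts the GCS path from the notification, and
beam.ParDo(EmitEventsFromFile()), letting the runner split large files across
workers instead of reading each file in a single bundle.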

Kenn

On Fri, Nov 22, 2019 at 7:50 AM Jeff Klukas  wrote:

> We also had throughput issues in writing to BQ in a streaming pipeline and
> we mitigated by provisioning a large quantity of SSD storage to improve I/O
> throughput to disk for checkpoints.
>
> I also second Erik's suggestion to look into Streaming Engine. We are currently
> looking into migrating our streaming use cases to use streaming engine
> after we had success with improved BQ write throughput on batch workloads
> by using Shuffle service (the batch mode analogue to the Streaming Engine).
>
>
>
> On Fri, Nov 22, 2019 at 9:01 AM Erik Lincoln 
> wrote:
>
>> Hi Frantisek,
>>
>>
>>
>> Some advice from making a similar pipeline and struggling with throughput
>> and latency:
>>
>> 1. Break up your pipeline into multiple pipelines. Dataflow only
>>    auto-scales based on input throughput. If you’re microbatching events in
>>    files, the job will only autoscale to meet the volume of files, not the
>>    volume of events added to the pipeline from the files. Better flow is:
>>      i.  Pipeline 1: Receive GCS notifications, read files, and then output
>>          file contents as Pubsub messages either per event or in
>>          microbatches from the file
>>      ii. Pipeline 2: Receive events from Pubsub, do your transforms, then
>>          write to BQ
>> 2. Use the Streaming Engine service (if you can run in a region
>>    supporting it):
>>    https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>> 3. BQ streaming can be slower than a load job if you have a very high
>>    volume (millions of events a minute). If your event volume is high, you
>>    might need to consider further microbatching loads into BQ from GCS.
>>
>>
>>
>> Hope this helps,
>>
>> Erik
>>
>>
>>
>> *From:* Frantisek Csajka 
>> *Sent:* Friday, November 22, 2019 5:35 AM
>> *To:* user@beam.apache.org
>> *Subject:* Fanout and OOMs on Dataflow while streaming to BQ
>>
>>
>>
>> Hi Beamers,
>>
>> We are facing OutOfMemory errors with a streaming pipeline on Dataflow.
>> We are unable to get rid of them, not even with bigger worker instances.
>> Any advice will be appreciated.
>>
>> The scenario is the following.
>> - Files are being written to a bucket in GCS. A notification is set up on
>> the bucket, so for each file there is a Pub/Sub message
>> - Notifications are consumed by our pipeline
>> - Files from notifications are read by the pipeline
>> - Each file contains several events (there is a quite big fanout)
>> - Events are written to BigQuery (streaming)
>>
>> Everything works fine if there are only a few notifications, but if the
>> files are incoming at a high rate or if there is a large backlog in the
>> notification subscription, events get stuck in the BQ write and later an
>> OOM is thrown.
>>
>> Having a larger worker does not work because if the backlog is large,
>> larger instance(s) will throw OOM as well, just later...
>>
>> As far as we can see, there is a checkpointing/reshuffle in the BQ write
>> and that's where the elements get stuck. It looks like the Pub/Sub read is
>> consuming too many elements and, due to the fanout, causes OOM when
>> grouping in the reshuffle.
>> Is there any way to backpressure the Pub/Sub read? Is it possible to have
>> a smaller window in Reshuffle? How does Reshuffle actually work?
>> Any advice?
>>
>> Thanks in advance,
>> Frantisek
>>
>


Re: Pipeline AttributeError on Python3

2019-11-22 Thread Thomas Weise
We have not seen the issue with Python 3.6 on 2.16+ after applying this
patch. 🎉

Thanks!

On Thu, Nov 21, 2019 at 4:41 PM Thomas Weise  wrote:

> We are currently verifying the patch. Will report back tomorrow.
>
> On Thu, Nov 21, 2019 at 8:40 AM Valentyn Tymofieiev 
> wrote:
>
>> That would be helpful, thanks a lot! It should be a straightforward patch.
>> Also, thanks Guenther, for sharing your investigation on
>> https://bugs.python.org/issue34572, it was very helpful.
>>
>> On Thu, Nov 21, 2019 at 8:25 AM Thomas Weise  wrote:
>>
>>> Valentyn, thanks a lot for following up on this.
>>>
>>> If the change can be cherry picked in isolation, we should be able to
>>> verify this soon (with 2.16).
>>>
>>>
>>> On Thu, Nov 21, 2019 at 8:12 AM Valentyn Tymofieiev 
>>> wrote:
>>>
 To close the loop here: To my knowledge this issue affects all Python 3
 users of Portable Flink/Spark runners, and Dataflow Python Streaming users,
 including users on Python 3.7.3 and newer versions.

 The issue is addressed on Beam master, and we have a cherry-pick out
 for Beam 2.17.0.

 Workaround options for users on 2.16.0 and earlier SDKs:

 - Patch the SDK you are using with
 https://github.com/apache/beam/pull/10167.
 - Temporarily switch to Python 2 until 2.17.0. We have not seen the
 issue on Python 2, so it may be rare or non-existent on Python 2.
 - Pass --experiments worker_threads=1 (see the example below). This
 option may work only for some, but not all pipelines.
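
 For example, a minimal sketch of passing that experiment from Python
 (the runner and the other options are placeholders):

 # Sketch: passing the worker_threads experiment mentioned above.
 from apache_beam.options.pipeline_options import PipelineOptions

 options = PipelineOptions([
     '--runner=DataflowRunner',
     '--streaming',
     '--experiments=worker_threads=1',
 ])
 # ... then construct the pipeline with beam.Pipeline(options=options)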

 See BEAM-8651  for
 details on the issue.

 On Wed, Nov 13, 2019 at 11:55 AM Valentyn Tymofieiev <
 valen...@google.com> wrote:

> I also opened https://issues.apache.org/jira/browse/BEAM-8651 to
> track this issue and any recommendation for the users that will come
> out of it.
>
> On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev <
> valen...@google.com> wrote:
>
>>  I think we have heard of this issue from the same source:
>>
>> This looks exactly like a race condition that we've encountered on
>>> Python 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>>> thread-safety of the unpickler, as concurrent unpickle threads can 
>>> access a
>>> module before it has been fully imported. See
>>> https://bugs.python.org/issue34572 for more information.
>>>
>>> The traceback shows a Python 3.6 venv so this could be a different
>>> issue (the unpickle bug was introduced in version 3.7). If it's the same
>>> bug then upgrading to Python 3.7.3 or higher should fix that issue. One
>>> potential workaround is to ensure that all of the modules get imported
>>> during the initialization of the sdk_worker, as this bug only affects
>>> imports done by the unpickler.
>>
>>
>> The symptoms do sound similar, so I would try to reproduce your issue
>> on 3.7.3 and see if it is gone, or try to reproduce
>> https://bugs.python.org/issue34572 in the version of the interpreter you
>> use. If this doesn't help, you can try to reproduce the race using your
>> input.
>>
>> To get the output of the serialized DoFn, you could do the following:
>> 1. Patch https://github.com/apache/beam/pull/10036.
>> 2. Set the logging level to DEBUG, see:
>> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
>> 3. Check the log output for the payload of your transform; it may look
>> like:
>>
>> transforms {
>>   key:
>> "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>>   value {
>> spec {
>>   urn: "beam:transform:pardo:v1"
>>   payload: "\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
>> 
>>
>> Then you can extract the output of pickled fn:
>>
>> from apache_beam.utils import proto_utils
>> from apache_beam.portability.api import beam_runner_api_pb2
>> from apache_beam.internal import pickler
>>
>> payload = b'\n\347\006\n\275\006\n
>> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
>> pardo_payload = proto_utils.parse_Bytes(payload,
>> beam_runner_api_pb2.ParDoPayload)
>> pickled_fn = pardo_payload.do_fn.spec.payload
>>
>> pickler.loads(pickled_fn)  # Presumably the race happens here when
>> unpickling one of your transforms
>> (pricingrealtime.aggregation.aggregation_transform).
>>
>>
>> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar 
>> wrote:
>>
>>> Thanks Valentyn,
>>>
>>> Aggregation_transform.py doesn't have any transformation method
>>> which extends beam.DoFn. We are using plain python method which we 
>>> passed
>>> in beam.Map().  I am not sure how to get the dump of serialized_fn. Can 
>>>

Re: Fanout and OOMs on Dataflow while streaming to BQ

2019-11-22 Thread Jeff Klukas
We also had throughput issues in writing to BQ in a streaming pipeline and
we mitigated by provisioning a large quantity of SSD storage to improve I/O
throughput to disk for checkpoints.

I also second Erik's suggestion to look into Streaming Engine. We are currently
looking into migrating our streaming use cases to use streaming engine
after we had success with improved BQ write throughput on batch workloads
by using Shuffle service (the batch mode analogue to the Streaming Engine).
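
For reference, a sketch of the kind of options being discussed, assuming the
Python SDK. The values are placeholders and the flag names follow the Dataflow
documentation, so double-check them against your SDK version:

# Sketch: Dataflow options along the lines discussed above.
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--streaming',
    # Larger SSD persistent disks for faster checkpoint I/O. The empty
    # project/zone segments are intentional, as in the Dataflow docs.
    '--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd',
    '--disk_size_gb=200',
    # Streaming Engine for streaming jobs; the batch-mode analogue is the
    # Shuffle service (--experiments=shuffle_mode=service).
    '--experiments=enable_streaming_engine',
])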



On Fri, Nov 22, 2019 at 9:01 AM Erik Lincoln 
wrote:

> Hi Frantisek,
>
>
>
> Some advice from making a similar pipeline and struggling with throughput
> and latency:
>
> 1. Break up your pipeline into multiple pipelines. Dataflow only
>    auto-scales based on input throughput. If you’re microbatching events in
>    files, the job will only autoscale to meet the volume of files, not the
>    volume of events added to the pipeline from the files. Better flow is:
>      i.  Pipeline 1: Receive GCS notifications, read files, and then output
>          file contents as Pubsub messages either per event or in
>          microbatches from the file
>      ii. Pipeline 2: Receive events from Pubsub, do your transforms, then
>          write to BQ
> 2. Use the Streaming Engine service (if you can run in a region
>    supporting it):
>    https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
> 3. BQ streaming can be slower than a load job if you have a very high
>    volume (millions of events a minute). If your event volume is high, you
>    might need to consider further microbatching loads into BQ from GCS.
>
>
>
> Hope this helps,
>
> Erik
>
>
>
> *From:* Frantisek Csajka 
> *Sent:* Friday, November 22, 2019 5:35 AM
> *To:* user@beam.apache.org
> *Subject:* Fanout and OOMs on Dataflow while streaming to BQ
>
>
>
> Hi Beamers,
>
> We are facing OutOfMemory errors with a streaming pipeline on Dataflow. We
> are unable to get rid of them, not even with bigger worker instances. Any
> advice will be appreciated.
>
> The scenario is the following.
> - Files are being written to a bucket in GCS. A notification is set up on
> the bucket, so for each file there is a Pub/Sub message
> - Notifications are consumed by our pipeline
> - Files from notifications are read by the pipeline
> - Each file contains several events (there is a quite big fanout)
> - Events are written to BigQuery (streaming)
>
> Everything works fine if there are only a few notifications, but if the
> files are incoming at a high rate or if there is a large backlog in the
> notification subscription, events get stuck in the BQ write and later an
> OOM is thrown.
>
> Having a larger worker does not work because if the backlog is large,
> larger instance(s) will throw OOM as well, just later...
>
> As far as we can see, there is a checkpointing/reshuffle in the BQ write
> and that's where the elements get stuck. It looks like the Pub/Sub read is
> consuming too many elements and, due to the fanout, causes OOM when
> grouping in the reshuffle.
> Is there any way to backpressure the Pub/Sub read? Is it possible to have
> a smaller window in Reshuffle? How does Reshuffle actually work?
> Any advice?
>
> Thanks in advance,
> Frantisek
>


RE: Fanout and OOMs on Dataflow while streaming to BQ

2019-11-22 Thread Erik Lincoln
Hi Frantisek,

Some advice from making a similar pipeline and struggling with throughput and 
latency:

  1.  Break up your pipeline into multiple pipelines. Dataflow only auto-scales 
      based on input throughput. If you’re microbatching events in files, the job 
      will only autoscale to meet the volume of files, not the volume of events 
      added to the pipeline from the files. Better flow is:
        i.  Pipeline 1: Receive GCS notifications, read files, and then output 
            file contents as Pubsub messages either per event or in microbatches 
            from the file
        ii. Pipeline 2: Receive events from Pubsub, do your transforms, then 
            write to BQ
  2.  Use the Streaming Engine service (if you can run in a region supporting 
      it): 
      https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
  3.  BQ streaming can be slower than a load job if you have a very high volume 
      (millions of events a minute). If your event volume is high, you might need 
      to consider further microbatching loads into BQ from GCS (see the sketch 
      below).
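
On point 3, a hedged sketch of one way to micro-batch into BigQuery from a
streaming pipeline without managing the GCS files yourself, using BigQueryIO's
FILE_LOADS method. The Python SDK is assumed; the topic, table, schema, and
the 5-minute frequency are placeholders, and FILE_LOADS from a streaming
pipeline needs a reasonably recent SDK:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    options = PipelineOptions(['--streaming'])  # plus your Dataflow options
    with beam.Pipeline(options=options) as p:
        events = (
            p
            | beam.io.ReadFromPubSub(topic='projects/my-project/topics/events')
            | beam.Map(lambda msg: {'payload': msg.decode('utf-8')}))
        _ = events | beam.io.WriteToBigQuery(
            'my-project:my_dataset.events',
            schema='payload:STRING',
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            triggering_frequency=300,  # start a load job roughly every 5 minutes
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)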

Hope this helps,
Erik

From: Frantisek Csajka 
Sent: Friday, November 22, 2019 5:35 AM
To: user@beam.apache.org
Subject: Fanout and OOMs on Dataflow while streaming to BQ

Hi Beamers,

We are facing OutOfMemory errors with a streaming pipeline on Dataflow. We are 
unable to get rid of them, not even with bigger worker instances. Any advice 
will be appreciated.

The scenario is the following.
- Files are being written to a bucket in GCS. A notification is set up on the 
bucket, so for each file there is a Pub/Sub message
- Notifications are consumed by our pipeline
- Files from notifications are read by the pipeline
- Each file contains several events (there is a quite big fanout)
- Events are written to BigQuery (streaming)

Everything works fine if there are only a few notifications, but if the files are 
incoming at a high rate or if there is a large backlog in the notification 
subscription, events get stuck in the BQ write and later an OOM is thrown.

Having a larger worker does not work because if the backlog is large, larger 
instance(s) will throw OOM as well, just later...

As far as we can see, there is a checkpointing/reshuffle in the BQ write and that's 
where the elements get stuck. It looks like the Pub/Sub read is consuming too many 
elements and, due to the fanout, causes OOM when grouping in the reshuffle.
Is there any way to backpressure the Pub/Sub read? Is it possible to have 
a smaller window in Reshuffle? How does Reshuffle actually work?
Any advice?

Thanks in advance,
Frantisek
CONFIDENTIALITY NOTICE: This e-mail and any files transmitted with it are 
intended solely for the use of the individual or entity to whom they are 
addressed and may contain confidential and privileged information protected by 
law. If you received this e-mail in error, any review, use, dissemination, 
distribution, or copying of the e-mail is strictly prohibited. Please notify 
the sender immediately by return e-mail and delete all copies from your system.


Fanout and OOMs on Dataflow while streaming to BQ

2019-11-22 Thread Frantisek Csajka
Hi Beamers,

We are facing OutOfMemory errors with a streaming pipeline on Dataflow. We
are unable to get rid of them, not even with bigger worker instances. Any
advice will be appreciated.

The scenario is the following.
- Files are being written to a bucket in GCS. A notification is set up on
the bucket, so for each file there is a Pub/Sub message
- Notifications are consumed by our pipeline
- Files from notifications are read by the pipeline
- Each file contains several events (there is a quite big fanout)
- Events are written to BigQuery (streaming)

Everything works fine if there are only a few notifications, but if the files
are incoming at a high rate or if there is a large backlog in the
notification subscription, events get stuck in the BQ write and later an OOM
is thrown.

Having a larger worker does not work because if the backlog is large,
larger instance(s) will throw OOM as well, just later...

As far as we can see, there is a checkpointing/reshuffle in the BQ write and
that's where the elements get stuck. It looks like the Pub/Sub read is
consuming too many elements and, due to the fanout, causes OOM when grouping
in the reshuffle.
Is there any way to backpressure the Pub/Sub read? Is it possible to have
a smaller window in Reshuffle? How does Reshuffle actually work?
Any advice?

Thanks in advance,
Frantisek