Hi,

I think this thread is another manifestation of a problem discussed recently in [1]. Long story short - in certain situations users might legitimately need finer control over how their pipeline is translated into the runner's operators. Caching is another case where the pipeline itself doesn't contain enough information to perform the correct optimization - consider for example this DAG of operations:

 PCollectionA -> PCollectionB -> PCollectionC
                              \-> PCollectionD

That is, PCollectionB is consumed by two operators, one producing PCollectionC and one producing PCollectionD. The logical conclusion would be that we need to cache PCollectionB, but what if the transform that produced PCollectionB is of the "cheap explosion" type - where PCollectionB is significantly bigger than PCollectionA and at the same time can be produced very cheaply from the elements of PCollectionA? Then it would make sense to cache PCollectionA instead. But you cannot know that just from the DAG. There are many more examples of this. Maybe we could think about how to support this, which might help widen the user base.
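
For illustration, here is a minimal Java SDK sketch of that shape (the transforms, element types, and class name are made up, just to show PCollectionB being consumed twice):

    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.transforms.FlatMapElements;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class FanOutExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<String> collectionA = p.apply(Create.of("a,b,c", "d,e,f"));

        // "Cheap explosion": B is much larger than A, yet trivially recomputable from A.
        PCollection<String> collectionB = collectionA.apply(
            FlatMapElements.into(TypeDescriptors.strings())
                .via(line -> Arrays.asList(line.split(","))));

        // B is consumed twice; from the DAG alone the runner cannot tell
        // whether caching A or caching B is the cheaper choice.
        PCollection<String> collectionC = collectionB.apply(
            MapElements.into(TypeDescriptors.strings()).via(s -> s.toUpperCase()));
        PCollection<String> collectionD = collectionB.apply(Filter.by(s -> !s.isEmpty()));

        p.run().waitUntilFinish();
      }
    }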

Jan

[1] https://www.mail-archive.com/[email protected]/msg03809.html

On 5/15/19 10:39 AM, Robert Bradshaw wrote:
Just to clarify, do you need direct control over what to cache, or
would it be OK to let Spark decide the minimal set of RDDs to cache as
long as we didn't cache all intermediates?

From: Augusto Ribeiro <[email protected]>
Date: Wed, May 15, 2019 at 8:37 AM
To: <[email protected]>

Hi Kyle,

Thanks for the help. It seems like I have no other choice but to use Spark 
directly, since my job causes immense memory pressure if I can't decide what to 
cache.

Best regards,
Augusto

On 14 May 2019, at 18:40, Kyle Weaver <[email protected]> wrote:

Minor correction: Slack channel is actually #beam-spark

Kyle Weaver | Software Engineer | github.com/ibzib | [email protected] | 
+16502035555


From: Kyle Weaver <[email protected]>
Date: Tue, May 14, 2019 at 9:38 AM
To: <[email protected]>

Hi Augusto,

Right now the default behavior is to cache all intermediate RDDs that are 
consumed more than once by the pipeline. This can be disabled with 
`options.setCacheDisabled(true)` [1], but there is currently no way for the 
user to tell the runner to cache certain RDDs but not others.
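
For reference, a minimal sketch of how that option would be set (assuming the Java SDK; the class name here is just illustrative):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class NoIntermediateCaching {
      public static void main(String[] args) {
        SparkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        // Turn off the automatic caching of RDDs that are consumed more than once.
        options.setCacheDisabled(true);

        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline as usual ...
        pipeline.run().waitUntilFinish();
      }
    }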

There has recently been some discussion on the Slack (#spark-beam) about 
implementing such a feature, but no concrete plans as of yet.

[1] 
https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150

Thanks

Kyle Weaver | Software Engineer | github.com/ibzib | [email protected] | 
+16502035555


From: [email protected] <[email protected]>
Date: Tue, May 14, 2019 at 5:01 AM
To: <[email protected]>

Hi,

I guess the title says it all. Right now it seems like Beam caches all the 
intermediate RDD results for my pipeline when using the Spark runner, which 
leads to very inefficient memory usage. Is there any way to control this?

Best regards,
Augusto
