Hi,
I think this thread is another manifestation of a problem discussed
recently in [1]. Long story short: in certain situations users might
legitimately need finer control over how their pipeline is translated
into the runner's operators. Caching is another such case, because the
pipeline itself doesn't contain enough information to perform the
correct optimization. Consider, for example, this DAG of operations:
PCollectionA -> PCollectionB -> PCollectionC
                            \-> PCollectionD
That is, PCollectionB is consumed by two operators, the ones producing
PCollectionC and PCollectionD. The logical conclusion would be that we
need to cache PCollectionB, but what if the transform that produced
PCollectionB is of the "cheap explosion" type, where PCollectionB is
significantly bigger than PCollectionA and at the same time can be
produced very cheaply from elements of PCollectionA? Then it would make
sense to cache PCollectionA instead. But you cannot know that just from
the DAG. There would be many more examples of this. Maybe we could think
about how to support this, which might help widen the user base.
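To make the "cheap explosion" trade-off concrete, here is a minimal sketch in plain Java. It is not Beam or Spark code. The class name, the parameters and the cost model (compute cost plus a penalty per cached byte) are all assumptions made up for illustration. The point is only that the decision depends on sizes and costs that the DAG alone does not carry.

```java
/**
 * Hypothetical cost model for the DAG A -> B -> {several consumers}.
 * None of these names exist in Beam or Spark; they are illustration only.
 */
final class CachePlanSketch {

    /** Assumed plan cost: compute cost plus a penalty for every cached byte. */
    static double planCost(double computeCost, long cachedBytes, double costPerCachedByte) {
        return computeCost + costPerCachedByte * cachedBytes;
    }

    /**
     * Returns "A" or "B", whichever collection is cheaper to cache.
     * The cost of producing A is the same in both plans, so it is left out.
     */
    static String chooseCache(long sizeA, long sizeB, double costAToB,
                              int consumersOfB, double costPerCachedByte) {
        // Cache B: the A -> B transform runs once, but the large B stays in memory.
        double cacheB = planCost(costAToB, sizeB, costPerCachedByte);
        // Cache A: the A -> B transform reruns per consumer, but only the small A is held.
        double cacheA = planCost(costAToB * consumersOfB, sizeA, costPerCachedByte);
        return cacheA < cacheB ? "A" : "B";
    }

    public static void main(String[] args) {
        // Cheap explosion: B is 100x bigger than A and cheap to rebuild.
        System.out.println(chooseCache(100, 10_000, 1.0, 2, 0.01));
        // Expensive transform with little growth.
        System.out.println(chooseCache(100, 120, 50.0, 2, 0.01));
    }
}
```

With these made-up numbers the first case favours caching PCollectionA and the second favours caching PCollectionB, although the DAG shape is identical in both.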
Jan
[1] https://www.mail-archive.com/[email protected]/msg03809.html
On 5/15/19 10:39 AM, Robert Bradshaw wrote:
Just to clarify, do you need direct control over what to cache, or
would it be OK to let Spark decide the minimal set of RDDs to cache as
long as we didn't cache all intermediates?
From: Augusto Ribeiro <[email protected]>
Date: Wed, May 15, 2019 at 8:37 AM
To: <[email protected]>
Hi Kyle,
Thanks for the help. It seems like I have no other choice than using Spark
directly, since my job causes immense memory pressure if I can't decide what to
cache.
Best regards,
Augusto
On 14 May 2019, at 18:40, Kyle Weaver <[email protected]> wrote:
Minor correction: Slack channel is actually #beam-spark
Kyle Weaver | Software Engineer | github.com/ibzib | [email protected] |
+16502035555
From: Kyle Weaver <[email protected]>
Date: Tue, May 14, 2019 at 9:38 AM
To: <[email protected]>
Hi Augusto,
Right now the default behavior is to cache all intermediate RDDs that are
consumed more than once by the pipeline. This can be disabled with
`options.setCacheDisabled(true)` [1], but there is currently no way for the
user to tell the runner to cache certain RDDs and not others.
There has recently been some discussion on the Slack (#spark-beam) about
implementing such a feature, but no concrete plans as of yet.
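As a rough illustration of the "consumed more than once" rule described above, here is a toy model in plain Java. It is not the Spark runner's actual code. The class name, the edge-list representation and the node names are hypothetical. It only counts direct consumers per node and reports those with more than one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a "cache whatever is consumed more than once" rule. Hypothetical names. */
final class MultiConsumerSketch {

    /**
     * Each edge is {producer, consumer}. Returns the producers that feed
     * more than one consumer, in the order they first appear.
     */
    static List<String> consumedMoreThanOnce(String[][] edges) {
        Map<String, Integer> consumerCount = new LinkedHashMap<>();
        for (String[] edge : edges) {
            // Count one more direct consumer for the producing node.
            consumerCount.merge(edge[0], 1, Integer::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : consumerCount.entrySet()) {
            if (entry.getValue() > 1) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up pipeline: "parsed" feeds two downstream steps.
        String[][] edges = {
            {"input", "parsed"},
            {"parsed", "counts"},
            {"parsed", "samples"},
        };
        System.out.println(consumedMoreThanOnce(edges));
    }
}
```

A rule like this looks only at the graph shape, so it cannot tell a user-preferred caching point from any other multi-consumer node.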
[1]
https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150
Thanks
Kyle Weaver | Software Engineer | github.com/ibzib | [email protected] |
+16502035555
From: [email protected] <[email protected]>
Date: Tue, May 14, 2019 at 5:01 AM
To: <[email protected]>
Hi,
I guess the title says it all: right now it seems like Beam caches all the
intermediate RDD results for my pipeline when using the Spark runner, which
leads to very inefficient memory usage. Is there any way to control this?
Best regards,
Augusto