Hello Augusto,

This is a long-standing problem that is really hard to fix properly unless the runner has a better understanding of what is actually happening in the pipeline itself (which is what Jan already pointed out). I still think the best approach, and the one that would require the fewest "knobs" for the user to tune, would be to sample previous pipeline runs and reuse the gathered information to decide what should be cached and how.
I think we can probably eliminate some of the major cases that could cause trouble, e.g. when the DAG split is right after a shuffle it's usually not worth caching, as Spark can reuse the shuffle files, unless the computation is really expensive (random-access DB joins) ...

D.

On Thu, May 16, 2019 at 9:38 AM Augusto Ribeiro <[email protected]> wrote:
> Hi Robert,
>
> If Spark could decide then it would be OK, but that is probably not a
> trivial problem, as Jan pointed out.
>
> In my case, the decision had to be made on the basis of the machines in
> the cluster. In principle, if you had enough machines (or larger ones) to
> hold everything in memory it wouldn't be a problem, but in reality we all
> have constraints on how many and what kind of machines we can use.
>
> Best regards,
> Augusto
>
> > On 15 May 2019, at 10:39, Robert Bradshaw <[email protected]> wrote:
> >
> > Just to clarify, do you need direct control over what to cache, or
> > would it be OK to let Spark decide the minimal set of RDDs to cache, as
> > long as we didn't cache all intermediates?
> >
> > From: Augusto Ribeiro <[email protected]>
> > Date: Wed, May 15, 2019 at 8:37 AM
> > To: <[email protected]>
> >
> >> Hi Kyle,
> >>
> >> Thanks for the help. It seems like I have no other choice than to use
> >> Spark directly, since my job causes immense memory pressure if I can't
> >> decide what to cache.
> >>
> >> Best regards,
> >> Augusto
> >>
> >> On 14 May 2019, at 18:40, Kyle Weaver <[email protected]> wrote:
> >>
> >> Minor correction: the Slack channel is actually #beam-spark
> >>
> >> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected] | +16502035555
> >>
> >> From: Kyle Weaver <[email protected]>
> >> Date: Tue, May 14, 2019 at 9:38 AM
> >> To: <[email protected]>
> >>
> >>> Hi Augusto,
> >>>
> >>> Right now the default behavior is to cache all intermediate RDDs that
> >>> are consumed more than once by the pipeline. This can be disabled with
> >>> `options.setCacheDisabled(true)` [1], but there is currently no way
> >>> for the user to tell the runner that it should cache certain RDDs but
> >>> not others.
> >>>
> >>> There has recently been some discussion on Slack (#spark-beam) about
> >>> implementing such a feature, but there are no concrete plans as of yet.
> >>>
> >>> [1] https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150
> >>>
> >>> Thanks
> >>>
> >>> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected] | +16502035555
> >>>
> >>> From: [email protected] <[email protected]>
> >>> Date: Tue, May 14, 2019 at 5:01 AM
> >>> To: <[email protected]>
> >>>
> >>>> Hi,
> >>>>
> >>>> I guess the title says it all: right now it seems like Beam caches
> >>>> all the intermediate RDD results for my pipeline when using the
> >>>> Spark runner, which leads to very inefficient use of memory. Any
> >>>> way to control this?
> >>>>
> >>>> Best regards,
> >>>> Augusto
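For reference, the `setCacheDisabled(true)` knob mentioned above lives on `SparkPipelineOptions` [1] and is applied before the pipeline is constructed. A minimal sketch (the option class and setter are from the linked Beam source; the surrounding main class and pipeline wiring are illustrative, and this won't compile without the Beam SDK and Spark runner on the classpath):

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DisableCacheExample {
  public static void main(String[] args) {
    // Parse options as SparkPipelineOptions so the Spark-runner-specific
    // settings (including the caching knob) are available.
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);

    // Turn off the default behavior of caching every intermediate RDD
    // that is consumed more than once by the pipeline.
    options.setCacheDisabled(true);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline here ...
    // pipeline.run().waitUntilFinish();
  }
}
```

Since Beam derives command-line flags from the option property names, the equivalent flag should be `--cacheDisabled=true` (inferred from the `cacheDisabled` property, not verified against the docs). Note that this is all-or-nothing: as Kyle says, there is no per-RDD control.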
