In my view Spark is behaving as expected.

TL;DR
Every time a DataFrame is reused, branched, or forked, the sequence of operations
that produces it is evaluated again. Use cache() or persist() to avoid this
behavior, and unpersist() when the data is no longer required; Spark does not
un-persist automatically.


A couple of things:

* RDD is the base data structure in Spark. All higher-level APIs ultimately
compile down to the RDD API, so this behavior does not change between the RDD
and DataFrame APIs. The DataFrame API just makes things easier and adds
optimizations for SQL-style constructs, while the RDD API gives the programmer
more raw control.
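
To see that relationship concretely, here is a minimal sketch (assuming a
spark-shell session, where the spark SparkSession is predefined):

    // A DataFrame is a higher-level view; its execution is still an RDD lineage.
    val df = spark.range(0, 10).toDF("id")   // small example DataFrame
    val rdd = df.rdd                         // the underlying RDD[Row]
    println(rdd.toDebugString)               // prints the RDD lineage Spark will actually run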

* Having one action does not mean the file is read, or a DataFrame computed,
only once. Actions and DataFrames are not related in that way. An action is
something that requires results to be sent to the driver program, and it
therefore triggers a job based on the sequence of operations in the RDD
lineage.
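
For example (a spark-shell sketch; the path is hypothetical):

    val df = spark.read.textFile("/path/to/data.txt")  // no job yet, only lineage is recorded
    val errors = df.filter(_.contains("ERROR"))        // transformation: still nothing is read
    val n = errors.count()  // action: results must reach the driver, so a job runs now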

* The important thing to remember is that every time a DataFrame (and hence
its underlying RDD) is reused, branched, or forked, its sequence of operations
is evaluated again.

Here df1 is the base data from the file and df2 is the aggregated data derived
from df1; they are different RDDs. Yes, df2 has df1 in its parent lineage, but
the groupBy operation makes df2 a new RDD, a different branch.
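
A rough sketch of that shape (this is my assumption of how df1, df2 and df3
are built; adjust to your actual code and paths):

    val df1 = spark.read.parquet("/path/to/input")  // base data from file (hypothetical path)
    val df2 = df1.groupBy("key").count()            // aggregated branch derived from df1
    val df3 = df1.join(df2, "key")                  // another branch that reuses df1
    df3.count()  // one action, but df1's file scan appears in two branches of the plan,
                 // so the file is read twice unless df1 is cached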

The way to optimize this is to explicitly cache at some point, for example as
sketched below. You will also have to consider the memory available in the
cluster.
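
A minimal sketch, continuing the assumed pipeline above:

    val df1 = spark.read.parquet("/path/to/input").cache()  // or .persist(StorageLevel.MEMORY_AND_DISK)
    val df2 = df1.groupBy("key").count()
    val df3 = df1.join(df2, "key")
    df3.count()      // the base data is scanned once to fill the cache; both branches read the cached copy
    df1.unpersist()  // release the memory yourself -- Spark will not un-persist it for you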

* The behavior is the same if you call the same action again; Spark will not
cache an RDD unless explicitly asked to. For instance, run df3.count() a second
time and you will see the file being read twice again, even though df3.count()
was run before. (In some cases you might see better performance on the second
run of df3.count(); that is usually the operating system caching the file so
the scan is faster than the first time, not the result of a Spark cache.)
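
A simplified sketch of that (hypothetical path, no caching involved):

    val df3 = spark.read.parquet("/path/to/input")
    df3.count()  // first action: the file is scanned
    df3.count()  // second action: the whole lineage runs again and the file is scanned again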

* In some cases, shuffle operations being one example, Spark will do an
implicit persist for fault tolerance, as documented here:
https://spark.apache.org/docs/3.4.0/rdd-programming-guide.html#rdd-persistence

"Spark also automatically persists some intermediate data in shuffle operations 
(e.g. reduceByKey), even without users calling persist. This is done to avoid 
recomputing the entire input if a node fails during the shuffle. We still 
recommend users call persist on the resulting RDD if they plan to reuse it."
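
A small sketch of where you can observe this (hypothetical path; watch the
Spark UI for skipped stages):

    val words = spark.sparkContext.textFile("/path/to/data.txt")  // hypothetical path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
    val counts = words.reduceByKey(_ + _)  // shuffle operation
    counts.count()  // first action: runs the map stage and the shuffle
    counts.count()  // second action: the map stage typically shows as "skipped" in the UI because
                    // the shuffle output is reused, but persist() is still recommended for reuse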


Hope that helps. 


Thanks,
Vijay