[ https://issues.apache.org/jira/browse/SPARK-28781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen resolved SPARK-28781.
----------------------------------
    Resolution: Not A Problem

I think the point of this class is to manage RDDs that depend on each other, to 
break lineage, etc. They all need to be persisted, so they are not recomputed 
when child RDDs are materialized. My only question here is why it needs to hold 
3 rather than 2, but that's a different issue. 
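
As a minimal illustration of that point, here is a hypothetical stand-alone sketch (illustrative RDD names and a local SparkContext, not code from PeriodicCheckpointer): without persist(), every action on a child RDD recomputes the parent's lineage, whereas persisting the parent lets later children reuse the cached partitions.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example; names are illustrative, not Spark internals.
object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("persist-sketch"))

    // Parent RDD with a (pretend) expensive transformation in its lineage.
    val parent = sc.parallelize(1 to 1000000).map(x => x * x)

    // Persisting the parent means the first action computes and caches it;
    // without this call, each child action below would re-run the map stage.
    parent.persist()

    val evens = parent.filter(_ % 2 == 0).count() // computes and caches parent
    val total = parent.count()                    // reuses the cached partitions

    println(s"evens=$evens, total=$total")
    sc.stop()
  }
}
{code}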

> Unnecessary persist in PeriodicCheckpointer.update()
> ----------------------------------------------------
>
>                 Key: SPARK-28781
>                 URL: https://issues.apache.org/jira/browse/SPARK-28781
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Major
>
> Whenever _update()_ is called, the RDD _newData_ is persisted at line 82. 
> However, the persisted RDD _newData_ is only read a second time, by 
> _checkpoint()_ (at line 97), when the checkpointing condition at line 94 is 
> met. In all other cases _newData_ is used only once, so persisting it is 
> unnecessary. Although persistedQueue is checked to limit the amount of 
> unnecessarily cached data, it would be better to avoid every unnecessary 
> persist operation.
> {code:scala}
> def update(newData: T): Unit = {
>     persist(newData)
>     persistedQueue.enqueue(newData)
>     // We try to maintain 2 Datasets in persistedQueue to support the semantics of this class:
>     // Users should call [[update()]] when a new Dataset has been created,
>     // before the Dataset has been materialized.
>     while (persistedQueue.size > 3) {
>       val dataToUnpersist = persistedQueue.dequeue()
>       unpersist(dataToUnpersist)
>     }
>     updateCount += 1
>     // Handle checkpointing (after persisting)
>     if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>       && sc.getCheckpointDir.nonEmpty) {
>       // Add new checkpoint before removing old checkpoints.
>       checkpoint(newData)
>       // ... (rest of the method omitted in the report)
>     }
> }
> {code}
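
For what it's worth, a rough sketch of the reordering the report appears to suggest (not a patch; it only illustrates persisting _newData_ when the checkpoint condition holds, with names taken from the snippet above and the rest of the method omitted):

{code:scala}
// Sketch only: persist newData just before checkpoint() reads it a second time.
def update(newData: T): Unit = {
  updateCount += 1
  val willCheckpoint = checkpointInterval != -1 &&
    (updateCount % checkpointInterval) == 0 &&
    sc.getCheckpointDir.nonEmpty
  if (willCheckpoint) {
    // Only in this branch is newData read twice, so only here would it be cached.
    persist(newData)
    persistedQueue.enqueue(newData)
    while (persistedQueue.size > 3) {
      unpersist(persistedQueue.dequeue())
    }
    // Add new checkpoint before removing old checkpoints.
    checkpoint(newData)
    // ... removal of old checkpoints omitted
  }
}
{code}

As the resolution notes, though, the unconditional persist is intentional: every Dataset passed to update() may later be materialized through child RDDs, and skipping the persist would reintroduce that recomputation.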



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
