Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9072#discussion_r42183114
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1520,19 +1524,40 @@ abstract class RDD[T: ClassTag](
           persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
         }
     
    -    checkpointData match {
    -      case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning(
    -        "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
    -      case _ =>
    +    // If this RDD is already checkpointed and materialized, its lineage is already truncated.
    +    // We must not override our `checkpointData` in this case because it is needed to recover
    +    // the checkpointed data. If it is overridden, next time materializing on this RDD will
    +    // cause error.
    +    if (isCheckpointedAndMaterialized) {
    +      logWarning("Not marking RDD for local checkpoint because it was already " +
    +        "checkpointed and materialized")
    +    } else {
    +      // Lineage is not truncated yet, so just override any existing checkpoint data with ours
    +      checkpointData match {
    +        case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
    +          "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
    +        case _ =>
    +      }
    +      checkpointData = Some(new LocalRDDCheckpointData(this))
         }
    -    checkpointData = Some(new LocalRDDCheckpointData(this))
         this
       }
     
       /**
         * Return whether this RDD is marked for checkpointing, either reliably or locally.
        */
    -  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
    +  def isCheckpointed: Boolean = {
    +    checkpointData match {
    +      case Some(_: RDDCheckpointData[_]) => true
    +      case _ => false
    +    }
    +  }
    --- End diff --
    
    We can't change the semantics of this public method; we need to keep it as it was
    before. Internally in Spark we will use `isCheckpointedAndMaterialized`, which will
    just call `isCheckpointed`.
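To make the semantic difference concrete, here is a minimal, self-contained sketch. It is not Spark code: `FakeRDD`, `FakeCheckpointData`, `checkpoint()`, `materialize()` and `isMarkedForCheckpoint` are hypothetical stand-ins chosen for illustration. It contrasts the original `isCheckpointed` (`checkpointData.exists(_.isCheckpointed)`, true only once the checkpoint is materialized) with the diff's proposal (true as soon as any checkpoint data is set), and mimics the new `localCheckpoint` guard that refuses to override already-materialized checkpoint data.

```scala
// Hypothetical, heavily simplified model of RDD checkpoint bookkeeping.
// None of these classes or methods are Spark's real API.
object CheckpointSemanticsSketch {

  // Stand-in for RDDCheckpointData: remembers whether the checkpoint was written.
  final class FakeCheckpointData(val reliable: Boolean) {
    private var materialized = false
    def markMaterialized(): Unit = materialized = true
    def isCheckpointed: Boolean = materialized
  }

  final class FakeRDD {
    var checkpointData: Option[FakeCheckpointData] = None

    // Original public semantics (what the review asks to keep):
    // true only after the checkpoint has actually been materialized.
    def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)

    // Internal alias with a clearer name; it just calls isCheckpointed.
    def isCheckpointedAndMaterialized: Boolean = isCheckpointed

    // Semantics proposed in the diff: true as soon as the RDD is *marked*.
    def isMarkedForCheckpoint: Boolean = checkpointData.isDefined

    // Hypothetical "reliable" marking, reduced to setting the checkpoint data.
    def checkpoint(): FakeRDD = {
      checkpointData = Some(new FakeCheckpointData(reliable = true))
      this
    }

    // Hypothetical stand-in for running a job that writes the checkpoint.
    def materialize(): Unit = checkpointData.foreach(_.markMaterialized())

    // Mirrors the guard in the diff: never replace data that lineage now depends on.
    def localCheckpoint(): FakeRDD = {
      if (isCheckpointedAndMaterialized) {
        println("Not marking RDD for local checkpoint because it was already " +
          "checkpointed and materialized")
      } else {
        checkpointData = Some(new FakeCheckpointData(reliable = false))
      }
      this
    }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new FakeRDD().checkpoint()
    println(s"marked: ${rdd.isMarkedForCheckpoint}, isCheckpointed: ${rdd.isCheckpointed}")
    rdd.materialize()
    println(s"after materialize, isCheckpointed: ${rdd.isCheckpointed}")
    rdd.localCheckpoint() // ignored: the existing reliable data is kept
    println(s"still reliable: ${rdd.checkpointData.exists(_.reliable)}")
  }
}
```

In this model, a caller that polls `isCheckpointed` to decide whether lineage has been truncated would get `true` too early under the proposed change, which is the behavior change the review objects to for a public method.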
