GitHub user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9707#discussion_r45099938
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
    @@ -187,16 +187,27 @@ class CheckpointWriter(
       private var stopped = false
       private var fs_ : FileSystem = _
     
    +  @volatile private var latestCheckpointTime: Time = null
    +
       class CheckpointWriteHandler(
           checkpointTime: Time,
           bytes: Array[Byte],
           clearCheckpointDataLater: Boolean) extends Runnable {
         def run() {
    +      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
    +        latestCheckpointTime = checkpointTime
    +      }
           var attempts = 0
           val startTime = System.currentTimeMillis()
           val tempFile = new Path(checkpointDir, "temp")
    -      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
    -      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
    +      // We will do checkpoint when generating a batch and completing a batch. When the processing
    +      // time of a batch is greater than the batch interval, checkpointing for completing an old
    +      // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
    +      // batch actually has the latest information, so we want to recovery from it. Therefore, we
    +      // also use the latest checkpoint time as the file name, so that we can recovery from the
    +      // latest checkpoint file.
    +      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
    --- End diff --
    
    I don't think you get what I am saying. I am saying that two threads could run at the same time, writing out data to the exact same files.
    
    If I am not mistaken, there is a bug here that could lead to two checkpoints running at the same time and writing to the same files:
    -- Checkpoint 1: Completion of Batch Time t
    -- Checkpoint 2: Start of Batch Time t+1
    
    Checkpoint 2 starts -> `latestCheckpointTime = t + 1`
    Checkpoint 1 starts -> since `latestCheckpointTime != null` and `latestCheckpointTime > checkpointTime`, we would not reset `latestCheckpointTime`, so both checkpoints would use the same file name to write their checkpoints out.
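To make the interleaving concrete, here is a minimal, single-threaded Scala model of the check-then-set logic from the diff. It is a sketch under assumptions: batch times are plain `Long` milliseconds instead of Spark's `Time`, `LatestTimeModel` and `checkpointFileName` are hypothetical names (the latter only stands in for `Checkpoint.checkpointFile`), and `t + 1` in the steps above is modeled as `t + batchInterval`.

```scala
// Simplified model of the check-then-set update in CheckpointWriteHandler.run().
// Assumption: batch times are plain Long milliseconds rather than Spark's Time.
object LatestTimeModel {
  // Mirrors `@volatile private var latestCheckpointTime: Time = null` from the diff;
  // None plays the role of null.
  @volatile private var latestCheckpointTime: Option[Long] = None

  // Hypothetical stand-in for Checkpoint.checkpointFile(checkpointDir, time).
  def checkpointFileName(time: Long): String = s"checkpoint-$time"

  // Same logic as the diff: only ever move latestCheckpointTime forward,
  // then name the file after latestCheckpointTime, not after checkpointTime.
  def fileNameFor(checkpointTime: Long): String = {
    if (latestCheckpointTime.forall(_ < checkpointTime)) {
      latestCheckpointTime = Some(checkpointTime)
    }
    checkpointFileName(latestCheckpointTime.get)
  }
}

val batchInterval = 1000L
val t = 1000L

// Checkpoint 2 (start of batch t + 1) runs first...
val file2 = LatestTimeModel.fileNameFor(t + batchInterval)
// ...then Checkpoint 1 (completion of batch t) runs and keeps the newer time.
val file1 = LatestTimeModel.fileNameFor(t)

println(s"checkpoint 1 -> $file1, checkpoint 2 -> $file2")
```

With Checkpoint 2 running before Checkpoint 1, both handlers end up targeting `checkpoint-2000`, which is the shared file name described above.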
    
    Because of this, whichever thread reaches the tempFile creation first would win, and which one that is would be non-deterministic. The other thread would end up hitting an exception.
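The collision itself can be reproduced outside Spark with a small race. This sketch assumes java.nio's `Files.createFile`, which atomically fails with `FileAlreadyExistsException` when the file already exists, as a stand-in for the Hadoop `FileSystem` calls in `CheckpointWriteHandler`; the real failure mode there may differ. A latch releases both writers at once so they contend for the same `temp` path.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Both "checkpoint writers" target the same temp file, as in the scenario above.
val dir: Path = Files.createTempDirectory("checkpoint-race")
val tempFile: Path = dir.resolve("temp")

val successes = new AtomicInteger(0)
val failures = new AtomicInteger(0)
val startGate = new CountDownLatch(1)
val pool = Executors.newFixedThreadPool(2)

for (_ <- 1 to 2) {
  pool.execute(new Runnable {
    def run(): Unit = {
      startGate.await() // hold each writer until both are ready
      try {
        // Existence check + creation is a single atomic operation.
        Files.createFile(tempFile)
        successes.incrementAndGet()
      } catch {
        case _: FileAlreadyExistsException => failures.incrementAndGet()
      }
    }
  })
}

startGate.countDown() // release both writers at the same moment
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)

// Exactly one writer wins; which one depends on thread scheduling.
println(s"successes=${successes.get}, failures=${failures.get}")

Files.deleteIfExists(tempFile)
Files.deleteIfExists(dir)
```

Exactly one writer succeeds on every run, but which one is up to the scheduler, matching the non-determinism described above.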

