Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4964#discussion_r26530402
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 ---
    @@ -254,39 +271,97 @@ class JobGenerator(jobScheduler: JobScheduler) 
extends Logging {
     
         // If checkpointing is enabled, then checkpoint,
         // else mark batch to be fully processed
    -    if (shouldCheckpoint) {
    +    if (isCheckpointRequired(time)) {
           eventActor ! DoCheckpoint(time)
         } else {
           // If checkpointing is not enabled, then delete metadata information 
about
           // received blocks (block data not saved in any case). Otherwise, 
wait for
           // checkpointing of this batch to complete.
    -      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    -      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - 
maxRememberDuration)
    -      markBatchFullyProcessed(time)
    +      // This is synchronized so that multiple completing batches update 
this one at a time
    +      updateLastProcessedBatch(time)
    +      cleanupOldBlocksAndBatches()
         }
       }
     
       /** Clear DStream checkpoint data for the given `time`. */
       private def clearCheckpointData(time: Time) {
         ssc.graph.clearCheckpointData(time)
    +    updateLastProcessedBatch(time)
    +    cleanupOldBlocksAndBatches()
    +  }
     
    +  // Needs to be synchronized because the lastProcessedBatch can be 
updated from another thread
    +  private def cleanupOldBlocksAndBatches(): Unit = synchronized {
         // All the checkpoint information about which batches have been 
processed, etc have
    +    // been saved to checkpoints, so it's safe to delete block metadata and 
data WAL files
    -    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    -    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - 
maxRememberDuration)
    -    markBatchFullyProcessed(time)
    +    lastProcessedBatch.foreach { lastProcessedTime =>
    +      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    +      jobScheduler.receiverTracker
    +        .cleanupOldBlocksAndBatches(lastProcessedTime - 
maxRememberDuration)
    +    }
       }
     
      /** Perform checkpoint for the given `time`. */
       private def doCheckpoint(time: Time) {
    -    if (shouldCheckpoint && (time - 
graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    +    if (isCheckpointRequired(time)) {
           logInfo("Checkpointing graph for time " + time)
           ssc.graph.updateCheckpointData(time)
           checkpointWriter.write(new Checkpoint(ssc, time))
         }
       }
     
    -  private def markBatchFullyProcessed(time: Time) {
    -    lastProcessedBatch = time
    +  private def isCheckpointRequired(time: Time): Boolean = {
    +    shouldCheckpoint && (time - 
graph.zeroTime).isMultipleOf(ssc.checkpointDuration)
    +  }
    +
    +  private def updateLastProcessedBatch(time: Time): Unit = {
    +    /*
    +     * When is a batch fully processed?
    +     *  - For batches which are not to be checkpointed, they are fully 
processed as soon as they
    +     *  are done.
    +     *  - For batches that are to be checkpointed, there is a checkpoint 
at the start and at the
    +     *  end of the batch. It is fully done only when both checkpoints are 
written out. So for
    +     *  each created batch (that is to be checkpointed), we must check if 
the batch was
    +     *  checkpointed twice, if it was then it is fully processed.
    +     *
    +     *  Since batches are processed asynchronously, it is possible for a 
batch with time t to be
    +     *  processed after one with time t', even if t < t'. So when a batch 
is fully processed, we
    +     *  must update the lastProcessedBatch with the time of the oldest 
batch that has been fully
    +     *  processed. So for each batch that was to be checkpointed, ensure 
that it has been
    +     *  checkpointed twice. The newest batch which has been checkpointed 
twice is the
    +     *  lastProcessedBatch.
    +     */
    +    // This basically goes through each of the batches and checks if it 
has been checkpointed
    +    // twice. This is required since it is possible a newer batch was 
checkpointed twice before
    +    // an older one. Ex: Two batches - times 1 and 2, 1 takes longer to 
process than 2, so 2 gets
    +    // checkpointed twice before 1. Now, 2 can't be updated as the 
lastProcessed since 1 is not
    +    // done. So when 1 is done, we update the lastProcessedBatch to 2 
(since both 1 and 2 are done).
    +    if (!isCheckpointRequired(time)) {
    +      // If checkpoint is not enabled, the lastProcessedBatch is
    +      // the one which is the oldest.
    +      if (!shouldCheckpoint && batchesGenerated.first() == time) {
    +        lastProcessedBatch = Some(time)
    +      }
    +      batchesGenerated -= time
    +    } else {
    +      val iter = batchesGenerated.iterator()
    +      var continueIterating = true
    +      while (iter.hasNext && continueIterating) {
    +        val t = iter.next()
    +        // Have both checkpoints been written out for this batch? If yes, 
update lastProcessed, else
    +        // stop and keep all WAL files for this batch.
    +        if (completedBothCheckpoints.containsKey(t) && 
completedBothCheckpoints(t)) {
    +          completedBothCheckpoints -= t
    +          lastProcessedBatch = Some(t)
    +          iter.remove()
    +        } else {
    +          continueIterating = false
    +        }
    +      }
    +    }
    +  }
    +
    +  private class TimeComparator extends Comparator[Time] {
    --- End diff --
    
    Better to put this in the same file as Time. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to