Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/4964#discussion_r26530402
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
---
@@ -254,39 +271,97 @@ class JobGenerator(jobScheduler: JobScheduler)
extends Logging {
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
- if (shouldCheckpoint) {
+ if (isCheckpointRequired(time)) {
eventActor ! DoCheckpoint(time)
} else {
// If checkpointing is not enabled, then delete metadata information
about
// received blocks (block data not saved in any case). Otherwise,
wait for
// checkpointing of this batch to complete.
- val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
- jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time -
maxRememberDuration)
- markBatchFullyProcessed(time)
+ // This is synchronized so that multiple batches completing update
this only one by one
+ updateLastProcessedBatch(time)
+ cleanupOldBlocksAndBatches()
}
}
/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
ssc.graph.clearCheckpointData(time)
+ updateLastProcessedBatch(time)
+ cleanupOldBlocksAndBatches()
+ }
+ // Needs to be synchronized because the lastProcessedBatch can be
updated from another thread
+ private def cleanupOldBlocksAndBatches(): Unit = synchronized {
// All the checkpoint information about which batches have been
processed, etc have
// been saved to checkpoints, so its safe to delete block metadata and
data WAL files
- val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
- jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time -
maxRememberDuration)
- markBatchFullyProcessed(time)
+ lastProcessedBatch.foreach { lastProcessedTime =>
+ val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
+ jobScheduler.receiverTracker
+ .cleanupOldBlocksAndBatches(lastProcessedTime -
maxRememberDuration)
+ }
}
/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time) {
- if (shouldCheckpoint && (time -
graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
+ if (isCheckpointRequired(time)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
}
}
- private def markBatchFullyProcessed(time: Time) {
- lastProcessedBatch = time
+ private def isCheckpointRequired(time: Time): Boolean = {
+ shouldCheckpoint && (time -
graph.zeroTime).isMultipleOf(ssc.checkpointDuration)
+ }
+
+ private def updateLastProcessedBatch(time: Time): Unit = {
+ /*
+ * When is a batch fully processed?
+ * - For batches which are not to be checkpointed, they are fully
processed as soon as they
+ * are done.
+ * - For batches that are to be checkpointed, there is a checkpoint
at the start and at the
+ * end of the batch. It is fully done only when both checkpoints are
written out. So for
+ * each created batch (that is to be checkpointed), we must check if
the batch was
+ * checkpointed twice, if it was then it is fully processed.
+ *
+ * Since batches are processed asynchronously, it is possible for a
batch with time t to be
+ * processed after one with time t', even if t < t'. So when a batch
is fully processed, we
+ * must update the lastProcessedBatch with the time of the oldest
batch that has been fully
+ * processed. So for each batch that was to be checkpointed, ensure
that it has been
+ * checkpointed twice. The newest batch which has been checkpointed
twice is the
+ * lastProcessedBatch.
+ */
+ // This basically goes through each of the batches and checks if it
has been checkpointed
+ // twice. This is required since it is possible a newer batch was
checkpointed twice before
+ // an older one. Ex: Two batches - times 1 and 2, 1 takes longer to
process than 2, so 2 gets
+ // checkpointed twice before 1. Now, 2 can't be updated as the
lastProcessed since 1 is not
+ // done. So when 1 is done, we update the lastProcessedBatch to 2
(since both 1 and 2 are done).
+ if (!isCheckpointRequired(time)) {
+ // If checkpoint is not enabled, the lastProcessedBatch is
+ // the one which is the oldest.
+ if (!shouldCheckpoint && batchesGenerated.first() == time) {
+ lastProcessedBatch = Some(time)
+ }
+ batchesGenerated -= time
+ } else {
+ val iter = batchesGenerated.iterator()
+ var continueIterating = true
+ while (iter.hasNext && continueIterating) {
+ val t = iter.next()
+      // Have both checkpoints been written out for this batch? If yes,
update lastProcessed, else
+ // stop and keep all WAL files for this batch.
+ if (completedBothCheckpoints.containsKey(t) &&
completedBothCheckpoints(t)) {
+ completedBothCheckpoints -= t
+ lastProcessedBatch = Some(t)
+ iter.remove()
+ } else {
+ continueIterating = false
+ }
+ }
+ }
+ }
+
+ private class TimeComparator extends Comparator[Time] {
--- End diff --
Better to put this in the same file as Time.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]