GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/9707#discussion_r45124403
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala ---
@@ -187,16 +187,27 @@ class CheckpointWriter(
private var stopped = false
private var fs_ : FileSystem = _
+ @volatile private var latestCheckpointTime: Time = null
+
class CheckpointWriteHandler(
checkpointTime: Time,
bytes: Array[Byte],
clearCheckpointDataLater: Boolean) extends Runnable {
def run() {
+ if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+ latestCheckpointTime = checkpointTime
+ }
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp")
- val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
- val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+ // We checkpoint both when generating a batch and when completing a batch. When the
+ // processing time of a batch is greater than the batch interval, the checkpoint for
+ // completing an old batch may run after the checkpoint of a new batch. If this happens,
+ // the checkpoint of the old batch actually has the latest information, so we want to
+ // recover from it. Therefore, we also use the latest checkpoint time as the file name,
+ // so that we can recover from the latest checkpoint file.
+ val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
--- End diff --
Updated
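
For readers following the thread, the file-naming rule in the diff above can be sketched in isolation. This is only a simplified model, not the code in Checkpoint.scala: the class name `LatestTimeTracker` and the method `fileTimeFor` are hypothetical, times are plain `Long` milliseconds rather than Spark's `Time`, `-1L` stands in for `null`, and no files are written.

```scala
// Hypothetical, simplified model of the "latest checkpoint time" rule.
// LatestTimeTracker and fileTimeFor are illustrative names, not Spark APIs.
class LatestTimeTracker {
  // Mirrors the field added in the diff; -1L plays the role of null.
  @volatile private var latestCheckpointTime: Long = -1L

  // Returns the time that would be used to name the checkpoint file
  // when a write for `checkpointTime` runs.
  def fileTimeFor(checkpointTime: Long): Long = {
    if (latestCheckpointTime < checkpointTime) {
      latestCheckpointTime = checkpointTime
    }
    latestCheckpointTime
  }
}

object LatestTimeTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new LatestTimeTracker
    // The checkpoint for generating batch 2000 runs first...
    val first = tracker.fileTimeFor(2000L)
    // ...then the slow batch 1000 completes and its checkpoint runs later.
    val second = tracker.fileTimeFor(1000L)
    println(s"first=$first second=$second") // prints "first=2000 second=2000"
  }
}
```

In this model the late write for batch 1000 is named after time 2000, so a recovery step that picks the file with the newest time in its name would pick up the data written last. The check-and-update is not atomic, so the sketch assumes writes run one at a time.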