neilramaswamy commented on code in PR #47895:
URL: https://github.com/apache/spark/pull/47895#discussion_r1762072512


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -900,12 +907,54 @@ class MicroBatchExecution(
    */
   protected def markMicroBatchExecutionStart(execCtx: MicroBatchExecutionContext): Unit = {}
 
+  private def updateCheckpointIdForOperator(
+      execCtx: MicroBatchExecutionContext,
+      opId: Long,
+      checkpointInfo: Array[StateStoreCheckpointInfo]): Unit = {
+    // TODO validate baseCheckpointId
+    checkpointInfo.map(_.batchVersion).foreach { v =>
+      assert(
+        execCtx.batchId == -1 || v == execCtx.batchId + 1,
+        s"version $v doesn't match current Batch ID ${execCtx.batchId}")
+    }
+    currentCheckpointUniqueId.put(opId, checkpointInfo.map { c =>
+      assert(c.checkpointId.isDefined)
+      c.checkpointId.get
+    })
+  }
+
+  private def updateCheckpointId(

Review Comment:
   Let me make sure I understand the flow here:
   
   1. The micro-batch ends and we call `updateCheckpointId`.
   2. This goes through every stateful operator and calls `updateCheckpointIdForOperator`.
   3. For each operator, we call its `getCheckpointInfo` method.
       1. That method reads the `checkpointInfoAccumulator`.
       2. Each task appends to the `checkpointInfoAccumulator`, using the unique ID from the state store, after it has processed all of its data.
   4. In the future, we'll write this to the commit log.
   
   Is this right?
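
   To make the question concrete, here is roughly how I picture it. This is a simplified sketch with no Spark dependencies, not the PR's code: `FakeStatefulOperator`, `taskFinished`, and `CheckpointFlowSketch` are made-up names, the accumulator is modelled as a plain buffer rather than a Spark accumulator, and the result is returned as a map instead of being stored in `currentCheckpointUniqueId`.

```scala
import scala.collection.mutable

// Simplified copy of the case class added in StateStore.scala.
final case class StateStoreCheckpointInfo(
    partitionId: Int,
    batchVersion: Long,
    checkpointId: Option[String],
    baseCheckpointId: Option[String])

// Hypothetical stand-in for a stateful operator (assumption, not a Spark class).
final class FakeStatefulOperator(val opId: Long) {
  // Stand-in for `checkpointInfoAccumulator`; here it is just a local buffer.
  private val checkpointInfoAccumulator =
    mutable.ArrayBuffer.empty[StateStoreCheckpointInfo]

  // Step 3.2: a task reports its checkpoint info once it has processed all its data.
  def taskFinished(info: StateStoreCheckpointInfo): Unit = synchronized {
    checkpointInfoAccumulator += info
  }

  // Step 3.1: the driver side reads whatever the tasks have accumulated.
  def getCheckpointInfo: Array[StateStoreCheckpointInfo] = synchronized {
    checkpointInfoAccumulator.sortBy(_.partitionId).toArray
  }
}

object CheckpointFlowSketch {
  // Steps 1 and 2: at the end of a micro-batch, visit every stateful operator
  // and collect its per-partition checkpoint IDs, keyed by operator ID.
  def updateCheckpointId(
      batchId: Long,
      operators: Seq[FakeStatefulOperator]): Map[Long, Array[String]] = {
    operators.map { op =>
      val infos = op.getCheckpointInfo
      // Same version check as in `updateCheckpointIdForOperator`.
      infos.foreach { i =>
        assert(
          batchId == -1 || i.batchVersion == batchId + 1,
          s"version ${i.batchVersion} doesn't match current Batch ID $batchId")
      }
      op.opId -> infos.map { i =>
        i.checkpointId.getOrElse(
          throw new IllegalStateException(s"no checkpoint ID for partition ${i.partitionId}"))
      }
    }.toMap
  }
}
```

   Step 4 (writing the collected IDs to the commit log) is deliberately left out, since it isn't part of this PR.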



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -233,6 +238,12 @@ case class StateStoreMetrics(
     memoryUsedBytes: Long,
     customMetrics: Map[StateStoreCustomMetric, Long])
 
+case class StateStoreCheckpointInfo(
+    partitionId: Int,
+    batchVersion: Long,
+    checkpointId: Option[String],
+    baseCheckpointId: Option[String])

Review Comment:
   We call this `checkpointId` in some places and `baseCheckpointId` in others. 
Can you clarify which is which, and what specifically it should be here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -158,6 +159,20 @@ class RocksDB(
   @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
   private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+
+  // variables to manage checkpoint ID. Once a checkpoingting finishes, it nees to return
+  // the `lastCommittedCheckpointId` as the committed checkpointID, as well as
+  // `LastCommitBasedCheckpointId` as the checkpontID of the previous version that is based on.
+  // `loadedCheckpointId` is the checkpointID for the current live DB. After the batch finishes
+  // and checkpoint finishes, it will turn into `LastCommitBasedCheckpointId`.
+  // `sessionCheckpointId` store an ID to be used for future checkpoints. It is kept being used
+  // until we have to use a new one. We don't need to reuse any uniqueID, but reusing when possible
+  // can help debug problems.
+  @volatile private var LastCommitBasedCheckpointId: Option[String] = None
+  @volatile private var lastCommittedCheckpointId: Option[String] = None
+  @volatile private var loadedCheckpointId: Option[String] = None
+  @volatile private var sessionCheckpointId: Option[String] = None

Review Comment:
   Can you comment specifically on why these are marked as `volatile`? From what I 
can tell, they are only read and written by the query execution thread.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -803,6 +843,14 @@ class RocksDB(
   /** Get the write buffer manager and cache */
   def getWriteBufferManagerAndCache(): (WriteBufferManager, Cache) = (writeBufferManager, lruCache)
 
+  def getLatestCheckpointInfo(partitionId: Int): StateStoreCheckpointInfo = {

Review Comment:
   Will this ever be called if `lastCommittedCheckpointId` is `None` or 
`LastCommitBasedCheckpointId` is `None`?
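
   For context on why I'm asking: the caller in `MicroBatchExecution` asserts `c.checkpointId.isDefined`, so a `None` here would surface as an assertion failure on the driver. A minimal sketch of that interaction, using a simplified copy of the case class; `NoneCheckpointIdSketch` and `partitionsMissingId` are made-up names for illustration and not part of the PR:

```scala
// Simplified copy of the case class added in StateStore.scala.
final case class StateStoreCheckpointInfo(
    partitionId: Int,
    batchVersion: Long,
    checkpointId: Option[String],
    baseCheckpointId: Option[String])

object NoneCheckpointIdSketch {
  // Same extraction as in `updateCheckpointIdForOperator`, minus the map update.
  // A missing `checkpointId` trips the assertion; `baseCheckpointId` is not checked.
  def extractCheckpointIds(infos: Array[StateStoreCheckpointInfo]): Array[String] =
    infos.map { c =>
      assert(c.checkpointId.isDefined)
      c.checkpointId.get
    }

  // Hypothetical helper: the partitions that would trip the assertion above.
  def partitionsMissingId(infos: Array[StateStoreCheckpointInfo]): Seq[Int] =
    infos.collect { case c if c.checkpointId.isEmpty => c.partitionId }.toSeq
}
```

   So a `None` for `LastCommitBasedCheckpointId` would pass through silently today, while a `None` for `lastCommittedCheckpointId` would not.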



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -158,6 +159,20 @@ class RocksDB(
   @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = None
   private val enableChangelogCheckpointing: Boolean = conf.enableChangelogCheckpointing
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
+
+  // variables to manage checkpoint ID. Once a checkpoingting finishes, it nees to return
+  // the `lastCommittedCheckpointId` as the committed checkpointID, as well as
+  // `LastCommitBasedCheckpointId` as the checkpontID of the previous version that is based on.
+  // `loadedCheckpointId` is the checkpointID for the current live DB. After the batch finishes
+  // and checkpoint finishes, it will turn into `LastCommitBasedCheckpointId`.
+  // `sessionCheckpointId` store an ID to be used for future checkpoints. It is kept being used
+  // until we have to use a new one. We don't need to reuse any uniqueID, but reusing when possible
+  // can help debug problems.
+  @volatile private var LastCommitBasedCheckpointId: Option[String] = None
+  @volatile private var lastCommittedCheckpointId: Option[String] = None
+  @volatile private var loadedCheckpointId: Option[String] = None
+  @volatile private var sessionCheckpointId: Option[String] = None

Review Comment:
   We never read `sessionCheckpointId`, and the comment doesn't really help me. 
What is it used for?
   
   Is there a reason `LastCommitBasedCheckpointId` is capitalized? Also, it looks 
like `LastCommitBasedCheckpointId` isn't actually used in this PR, given the TODO 
elsewhere that says `// TODO validate baseCheckpointId`. Is that right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

