neilramaswamy commented on code in PR #47895:
URL: https://github.com/apache/spark/pull/47895#discussion_r1755323322


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala:
##########
@@ -105,7 +105,7 @@ class StreamStreamJoinStatePartitionReader(
       val stateInfo = StatefulOperatorStateInfo(
         partition.sourceOptions.stateCheckpointLocation.toString,
         partition.queryId, partition.sourceOptions.operatorId,
-        partition.sourceOptions.batchId + 1, -1)
+        partition.sourceOptions.batchId + 1, -1, None)

Review Comment:
   Why is this `None`? I would imagine that users of the state data source reader 
now have to specify the `id` that they would like to read, given that state 
stores are now not uniquely identified by operator/partition/name, but by 
id/operator/partition/name?
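
   To make the concern concrete, here is a minimal sketch of the identity 
problem. `StoreKey` and its fields are hypothetical names for illustration, not 
types from this PR; the only assumption is that the new ID becomes part of what 
identifies a state store.

```scala
// Hypothetical, simplified model of how a state store might be identified.
// None of these names are taken from the PR.
object StateStoreIdentitySketch {
  final case class StoreKey(
      checkpointId: Option[String], // assumption: the new unique ID, if known
      operatorId: Long,
      partitionId: Int,
      storeName: String)

  // Two checkpoints of the same operator/partition/name differ only by ID.
  val a: StoreKey = StoreKey(Some("id-a"), operatorId = 0L, partitionId = 3, storeName = "default")
  val b: StoreKey = StoreKey(Some("id-b"), operatorId = 0L, partitionId = 3, storeName = "default")

  // With the ID dropped, a reader can no longer tell the two apart.
  def withoutId(k: StoreKey): StoreKey = k.copy(checkpointId = None)
}
```

   If the reader always passes `None`, both keys above collapse to the same 
value, which is why I would expect the ID to be a reader option.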



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -130,6 +131,11 @@ class MicroBatchExecution(
 
   protected var watermarkTracker: WatermarkTracker = _
 
+  // Store checkpointIDs for state store checkpoints to be committed or have been committed to
+  // the commit log.
+  // operatorID -> (partitionID -> uniqueID)
+  private val currentCheckpointUniqueId = MutableMap[Long, Array[String]]()

Review Comment:
   `operatorID -> (partitionID -> uniqueID)`, is this supposed to mean a map of 
maps? If so, then why is the type of `currentCheckpointUniqueId` just a single 
map?
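
   For reference, a small sketch of the two shapes I can read into this. The 
object and method names are hypothetical, and the idea that the array index 
stands in for the partition ID is my assumption, not something the PR states.

```scala
import scala.collection.mutable.{Map => MutableMap}

object CheckpointIdShapes {
  // Shape the code comment suggests: operatorID -> (partitionID -> uniqueID).
  val nested: MutableMap[Long, MutableMap[Int, String]] = MutableMap.empty

  // Shape the declared type gives: operatorID -> array of unique IDs.
  // Assumption: the array index plays the role of partitionID.
  val flat: MutableMap[Long, Array[String]] = MutableMap.empty

  def lookupNested(op: Long, partition: Int): Option[String] =
    nested.get(op).flatMap(_.get(partition))

  // Returns None when the operator is unknown or the index is out of range.
  def lookupFlat(op: Long, partition: Int): Option[String] =
    flat.get(op).collect {
      case ids if partition >= 0 && partition < ids.length => ids(partition)
    }
}
```

   If the flat shape is what is intended, the comment could say that the array 
is indexed by partition ID.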
   
   I also don't fully understand why we would need a unique ID for every 
operator X partition. Why is it not sufficient to have the following protocol, 
where we have _one_ unique ID for every batch:
   
   For the first batch, an ID is created and sent to all executors. When all 
tasks finish, that ID is persisted into the commit log. It is also kept in 
memory for the subsequent batch.
   
   For any other batch, if there does not exist an ID in memory from the 
previous batch, then it must be read from the commit log and brought into 
memory. (This is the restart case.)
   
   Then the ID in memory from the previous batch (call it `prevId`) is sent to 
all executors in the physical plan, along with a new ID for the current batch 
(call it `currId`). Before any processing starts, executors must load the state 
for `prevId` and use it to process the current batch. Then they can start 
processing, and they upload their state as `<state file 
name>_currId.<changelog|snapshot>`.
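
   The protocol above could be sketched roughly as follows. Everything here is 
hypothetical: `CommitLog`, `Driver`, and `stateFileName` are illustrative 
stand-ins rather than Spark classes, IDs are modeled as random UUID strings, 
and "first batch" is assumed to mean batch 0.

```scala
import java.util.UUID

object SingleIdPerBatchSketch {
  // Hypothetical stand-in for the commit log: batchId -> checkpoint ID.
  final class CommitLog {
    private val entries = scala.collection.mutable.Map[Long, String]()
    def add(batchId: Long, id: String): Unit = entries(batchId) = id
    def get(batchId: Long): Option[String] = entries.get(batchId)
  }

  final class Driver(commitLog: CommitLog) {
    // ID of the previous batch kept in memory; empty after a restart.
    private var inMemoryPrevId: Option[String] = None

    /** Returns (prevId, currId) to bake into the plan for `batchId`. */
    def idsForBatch(batchId: Long): (Option[String], String) = {
      val prevId =
        if (batchId == 0) None // first batch: there is no previous state
        else inMemoryPrevId.orElse(commitLog.get(batchId - 1)) // restart case
      (prevId, UUID.randomUUID().toString)
    }

    /** Called once all tasks of the batch have finished. */
    def commit(batchId: Long, currId: String): Unit = {
      commitLog.add(batchId, currId)
      inMemoryPrevId = Some(currId)
    }
  }

  // File name an executor would upload, following the naming described above.
  def stateFileName(base: String, currId: String, kind: String): String =
    s"${base}_$currId.$kind"
}
```

   In this sketch the driver only ever tracks one ID per batch, and the 
executors derive everything else from `prevId` and `currId`.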
   
   What's wrong with that?
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -57,7 +59,9 @@ class IncrementalExecution(
     val prevOffsetSeqMetadata: Option[OffsetSeqMetadata],
     val offsetSeqMetadata: OffsetSeqMetadata,
     val watermarkPropagator: WatermarkPropagator,
-    val isFirstBatch: Boolean)
+    val isFirstBatch: Boolean,
+    val currentCheckpointUniqueId:
+      MutableMap[Long, Array[String]] = MutableMap[Long, Array[String]]())

Review Comment:
   I'm also confused by this. When I sketched an implementation of your 
proposal in my head, I assumed that `IncrementalExecution` would get just one 
ID, perhaps a _single_ Long, corresponding to the ID that it bakes into the 
physical plan sent to executors. So why is a map needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
