siying commented on code in PR #47895:
URL: https://github.com/apache/spark/pull/47895#discussion_r1793840792


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -126,12 +133,15 @@ class IncrementalExecution(
 
   /** Get the state info of the next stateful operator */
   private def nextStatefulOperationStateInfo(): StatefulOperatorStateInfo = {
-    StatefulOperatorStateInfo(
+    val operatorId = statefulOperatorId.getAndIncrement()
+    val ret = StatefulOperatorStateInfo(
       checkpointLocation,
       runId,
-      statefulOperatorId.getAndIncrement(),
+      operatorId,
       currentBatchId,
-      numStateStores)
+      numStateStores,
+      currentStateStoreCkptId.get(operatorId))

Review Comment:
   The assertion will be more straight-forward after we add the support to 
persistent the ID to commit logs. For now, it is also empty when the query is 
just started. I can leave a comment here, saying we should add an assertion 
once only batch 0 can be empty.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -243,6 +243,7 @@ trait FlatMapGroupsWithStateExecBase
             stateManager.stateSchema,
             NoPrefixKeyStateEncoderSpec(groupingAttributes.toStructType),
             stateInfo.get.storeVersion,
+            stateInfo.get.getStateStoreCkptId(partitionId).map(_(0)),

Review Comment:
   I am a newbie in Scala. I checked but there is no `get()` in Scala array. 
three is `.apply()` but it is even more confusing to me. I'll replace those 
`(0)` with `.head`.  for the J&J case, I think (0), (1), etc looks OK.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -255,15 +259,15 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
   }
 
   /** Get the state store for making updates to create a new `version` of the 
store. */
-  override def getStore(version: Long): StateStore = {
+  override def getStore(version: Long, uniqueId: Option[String] = None): 
StateStore = {

Review Comment:
   Right.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala:
##########
@@ -105,7 +105,7 @@ class StreamStreamJoinStatePartitionReader(
       val stateInfo = StatefulOperatorStateInfo(
         partition.sourceOptions.stateCheckpointLocation.toString,
         partition.queryId, partition.sourceOptions.operatorId,
-        partition.sourceOptions.batchId + 1, -1)
+        partition.sourceOptions.batchId + 1, -1, None)

Review Comment:
   The code needs to change after we persistent the ID to commit logs. The ID 
needs to be get from the commit logs and pass it to here. For now, we can say 
state store reader isn't supported in this new mode (it's likely working 
accidentally, but it's not worth even testing it). There is already a TODO 
comment above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to