siying commented on code in PR #47895:
URL: https://github.com/apache/spark/pull/47895#discussion_r1793840792
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -126,12 +133,15 @@ class IncrementalExecution(
   /** Get the state info of the next stateful operator */
   private def nextStatefulOperationStateInfo(): StatefulOperatorStateInfo = {
-    StatefulOperatorStateInfo(
+    val operatorId = statefulOperatorId.getAndIncrement()
+    val ret = StatefulOperatorStateInfo(
       checkpointLocation,
       runId,
-      statefulOperatorId.getAndIncrement(),
+      operatorId,
       currentBatchId,
-      numStateStores)
+      numStateStores,
+      currentStateStoreCkptId.get(operatorId))
Review Comment:
The assertion will be more straightforward after we add support for persisting
the ID to the commit log. For now, it is also empty when the query has just
started. I can leave a comment here saying we should add an assertion once
only batch 0 can be empty.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -243,6 +243,7 @@ trait FlatMapGroupsWithStateExecBase
       stateManager.stateSchema,
       NoPrefixKeyStateEncoderSpec(groupingAttributes.toStructType),
       stateInfo.get.storeVersion,
+      stateInfo.get.getStateStoreCkptId(partitionId).map(_(0)),
Review Comment:
I am a newbie in Scala. I checked, but there is no `get()` on a Scala array;
there is `.apply()`, but that is even more confusing to me. I'll replace these
`(0)` with `.head`. For the J&J case, I think `(0)`, `(1)`, etc. look OK.
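For reference, the three spellings are equivalent for index 0; `ids(0)` is syntactic sugar for `ids.apply(0)`, and `.head` reads best when only the first element matters. A small self-contained demo (the variable names are made up):

```scala
object ArrayAccessDemo {
  def main(args: Array[String]): Unit = {
    val ids = Array("a", "b")
    assert(ids(0) == "a")       // index syntax: sugar for ids.apply(0)
    assert(ids.apply(0) == "a") // the same call, spelled out
    assert(ids.head == "a")     // clearest when only the first element matters

    // Pattern from the reviewed line: mapping over an Option[Array[_]]
    val maybeIds: Option[Array[String]] = Some(ids)
    assert(maybeIds.map(_.head).contains("a"))
    println("ok")
  }
}
```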
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -255,15 +259,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
   /** Get the state store for making updates to create a new `version` of the store. */
-  override def getStore(version: Long): StateStore = {
+  override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = {
Review Comment:
Right.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala:
##########
@@ -105,7 +105,7 @@ class StreamStreamJoinStatePartitionReader(
     val stateInfo = StatefulOperatorStateInfo(
       partition.sourceOptions.stateCheckpointLocation.toString,
       partition.queryId, partition.sourceOptions.operatorId,
-      partition.sourceOptions.batchId + 1, -1)
+      partition.sourceOptions.batchId + 1, -1, None)
Review Comment:
This code will need to change after we persist the ID to the commit log: the
ID will need to be read from the commit log and passed in here. For now, we
can say the state store reader isn't supported in this new mode (it is likely
working accidentally, but it isn't worth even testing). There is already a
TODO comment above.
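A minimal sketch of how the reader could reject the new mode until commit-log support lands. The method name, parameter, and message are all assumptions for illustration, not code from this PR:

```scala
object StateReaderGuardSketch {
  // Hypothetical guard: fail fast instead of relying on accidental behavior.
  def validateReaderSupported(usesStateStoreCkptIds: Boolean): Unit = {
    if (usesStateStoreCkptIds) {
      throw new UnsupportedOperationException(
        "State data source reader is not supported when state store " +
          "checkpoint IDs are enabled")
    }
  }

  def main(args: Array[String]): Unit = {
    validateReaderSupported(usesStateStoreCkptIds = false) // legacy mode: ok
    val rejected =
      try { validateReaderSupported(usesStateStoreCkptIds = true); false }
      catch { case _: UnsupportedOperationException => true }
    assert(rejected) // new mode is refused up front
    println("ok")
  }
}
```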
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]