neilramaswamy commented on code in PR #47895:
URL: https://github.com/apache/spark/pull/47895#discussion_r1764013176
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -182,6 +191,27 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
}
}
+ val checkpointInfoAccumulator: CollectionAccumulator[StateStoreCheckpointInfo] = {
+ SparkContext.getActive.map(_.collectionAccumulator[StateStoreCheckpointInfo]).get
+ }
+
+ def getCheckpointInfo(): Array[StateStoreCheckpointInfo] = {
+ assert(conf.stateStoreCheckpointFormatVersion >= 2)
+ val ret = checkpointInfoAccumulator
+ .value
+ .asScala
+ .toSeq
+ .groupBy(_.partitionId)
+ .map {
+ case (key, values) => key -> values.head
+ }
Review Comment:
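As discussed offline earlier today, this does not work when the same partition has multiple state stores, e.g. in a stream-stream join. `groupBy(_.partitionId)` followed by `values.head` keeps only one `StateStoreCheckpointInfo` per partition and silently drops the others.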
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -182,6 +191,27 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
}
}
+ val checkpointInfoAccumulator: CollectionAccumulator[StateStoreCheckpointInfo] = {
+ SparkContext.getActive.map(_.collectionAccumulator[StateStoreCheckpointInfo]).get
+ }
+
+ def getCheckpointInfo(): Array[StateStoreCheckpointInfo] = {
+ assert(conf.stateStoreCheckpointFormatVersion >= 2)
+ val ret = checkpointInfoAccumulator
+ .value
+ .asScala
+ .toSeq
+ .groupBy(_.partitionId)
+ .map {
+ case (key, values) => key -> values.head
+ }
Review Comment:
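As discussed offline earlier today, this does not work when the same partition has multiple state stores, e.g. in a stream-stream join. `groupBy(_.partitionId)` followed by `values.head` keeps only one `StateStoreCheckpointInfo` per partition and silently drops the others. This is a very serious issue.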
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]