zifeif2 commented on code in PR #52773:
URL: https://github.com/apache/spark/pull/52773#discussion_r2491882076
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -339,6 +362,12 @@ private class StateStoreCoordinator(
val laggingStores =
findLaggingStores(queryRunId, latestVersion, currentTimestamp,
isTerminatingTrigger)
if (laggingStores.nonEmpty) {
+ if (shouldAutoSnapshotForLaggingStores) {
Review Comment:
Good catch! I fixed this bug in the next version.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -574,6 +574,19 @@ case class RangeKeyScanStateEncoderSpec(
*/
trait StateStoreProvider {
+ // Track whether this state store instance is lagging behind in snapshot
uploads.
Review Comment:
Thanks for the feedback! In my next version, this variable is stored in each
StateStore instance instead.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -116,6 +116,17 @@ private case class GetLaggingStoresForTesting(
private object StopCoordinator
extends StateStoreCoordinatorMessage
+/**
+ * Status information about state stores on this executor.
+ * @param shouldForceSnapshotUpload Whether the current provider should force
a snapshot
+ * upload on next commit
+ * @param providerIdsToUnload The list of provider IDs that should be unloaded
from this executor.
+ */
+case class ReportActiveInstanceResponse(
Review Comment:
Could you point out where it looks wrong? I thought parameters that continue
onto the next line should always be indented by 4 spaces. Thanks!
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -208,7 +208,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with
SharedSparkContext {
if (badPartitions.contains(partitionId)) {
assert(latestSnapshotVersion.getOrElse(0) == 0)
} else {
- assert(latestSnapshotVersion.get >= 0)
+ assert(latestSnapshotVersion.get > 0)
Review Comment:
Actually, it looks like HDFS snapshot versions start from 0, so I should change
it back to `>= 0` in case only one snapshot has been taken in the HDFS state store.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]