zifeif2 commented on code in PR #52773:
URL: https://github.com/apache/spark/pull/52773#discussion_r2491882076


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -339,6 +362,12 @@ private class StateStoreCoordinator(
         val laggingStores =
           findLaggingStores(queryRunId, latestVersion, currentTimestamp, isTerminatingTrigger)
         if (laggingStores.nonEmpty) {
+          if (shouldAutoSnapshotForLaggingStores) {

Review Comment:
   Good catch! I fixed this bug in the next version.
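   A minimal sketch of how the shouldAutoSnapshotForLaggingStores gate in the hunk above could feed a shouldForceSnapshotUpload flag back to executors. ProviderId, LaggingStoreTracker, and pendingForcedSnapshots are hypothetical names. The response field types are assumptions, and calls are assumed to arrive one at a time. Nothing here reflects which bug was fixed in the next version.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the provider ID type used by the coordinator.
final case class ProviderId(operatorId: Long, partitionId: Int)

// Hypothetical shape of the response; field types are assumptions.
final case class ReportActiveInstanceResponse(
    shouldForceSnapshotUpload: Boolean,
    providerIdsToUnload: Seq[ProviderId])

// Hypothetical coordinator-side bookkeeping, not the PR's implementation.
class LaggingStoreTracker(shouldAutoSnapshotForLaggingStores: Boolean) {
  // Lagging providers that have not yet been told to force a snapshot.
  private val pendingForcedSnapshots = mutable.Set.empty[ProviderId]

  // Called with the result of the lagging-store scan; records nothing
  // unless the feature flag is enabled.
  def onLaggingStoresDetected(laggingStores: Seq[ProviderId]): Unit = {
    if (laggingStores.nonEmpty && shouldAutoSnapshotForLaggingStores) {
      pendingForcedSnapshots ++= laggingStores
    }
  }

  // Called when an executor reports an active instance. remove() returns
  // true only if the id was pending, so each detection forces one upload.
  def reportActiveInstance(id: ProviderId): ReportActiveInstanceResponse = {
    val force = pendingForcedSnapshots.remove(id)
    ReportActiveInstanceResponse(shouldForceSnapshotUpload = force, providerIdsToUnload = Seq.empty)
  }
}
```

In this design, the flag is consumed on first read. A store that is still lagging is flagged again only by a later scan.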



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -574,6 +574,19 @@ case class RangeKeyScanStateEncoderSpec(
  */
 trait StateStoreProvider {
 
+  // Track whether this state store instance is lagging behind in snapshot uploads.

Review Comment:
   Thanks for the feedback! My next version stores this variable in each StateStore instance.
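   A minimal sketch of the per-instance approach, using a hypothetical SketchStateStore class. The flag is a field of each store instance rather than state declared on the StateStoreProvider trait, so marking one instance as lagging leaves every other instance untouched. The AtomicBoolean is an assumption: it lets the flag be set from a coordinator response and cleared exactly once at commit time.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical store class; the flag below is scoped to this one instance.
class SketchStateStore(val version: Long) {
  private val forceSnapshotOnCommit = new AtomicBoolean(false)

  // Set when the coordinator reports that this store is lagging.
  def markLagging(): Unit = forceSnapshotOnCommit.set(true)

  // Consulted at commit time: returns true at most once per markLagging()
  // call, because getAndSet clears the flag atomically.
  def consumeForceSnapshot(): Boolean = forceSnapshotOnCommit.getAndSet(false)
}
```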



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -116,6 +116,17 @@ private case class GetLaggingStoresForTesting(
 private object StopCoordinator
   extends StateStoreCoordinatorMessage
 
+/**
+ * Status information about state stores on this executor.
+ * @param shouldForceSnapshotUpload Whether the current provider should force a snapshot
+ * upload on next commit
+ * @param providerIdsToUnload The list of provider IDs that should be unloaded from this executor.
+ */
+case class ReportActiveInstanceResponse(

Review Comment:
   Could you point out where it looks wrong? I thought wrapped parameters on the following lines should always be indented by 4 spaces. Thanks!
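   For reference, the Databricks Scala Guide, which Spark's code style page points to, indents wrapped constructor parameters by 4 spaces and any following extends clause by 2. Below is a sketch of the declaration in that layout. The field types and the ProviderId stand-in are assumptions, because the hunk ends before the parameter list.

```scala
// Hypothetical stand-in for the provider ID type used by the PR.
final case class ProviderId(operatorId: Long, partitionId: Int)

/**
 * Status information about state stores on this executor.
 * @param shouldForceSnapshotUpload Whether the current provider should force a snapshot
 *   upload on the next commit.
 * @param providerIdsToUnload The list of provider IDs that should be unloaded from this executor.
 */
case class ReportActiveInstanceResponse(
    shouldForceSnapshotUpload: Boolean, // wrapped parameters: 4-space indent
    providerIdsToUnload: Seq[ProviderId])
```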



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala:
##########
@@ -208,7 +208,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
           if (badPartitions.contains(partitionId)) {
             assert(latestSnapshotVersion.getOrElse(0) == 0)
           } else {
-            assert(latestSnapshotVersion.get >= 0)
+            assert(latestSnapshotVersion.get > 0)

Review Comment:
   Actually, it looks like HDFS snapshot versions start from 0, so I should change it back to >= 0 in case only one snapshot is taken in the HDFS state store.
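   Below is a hedged sketch of the check the test performs, using a hypothetical SnapshotAssertions helper. It takes the comment's premise that HDFS snapshot versions can start at 0. Under that premise, > 0 would reject a healthy partition whose only snapshot is version 0, while >= 0 accepts it. One difference from the test: where the test's .get would throw on a missing snapshot, exists simply returns false.

```scala
// Hypothetical helper mirroring the assertions in the test hunk.
object SnapshotAssertions {
  def snapshotLooksValid(latestSnapshotVersion: Option[Long], isBadPartition: Boolean): Boolean =
    if (isBadPartition) {
      // Bad partitions must not have uploaded any snapshot yet.
      latestSnapshotVersion.getOrElse(0L) == 0L
    } else {
      // Healthy partitions need a snapshot; version 0 counts.
      latestSnapshotVersion.exists(_ >= 0L)
    }
}
```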



-- 
This is an automated message from the Apache Git Service.