anishshri-db commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2022136053


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -129,10 +198,66 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
-    extends ThreadSafeRpcEndpoint with Logging {
+private class StateStoreCoordinator(
+    override val rpcEnv: RpcEnv,
+    val sqlConf: SQLConf)
+  extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
 
+  // Stores the latest snapshot upload event for a specific state store
+  private val stateStoreLatestUploadedSnapshot =
+    new mutable.HashMap[StateStoreProviderId, SnapshotUploadEvent]
+
+  // Default snapshot upload event to use when a provider has never uploaded a snapshot
+  private val defaultSnapshotUploadEvent = SnapshotUploadEvent(0, 0)
+
+  // Stores the last timestamp in milliseconds for each queryRunId indicating when the
+  // coordinator did a report on instances lagging behind on snapshot uploads.
+  // The initial timestamp is defaulted to 0 milliseconds.
+  private val lastFullSnapshotLagReportTimeMs = new mutable.HashMap[UUID, Long]
+
+  // Stores the time and version registered at the query run's start.
+  // Queries that started recently should not have their state stores reported as lagging
+  // since we may not have all the information yet.
+  private val queryRunStartingPoint = new mutable.HashMap[UUID, QueryStartInfo]
+
+  def shouldCoordinatorReportSnapshotLag(
+      runId: UUID,
+      referenceVersion: Long,
+      referenceTimestamp: Long): Boolean = {
+    // Definitely do not report if it is disabled or the corresponding run id did not start yet.
+    if (!sqlConf.stateStoreCoordinatorReportSnapshotUploadLag ||
+        !queryRunStartingPoint.contains(runId)) {
+      false
+    } else {
+      // Determine alert thresholds from configurations for both time and version differences.
+      val snapshotVersionDeltaMultiplier =
+        sqlConf.stateStoreCoordinatorMultiplierForMinVersionDiffToLog
+      val maintenanceIntervalMultiplier = sqlConf.stateStoreCoordinatorMultiplierForMinTimeDiffToLog
+      val minDeltasForSnapshot = sqlConf.stateStoreMinDeltasForSnapshot
+      val maintenanceInterval = sqlConf.streamingMaintenanceInterval
+
+      // Use the configured multipliers to determine the proper alert thresholds
+      val minVersionDeltaForLogging = snapshotVersionDeltaMultiplier * minDeltasForSnapshot
+      val minTimeDeltaForLogging = maintenanceIntervalMultiplier * maintenanceInterval
+
+      // Do not report any instance as lagging if this query run started recently, since the
+      // coordinator may be missing some information from the state stores.
+      // A run is considered recent if the time between now and the start of the run does not pass
+      // the time requirement for lagging instances.
+      // Similarly, the run is also considered too recent if not enough versions have passed
+      // since the start of the run.
+      val queryStartInfo = queryRunStartingPoint(runId)
+
+      referenceTimestamp - queryStartInfo.timestamp > minTimeDeltaForLogging &&
+      referenceVersion - queryStartInfo.version > minVersionDeltaForLogging
+    }
+  }
+
+  def coordinatorLagReportInterval: Long = {
+    sqlConf.stateStoreCoordinatorSnapshotLagReportInterval

Review Comment:
   This can be moved to a single line without braces. Also, does it need to be
public?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to