zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2004384529
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +220,99 @@ private class StateStoreCoordinator(override val rpcEnv:
RpcEnv)
storeIdsToRemove.mkString(", "))
context.reply(true)
+ case SnapshotUploaded(providerId, version, timestamp) =>
+ stateStoreSnapshotVersions.put(providerId, SnapshotUploadEvent(version,
timestamp))
+ logDebug(s"Snapshot uploaded at ${providerId} with version ${version}")
+ // Report all stores that are behind in snapshot uploads
+ val (laggingStores, latestSnapshot) = findLaggingStores()
+ if (laggingStores.nonEmpty) {
+ logWarning(
+ log"StateStoreCoordinator Snapshot Lag - Number of state stores
falling behind: " +
+ log"${MDC(LogKeys.NUM_LAGGING_STORES, laggingStores.size)} " +
+ log"(Latest snapshot: ${MDC(LogKeys.SNAPSHOT_EVENT,
latestSnapshot)})"
+ )
+ laggingStores.foreach { storeProviderId =>
+ val logMessage = stateStoreSnapshotVersions.get(storeProviderId)
match {
+ case Some(snapshotEvent) =>
+ val versionDelta = latestSnapshot.version - snapshotEvent.version
+ val timeDelta = latestSnapshot.timestamp -
snapshotEvent.timestamp
+
+ log"StateStoreCoordinator Snapshot Lag - State store falling
behind " +
+ log"${MDC(LogKeys.STATE_STORE_PROVIDER_ID, providerId)} " +
+ log"(version delta: ${MDC(LogKeys.SNAPSHOT_EVENT_VERSION_DELTA,
versionDelta)}, " +
+ log"time delta: ${MDC(LogKeys.SNAPSHOT_EVENT_TIME_DELTA,
timeDelta)}ms)"
+ case None =>
+ log"StateStoreCoordinator Snapshot Lag - State store falling
behind " +
Review Comment:
Keeping this up to date with the newest version of this PR: if changelog
checkpointing is enabled, it will not report these instances as lagging, because
the upload process is tied to commit().
Since the alert threshold is based on both version and time, which are multiples of
the maintenance and snapshotting intervals, it will not mark them as late
unless the user configures these multipliers to something low, such as 1
(the defaults are 5 and 10, respectively).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]