zecookiez commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2004261541
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala:
##########
@@ -168,9 +264,153 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
storeIdsToRemove.mkString(", "))
context.reply(true)
+ case ReportSnapshotUploaded(providerId, version, timestamp) =>
+ // Ignore this upload event if the registered latest version for the provider is more recent,
+ // since it's possible that an older version gets uploaded after a new executor uploads for
+ // the same provider but with a newer snapshot.
+ logDebug(s"Snapshot version $version was uploaded for provider $providerId")
+ if (!stateStoreLatestUploadedSnapshot.get(providerId).exists(_.version >= version)) {
+ stateStoreLatestUploadedSnapshot.put(providerId, SnapshotUploadEvent(version, timestamp))
+ }
+ context.reply(true)
+
+ case ConstructLaggingInstanceReport(queryRunId, latestVersion, timestamp) =>
+ // Only log lagging instances if the snapshot report upload is enabled,
+ // otherwise all instances will be considered lagging.
+ if (isSnapshotUploadReportEnabled) {
+ val laggingStores = findLaggingStores(queryRunId, latestVersion, timestamp)
Review Comment:
Resolved offline - the timestamp passed in is the reference point that we compare against
the other upload events' timestamps, so this comparison is already done per
partition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]