anishshri-db commented on code in PR #50123:
URL: https://github.com/apache/spark/pull/50123#discussion_r2022119019
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2291,6 +2291,70 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG =
+ buildConf("spark.sql.streaming.stateStore.multiplierForMinVersionDiffToLog")
+ .internal()
+ .doc(
+ "Determines the version threshold for logging warnings when a state store falls behind. " +
+ "The coordinator logs a warning when the store's uploaded snapshot version trails the " +
+ "query's latest version by the configured number of deltas needed to create a snapshot, " +
+ "times this multiplier."
+ )
+ .version("4.1.0")
+ .longConf
+ .checkValue(k => k >= 1L, "Must be greater than or equal to 1")
+ .createWithDefault(5L)
+
+ val STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_TIME_DIFF_TO_LOG =
+ buildConf("spark.sql.streaming.stateStore.multiplierForMinTimeDiffToLog")
+ .internal()
+ .doc(
+ "Determines the time threshold for logging warnings when a state store falls behind. " +
+ "The coordinator logs a warning when the store's uploaded snapshot timestamp trails the " +
+ "current time by the configured maintenance interval, times this multiplier."
+ )
+ .version("4.1.0")
+ .longConf
+ .checkValue(k => k >= 1L, "Must be greater than or equal to 1")
+ .createWithDefault(10L)
+
+ val STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG =
+ buildConf("spark.sql.streaming.stateStore.coordinatorReportSnapshotUploadLag")
+ .internal()
+ .doc(
+ "When enabled, the state store coordinator will report state stores whose snapshot " +
+ "have not been uploaded for some time. See the conf snapshotLagReportInterval for " +
+ "the minimum time between reports, and the conf multiplierForMinVersionDiffToLog " +
+ "and multiplierForMinTimeDiffToLog for the logging thresholds."
+ )
+ .version("4.1.0")
+ .booleanConf
+ .createWithDefault(true)
+
+ val STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL =
+ buildConf("spark.sql.streaming.stateStore.snapshotLagReportInterval")
+ .internal()
+ .doc(
+ "The minimum amount of time between the state store coordinator's reports on " +
+ "state store instances trailing behind in snapshot uploads."
+ )
+ .version("4.1.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(TimeUnit.MINUTES.toMillis(5))
+
+ val STATE_STORE_COORDINATOR_MAX_LAGGING_STORES_TO_REPORT =
+ buildConf("spark.sql.streaming.stateStore.maxLaggingStoresToReport")
+ .internal()
+ .doc(
+ "Maximum number of state stores the coordinator will report as trailing in " +
+ "snapshot uploads. Stores are selected based on the most lagging behind in " +
+ "snapshot version."
+ )
+ .version("4.1.0")
+ .intConf
+ .checkValue(k => k >= 0, "Must be greater than or equal to 0")
+ .createWithDefault(10)
Review Comment:
5 is probably enough? Or maybe some percentage of the total number of partitions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]