liviazhu-db commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1945446788
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,22 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_PARTITION_METRICS_REPORT_LIMIT =
+ buildConf("spark.sql.streaming.stateStore.numPartitionMetricsToReport")
+ .internal()
+ .doc(
+ "Maximum number of partition-level metrics to include in state store
progress " +
+ "reporting. The default limit is 20% of the number of cores (with a
minimum of 1 " +
+ "partition) and with a cap of 10. This limits the metrics to the N
partitions with " +
+ "the smallest values to prevent the progress report from becoming
too large."
+ )
+ .version("4.0.0")
+ .intConf
+ .checkValue(k => k >= 0, "Must be greater than or equal to 0")
+ .createWithDefault(
+ Math.min(10, Math.min(1,
SHUFFLE_PARTITIONS.defaultValue.getOrElse(200) / 5))
Review Comment:
By your comment it should be `Math.min(10, Math.max(1,
SHUFFLE_PARTITIONS...)` right? As it is written right now I think you're always
going to get a limit of 1.
Also, I think this isn't actually going to capture the real number of
shuffle partitions, since you're not actually getting the SQLConf here. You're
going to get 200 every time. I'm not sure if you can get the real number of
shuffle partitions here, so maybe just create it with a default of 10 unless
@ericm-db knows how to actually get the SQLConf here.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -146,6 +146,7 @@ class RocksDB(
@volatile private var changelogWriter: Option[StateStoreChangelogWriter] =
None
private val enableChangelogCheckpointing: Boolean =
conf.enableChangelogCheckpointing
@volatile protected var loadedVersion: Long = -1L // -1 = nothing valid is
loaded
+ @volatile protected var lastUploadedVersion = -1L // -1 = nothing has been
uploaded
Review Comment:
I think that lastUploadedVersion is not thread-safe here. `@volatile` is
only thread-safe when you have one writer thread, but both the maintenance and
task thread can write this value. Can you make it an atomic instead?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +286,116 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
}
+
+ private def snapshotLagMetricName(partitionId: Long): String =
+ (SNAPSHOT_LAG_METRIC_PREFIX + partitionId)
+
+ testWithChangelogCheckpointingEnabled(
+ "SPARK-51097: Verify snapshot lag metric is updated correctly with
RocksDBStateStoreProvider"
+ ) {
+ withSQLConf(
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+ SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT.key -> "5"
+ ) {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[String]
+ val result = inputData.toDS().dropDuplicates()
+
+ testStream(result, outputMode = OutputMode.Update)(
+ StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+ AddData(inputData, "a"),
+ ProcessAllAvailable(),
+ AddData(inputData, "b"),
+ ProcessAllAvailable(),
+ CheckNewAnswer("a", "b"),
+ Execute { q =>
+ // Make sure only smallest K active metrics are published
+ eventually(timeout(Span(10, Seconds))) {
Review Comment:
nit: I think you can use `10.seconds`, just some syntactic sugar that looks
pretty :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]