ericm-db commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1945082257
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,37 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}
+case class StateStoreCustomPartitionMetric(
+ namePrefix: String,
Review Comment:
nit: metricPrefix
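i.e., something like this (a sketch of the rename only; the `desc` field is assumed by analogy with `StateStoreCustomTimingMetric`, and any remaining members are elided in the diff):

```scala
case class StateStoreCustomPartitionMetric(
    metricPrefix: String, // renamed from namePrefix
    desc: String)         // assumed field, by analogy with the timing metric
```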
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +283,139 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
}
+
+ private val snapshotLagMetricPrefix =
+ "rocksdbSnapshotLastUploaded" + StateStoreProvider.PARTITION_METRIC_SUFFIX
+ private def snapshotLagMetricName(partitionId: Long): String =
Review Comment:
nit: use brackets
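i.e., presumably:

```scala
private def snapshotLagMetricName(partitionId: Long): String = {
  snapshotLagMetricPrefix + partitionId
}
```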
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +283,139 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
}
+
+ private val snapshotLagMetricPrefix =
Review Comment:
nit: define at the top of the class, with the name
`SNAPSHOT_LAG_METRIC_PREFIX`
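For example, reusing the definition from the diff as-is:

```scala
// At the top of RocksDBStateStoreIntegrationSuite:
private val SNAPSHOT_LAG_METRIC_PREFIX =
  "rocksdbSnapshotLastUploaded" + StateStoreProvider.PARTITION_METRIC_SUFFIX
```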
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -504,10 +535,19 @@ trait StateStoreProvider {
* (specifically, same names) through `StateStore.metrics`.
*/
def supportedCustomMetrics: Seq[StateStoreCustomMetric] = Nil
+
+ /**
+ * Optional custom partition-specific metrics that the implementation may want to report.
+ * @note The StateStore objects created by this provider must report the same custom metrics
+ * (specifically, same names) through `StateStore.metrics`.
+ */
+ def supportedCustomPartitionMetrics: Seq[StateStoreCustomPartitionMetric] = Nil
Review Comment:
Let's set this to Seq.empty to begin
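i.e.:

```scala
def supportedCustomPartitionMetrics: Seq[StateStoreCustomPartitionMetric] = Seq.empty
```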
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +283,139 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
}
+
+ private val snapshotLagMetricPrefix =
+ "rocksdbSnapshotLastUploaded" + StateStoreProvider.PARTITION_METRIC_SUFFIX
+ private def snapshotLagMetricName(partitionId: Long): String =
+ snapshotLagMetricPrefix + partitionId
+
+ testWithChangelogCheckpointingEnabled(
+ "SPARK-51097: Verify snapshot lag metric is updated correctly with
RocksDBStateStoreProvider"
+ ) {
+ withSQLConf(
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "500",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT.key -> "5"
+ ) {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[String]
+ val result = inputData.toDS().dropDuplicates()
+
+ testStream(result, outputMode = OutputMode.Update)(
+ StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+ AddData(inputData, "a"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "b"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "c"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "d"),
+ ProcessAllAvailable(),
+ CheckNewAnswer("a", "b", "c", "d"),
+ Execute { q =>
+ // Make sure only smallest K active metrics are published
+ eventually(timeout(Span(10, Seconds))) {
+ val partitionMetrics = q.lastProgress
+ .stateOperators(0)
+ .customMetrics
+ .asScala
+ .view
+ .filterKeys(_.startsWith(snapshotLagMetricPrefix))
+ // Determined by STATE_STORE_PARTITION_METRICS_REPORT_LIMIT for this scenario
+ assert(
+ partitionMetrics.size == q.sparkSession.conf
+ .get(SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT)
+ )
+ assert(partitionMetrics.forall(_._2 >= 0))
+ // Should all be the same in this situation
+ assert(partitionMetrics.values.toSet.size == 1)
+ }
+ },
+ StopStream
+ )
+ }
+ }
+ }
+
+ testWithChangelogCheckpointingEnabled(
+ "SPARK-51097: Verify snapshot lag metric is updated correctly with " +
+ "SkipMaintenanceOnCertainPartitionsProvider"
+ ) {
+ withSQLConf(
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[SkipMaintenanceOnCertainPartitionsProvider].getName,
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "500",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT.key -> "2"
+ ) {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[String]
+ val result = inputData.toDS().dropDuplicates()
+
+ testStream(result, outputMode = OutputMode.Update)(
+ StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+ AddData(inputData, "a"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "b"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "c"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "d"),
+ ProcessAllAvailable(),
+ CheckNewAnswer("a", "b", "c", "d"),
+ Execute { q =>
+ // Partitions getting skipped (id 0 and 1) do not have an uploaded version, leaving
+ // the metric empty.
+ eventually(timeout(Span(10, Seconds))) {
Review Comment:
Hm, I think I'm missing something from our offline conversation. We still want to emit metrics for these partitions, right?

Say there is one partition that hasn't uploaded anything, and every other partition is updated with the most recent version. If we don't emit the lagging partition and look at this metric, we would think that maintenance is fine and that there is no lag anywhere, which is not true.
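For example (purely illustrative; the helper and the -1 sentinel are hypothetical, not necessarily what this PR does), the report-limit selection could treat a never-uploaded partition as maximally lagging, so it always survives the cutoff:

```scala
object SnapshotLagSelection {
  // Hypothetical sketch: partitions with no uploaded snapshot (None) map to
  // version -1, so they sort first and stay within the report limit.
  def selectLagging(
      lastUploadedVersion: Map[Int, Option[Long]],
      reportLimit: Int): Seq[(Int, Long)] = {
    lastUploadedVersion.toSeq
      .map { case (partitionId, version) => (partitionId, version.getOrElse(-1L)) }
      .sortBy { case (_, version) => version } // most lagging first
      .take(reportLimit)
  }
}
```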
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +283,139 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
}
+
+ private val snapshotLagMetricPrefix =
+ "rocksdbSnapshotLastUploaded" + StateStoreProvider.PARTITION_METRIC_SUFFIX
+ private def snapshotLagMetricName(partitionId: Long): String =
+ snapshotLagMetricPrefix + partitionId
+
+ testWithChangelogCheckpointingEnabled(
+ "SPARK-51097: Verify snapshot lag metric is updated correctly with
RocksDBStateStoreProvider"
+ ) {
+ withSQLConf(
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "500",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT.key -> "5"
+ ) {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[String]
+ val result = inputData.toDS().dropDuplicates()
+
+ testStream(result, outputMode = OutputMode.Update)(
+ StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+ AddData(inputData, "a"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "b"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "c"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
+ },
+ AddData(inputData, "d"),
+ ProcessAllAvailable(),
+ CheckNewAnswer("a", "b", "c", "d"),
+ Execute { q =>
+ // Make sure only smallest K active metrics are published
+ eventually(timeout(Span(10, Seconds))) {
+ val partitionMetrics = q.lastProgress
+ .stateOperators(0)
+ .customMetrics
+ .asScala
+ .view
+ .filterKeys(_.startsWith(snapshotLagMetricPrefix))
+ // Determined by STATE_STORE_PARTITION_METRICS_REPORT_LIMIT for this scenario
+ assert(
+ partitionMetrics.size == q.sparkSession.conf
+ .get(SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT)
+ )
+ assert(partitionMetrics.forall(_._2 >= 0))
Review Comment:
How much variance is there here? By the end of the test, do we know what we expect the metric to be? It seems that we know they should all be the same, but we don't know what that value is?
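If it is deterministic, maybe the assertion could be tightened to something like this (a sketch against the test's `partitionMetrics`; the expected value is a placeholder, not a known result):

```scala
// Hypothetical tighter check, assuming the final uploaded snapshot version
// is known at the end of the test (5L is illustrative only).
val expectedSnapshotVersion = 5L
assert(partitionMetrics.values.forall(_ == expectedSnapshotVersion))
```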
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +283,139 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
}
+
+ private val snapshotLagMetricPrefix =
+ "rocksdbSnapshotLastUploaded" + StateStoreProvider.PARTITION_METRIC_SUFFIX
+ private def snapshotLagMetricName(partitionId: Long): String =
+ snapshotLagMetricPrefix + partitionId
+
+ testWithChangelogCheckpointingEnabled(
+ "SPARK-51097: Verify snapshot lag metric is updated correctly with
RocksDBStateStoreProvider"
+ ) {
+ withSQLConf(
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "500",
+ SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+ SQLConf.STATE_STORE_PARTITION_METRICS_REPORT_LIMIT.key -> "5"
+ ) {
+ withTempDir { checkpointDir =>
+ val inputData = MemoryStream[String]
+ val result = inputData.toDS().dropDuplicates()
+
+ testStream(result, outputMode = OutputMode.Update)(
+ StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+ AddData(inputData, "a"),
+ ProcessAllAvailable(),
+ Execute { _ =>
+ Thread.sleep(500)
Review Comment:
I'm not a huge fan of this many `Thread.sleep` calls. Is there a way to get rid of some of these, or at least reduce the duration?
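For instance (a sketch only; `snapshotVersionsPresent` and the partition directory handle are borrowed from the suite's earlier test, and the exact condition depends on what is observable here), each fixed sleep could become a condition-based wait using the `eventually` helper the test already relies on:

```scala
// Hypothetical replacement for `Execute { _ => Thread.sleep(500) }`:
// poll until maintenance has actually uploaded a snapshot rather than
// sleeping for a fixed interval.
Execute { _ =>
  eventually(timeout(Span(10, Seconds))) {
    assert(snapshotVersionsPresent(dirForPartition0).nonEmpty)
  }
}
```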
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -398,6 +411,18 @@ trait StateStoreWriter
}.toMap
}
+ private def stateStoreCustomPartitionMetrics: Map[String, SQLMetric] = {
Review Comment:
Can you leave some comments on what we are doing here and why? I.e., why do we have to populate the map in this way?