HeartSaVioR commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1961616963
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -281,10 +285,20 @@ object StateStoreMetrics {
      customMetric -> sumOfMetrics
    }.toMap
+    val distinctInstanceMetrics = allMetrics.flatMap(_.instanceMetrics.keys).distinct
+    val instanceMetrics = allMetrics.flatMap(_.instanceMetrics)
+    val combinedInstanceMetrics = distinctInstanceMetrics.map { instanceMetric =>
+      val sameMetrics = instanceMetrics.filter(_._1 == instanceMetric)
+      // Use the instance metric's custom ordering to select the combined metric value
+      val selectedMetrics = sameMetrics.map(_._2).min(instanceMetric.ordering)
+      instanceMetric -> selectedMetrics
Review Comment:
Just to be very clear: will the name be different for every state store instance even though it's the same metric here? I don't see us doing top-N here, hence I assume you're just doing some accumulation in case there are "somehow" multiple values for the same metric, which does not happen in the normal case.
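
For what it's worth, here is a minimal standalone sketch of the combine step above, using a simplified `MetricKey` placeholder instead of the real `StateStoreInstanceMetric`/`StateStoreMetrics` classes, so it only illustrates how the per-metric ordering selects the combined value:

```scala
object CombineSketch {
  // Stand-in for an instance metric key: the ordering decides which reported
  // value wins when the same key appears in more than one metrics map.
  case class MetricKey(name: String, ordering: Ordering[Long])

  def combine(allMetrics: Seq[Map[MetricKey, Long]]): Map[MetricKey, Long] = {
    val distinctKeys = allMetrics.flatMap(_.keys).distinct
    val allEntries = allMetrics.flatMap(_.toSeq)
    distinctKeys.map { key =>
      val valuesForKey = allEntries.collect { case (`key`, value) => value }
      // min under the metric's own ordering, mirroring instanceMetric.ordering above
      key -> valuesForKey.min(key.ordering)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // With a reversed ordering, "min" keeps the largest reported value.
    val snapshotVersion = MetricKey("snapshotVersion.partition_0_default", Ordering.Long.reverse)
    val merged = combine(Seq(Map(snapshotVersion -> 3L), Map(snapshotVersion -> 5L)))
    println(merged(snapshotVersion)) // 5
  }
}
```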
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -250,11 +250,15 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore {
 * @param customMetrics Custom implementation-specific metrics
 *                      The metrics reported through this must have the same `name` as those
 *                      reported by `StateStoreProvider.customMetrics`.
+ * @param instanceMetrics Custom implementation-specific metrics that are specific to state stores
+ *                        The metrics reported through this must have the same `name` as those
+ *                        reported by `StateStoreProvider.customMetrics`.
Review Comment:
nit: `supportedInstanceMetrics`? I guess you just copied this from the above. That said, the above should also say `supportedCustomMetrics`, though that's beyond the scope of this PR (so up to you).
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +335,73 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
    SQLMetrics.createTimingMetric(sparkContext, desc)
}
+trait StateStoreInstanceMetric {
+  def metricPrefix: String
+  def descPrefix: String
+  def descNotes: String
+  def partitionId: Option[Int]
+  def storeName: String
+  def stateStoreProvider: String
Review Comment:
nit: maybe this isn't needed, since we haven't had any need to add it for StateStoreCustomMetric?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +335,73 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
    SQLMetrics.createTimingMetric(sparkContext, desc)
}
+trait StateStoreInstanceMetric {
+  def metricPrefix: String
+  def descPrefix: String
+  def descNotes: String
+  def partitionId: Option[Int]
+  def storeName: String
+  def stateStoreProvider: String
+  def initValue: Long
+
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
+
+  /** This determines the selection of instance metrics during progress reporting */
+  def ordering: Ordering[Long]
+
+  /** Should this instance metric be reported if it is unchanged from its initial value */
+  def ignoreIfUnchanged: Boolean
+
+  def name: String = {
+    assert(partitionId.isDefined, "Partition ID must be defined for instance metric name")
+    s"$stateStoreProvider.$metricPrefix.partition_${partitionId.get}_$storeName"
+  }
+
+  def desc: String = {
+    assert(partitionId.isDefined, "Partition ID must be defined for instance metric description")
+    s"$stateStoreProvider: $descPrefix " +
+      s"(partitionId = ${partitionId.get}, storeName = $storeName, notes = $descNotes)"
+  }
+
+  def withNewDescNotes(descNotes: String): StateStoreInstanceMetric
Review Comment:
Do we need notes for the description as well? I haven't seen this in existing metrics yet.
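
To make the shape of these metrics concrete, here is a self-contained, simplified sketch of how a concrete instance metric could plug into the trait above. The `SnapshotLagMetric` name, its prefix strings, and the omission of the SQLMetric/SparkContext pieces are all illustrative assumptions, not the actual implementation in this PR:

```scala
trait SimpleInstanceMetric {
  def metricPrefix: String
  def descPrefix: String
  def descNotes: String
  def partitionId: Option[Int]
  def storeName: String
  def stateStoreProvider: String

  def name: String = {
    assert(partitionId.isDefined, "Partition ID must be defined for instance metric name")
    s"$stateStoreProvider.$metricPrefix.partition_${partitionId.get}_$storeName"
  }

  def desc: String = {
    assert(partitionId.isDefined, "Partition ID must be defined for instance metric description")
    s"$stateStoreProvider: $descPrefix " +
      s"(partitionId = ${partitionId.get}, storeName = $storeName, notes = $descNotes)"
  }
}

// Hypothetical concrete metric, purely for illustration.
case class SnapshotLagMetric(
    partitionId: Option[Int],
    storeName: String,
    descNotes: String = "") extends SimpleInstanceMetric {
  override def metricPrefix: String = "SnapshotLastUploaded"
  override def descPrefix: String = "Last uploaded snapshot version"
  override def stateStoreProvider: String = "RocksDBStateStoreProvider"
  def withNewDescNotes(notes: String): SnapshotLagMetric = copy(descNotes = notes)
}

// SnapshotLagMetric(Some(0), "default").name
//   => "RocksDBStateStoreProvider.SnapshotLastUploaded.partition_0_default"
```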
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -441,13 +441,20 @@ class SymmetricHashJoinStateManager(
    val keyToNumValuesMetrics = keyToNumValues.metrics
    val keyWithIndexToValueMetrics = keyWithIndexToValue.metrics
    def newDesc(desc: String): String =
      s"${joinSide.toString.toUpperCase(Locale.ROOT)}: $desc"
+    def newDescNotes: String = joinSide.toString.toUpperCase(Locale.ROOT)
    StateStoreMetrics(
      keyWithIndexToValueMetrics.numKeys, // represent each buffered row only once
      keyToNumValuesMetrics.memoryUsedBytes + keyWithIndexToValueMetrics.memoryUsedBytes,
      keyWithIndexToValueMetrics.customMetrics.map {
        case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
-      }
+      },
+      // We want to collect instance metrics from both state stores
+      (keyWithIndexToValueMetrics.instanceMetrics ++ keyToNumValuesMetrics.instanceMetrics)
+        .collect {
+          case (metric, value) =>
+            (metric.withNewDescNotes(descNotes = newDescNotes), value)
Review Comment:
The state store name already contains the side information, so if the metric name includes the state store name, we don't need the note.
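
As a small worked example (assuming the join's store names encode the side, e.g. "left-keyToNumValues" / "right-keyWithIndexToValue", and the provider/prefix strings below are illustrative only), the side is already visible in the instance metric name without any extra desc note:

```scala
object JoinSideNameSketch {
  // Hypothetical name builder, mirroring the partition/store-name pattern above.
  private def metricName(partitionId: Int, storeName: String): String =
    s"RocksDBStateStoreProvider.SnapshotLastUploaded.partition_${partitionId}_$storeName"

  def main(args: Array[String]): Unit = {
    Seq("left-keyToNumValues", "right-keyWithIndexToValue").foreach { store =>
      // Each printed name distinguishes the join side via the store name alone.
      println(metricName(0, store))
    }
  }
}
```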
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -270,4 +286,237 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
    assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
    assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
  }
+
+  private def snapshotLagMetricName(
+      partitionId: Long,
+      storeName: String = StateStoreId.DEFAULT_STORE_NAME): String = {
+    s"$SNAPSHOT_LAG_METRIC_PREFIX${partitionId}_$storeName"
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "SPARK-51097: Verify snapshot lag metrics are updated correctly with RocksDBStateStoreProvider"
+  ) {
+    withSQLConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
+      SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "5"
Review Comment:
nit: let's set this smaller; for unit tests, the default number of shuffle partitions is 5.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
    }
  }
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
Review Comment:
I've been confused by this: we include the storeName and partition ID in the instance metric's name. Given that, is there a case where we'd have multiple values for the same metric? If that can happen, could you please describe the case, and whether max is normally the right fit for the aggregation?
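
For reference, a standalone sketch of the merge rule in the diff above: treat a still-zero metric as "unset" and take the new value as-is (which preserves an initial value like -1), otherwise keep the max. `SimpleGauge` is only a stand-in for `SQLMetric` here, not the real class:

```scala
class SimpleGauge {
  private var v: Long = 0L
  def isZero: Boolean = v == 0L
  def value: Long = v
  def set(newValue: Long): Unit = v = newValue
}

object MaxMergeSketch {
  def mergeInstanceMetric(gauge: SimpleGauge, newValue: Long): Unit = {
    gauge.set(
      if (gauge.isZero) {
        // First report wins as-is, so an initial value like -1 is preserved
        // instead of being clobbered by the zero default.
        newValue
      } else {
        // Later reports keep the most recent snapshot version, i.e. the max.
        math.max(newValue, gauge.value)
      })
  }

  def main(args: Array[String]): Unit = {
    val gauge = new SimpleGauge
    mergeInstanceMetric(gauge, -1L) // no snapshot uploaded yet
    mergeInstanceMetric(gauge, 5L)  // snapshot version 5 uploaded
    mergeInstanceMetric(gauge, 3L)  // stale report, ignored by max
    println(gauge.value)            // 5
  }
}
```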
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +335,73 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
    SQLMetrics.createTimingMetric(sparkContext, desc)
}
+trait StateStoreInstanceMetric {
+  def metricPrefix: String
+  def descPrefix: String
+  def descNotes: String
+  def partitionId: Option[Int]
+  def storeName: String
+  def stateStoreProvider: String
+  def initValue: Long
+
+  def createSQLMetric(sparkContext: SparkContext): SQLMetric
+
+  /** This determines the selection of instance metrics during progress reporting */
Review Comment:
Probably good to mention the direction, e.g. whether the ordering should be ascending or descending if you want to do top-N (I haven't read that part yet, so whatever fits).
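
A small illustration of what that direction means in practice (not tied to the actual interface, values are made up): with a report limit of N, an ascending ordering keeps the N smallest values and a reversed ordering keeps the N largest.

```scala
object OrderingDirectionSketch {
  def main(args: Array[String]): Unit = {
    val values = Seq(7L, 1L, 9L, 3L)
    val reportLimit = 2

    // Ascending: pick the smallest values (e.g. the most-lagging snapshot versions).
    println(values.sorted(Ordering.Long).take(reportLimit))         // List(1, 3)

    // Descending: pick the largest values instead.
    println(values.sorted(Ordering.Long.reverse).take(reportLimit)) // List(9, 7)
  }
}
```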
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -281,10 +285,20 @@ object StateStoreMetrics {
      customMetric -> sumOfMetrics
    }.toMap
+    val distinctInstanceMetrics = allMetrics.flatMap(_.instanceMetrics.keys).distinct
Review Comment:
nit: probably `distinctInstanceMetricKeys`? Just to make it clearer to distinguish from the below.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -320,11 +327,48 @@ trait StateStoreWriter
   * the driver after this SparkPlan has been executed and metrics have been updated.
   */
  def getProgress(): StateOperatorProgress = {
+    // Still publish instance metrics that are marked with ignoreIfUnchanged,
+    // as those unchanged metric values may contain important information
+    val instanceMetricsToReport = stateStoreInstanceMetrics
+      .filter {
+        case (name, _) =>
+          val metric = longMetric(name)
+          val metricConfig = instanceMetricConfiguration(name)
+          // Keep instance metrics that are updated or aren't marked to be ignored,
+          // as their initial value could still be important.
+          !metric.isZero || !metricConfig.ignoreIfUnchanged
+      }
+      .groupBy {
+        // Group all instance metrics underneath their common metric prefix
+        // to ignore partition and store names.
+        case (name, _) =>
+          (
+            instanceMetricConfiguration(name).stateStoreProvider,
+            instanceMetricConfiguration(name).metricPrefix
+          )
+      }
+      .flatMap {
+        case (_, metrics) =>
+          // Select at most N metrics based on the metric's defined ordering
+          // to report to the driver. For example, ascending order would be taking the N smallest.
Review Comment:
nit: Let's put this information in the interface. Let's assume developers of a new metric would only look at the interface to begin with.
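
For clarity, a self-contained sketch of the reporting step above: group instance metrics by their common prefix, then report at most `reportLimit` entries per group according to the metric's ordering. The `Entry` type, names, and values here are illustrative placeholders, not the real classes:

```scala
object ReportLimitSketch {
  case class Entry(prefix: String, name: String, value: Long)

  def select(entries: Seq[Entry], reportLimit: Int, ordering: Ordering[Long]): Seq[Entry] = {
    entries
      .groupBy(_.prefix)
      .flatMap { case (_, group) =>
        // e.g. an ascending ordering keeps the N smallest values per prefix
        group.sortBy(_.value)(ordering).take(reportLimit)
      }
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      Entry("SnapshotLastUploaded", "partition_0_default", 12L),
      Entry("SnapshotLastUploaded", "partition_1_default", 3L),
      Entry("SnapshotLastUploaded", "partition_2_default", 7L))
    // With a limit of 2 and ascending order, the two most-lagging partitions are reported.
    select(entries, reportLimit = 2, Ordering.Long).foreach(println)
  }
}
```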
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
    }
  }
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
+            // which for snapshot versions is the largest version number.
+            Math.max(value, longMetric(metric.name).value)
+          }
+        )
+    }
+  }
+
  private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
    val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
    provider.supportedCustomMetrics.map {
      metric => (metric.name, metric.createSQLMetric(sparkContext))
    }.toMap
  }
+  private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+    stateStoreInstanceMetricObjects.map {
+      case (name, metric) => (name, metric.createSQLMetric(sparkContext))
+    }
+  }
+
+  private def stateStoreInstanceMetricObjects: Map[String, StateStoreInstanceMetric] = {
+    val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
+    val maxPartitions = conf.defaultNumShufflePartitions
Review Comment:
There could be an extreme case where `conf.defaultNumShufflePartitions` and
`getStateInfo.numPartitions` are different. The latter is the source of truth
for stateful operators.
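
A hedged sketch of what that could look like: generate one metric object per state store partition, where `numStatePartitions` would come from the operator's state info (the source of truth) rather than the default shuffle partition config. The `makeMetric` helper and its fields are placeholders for illustration only:

```scala
object PerPartitionMetricsSketch {
  case class MetricStub(name: String)

  // Hypothetical per-partition metric constructor, mirroring the naming pattern in the PR.
  def makeMetric(partitionId: Int, storeName: String): MetricStub =
    MetricStub(s"SnapshotLastUploaded.partition_${partitionId}_$storeName")

  def instanceMetricObjects(
      numStatePartitions: Int,
      storeNames: Seq[String]): Map[String, MetricStub] = {
    (for {
      partitionId <- 0 until numStatePartitions
      storeName <- storeNames
    } yield {
      val metric = makeMetric(partitionId, storeName)
      metric.name -> metric
    }).toMap
  }

  def main(args: Array[String]): Unit = {
    // e.g. 5 state partitions and a single "default" store => 5 metric objects
    println(instanceMetricObjects(5, Seq("default")).size) // 5
  }
}
```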
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -391,13 +434,60 @@ trait StateStoreWriter
    }
  }
+  protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
+    customMetrics.foreach {
+      case (metric, value) =>
+        longMetric(metric.name) += value
+    }
+  }
+
+  protected def setStoreInstanceMetrics(
+      instanceMetrics: Map[StateStoreInstanceMetric, Long]): Unit = {
+    instanceMetrics.foreach {
+      case (metric, value) =>
+        // Set the max for instance metrics
+        // Check for cases where value < 0 and .value converts metric to 0
+        // Some metrics like last uploaded snapshot version can have an initial value of -1,
+        // which need special handling to avoid setting the metric to 0 using `.value`.
+        longMetric(metric.name).set(
+          if (longMetric(metric.name).isZero) {
+            value
+          } else {
+            // Use max to grab the most updated value across all state store instances,
+            // which for snapshot versions is the largest version number.
+            Math.max(value, longMetric(metric.name).value)
+          }
+        )
+    }
+  }
+
  private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
    val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
    provider.supportedCustomMetrics.map {
      metric => (metric.name, metric.createSQLMetric(sparkContext))
    }.toMap
  }
+  private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+    stateStoreInstanceMetricObjects.map {
+      case (name, metric) => (name, metric.createSQLMetric(sparkContext))
+    }
+  }
+
+  private def stateStoreInstanceMetricObjects: Map[String, StateStoreInstanceMetric] = {
Review Comment:
I see this method being called in multiple places; does the map need to be built separately each time?
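
If caching it is acceptable, one option is to compute the map once per operator with a lazy val instead of re-creating the provider on every call. This is only a sketch of the idea; `ExpensiveProvider` and the string values are stand-ins for illustration:

```scala
class ExpensiveProvider {
  println("provider created") // side effect to show how often construction runs
  def supportedMetricNames: Seq[String] = Seq("metricA", "metricB")
}

class OperatorSketch {
  // Evaluated at most once, on first access, then reused by all callers.
  private lazy val instanceMetricObjects: Map[String, String] = {
    val provider = new ExpensiveProvider
    provider.supportedMetricNames.map(name => name -> s"config for $name").toMap
  }

  def nameToConfig(name: String): Option[String] = instanceMetricObjects.get(name)
}

object LazyValSketch {
  def main(args: Array[String]): Unit = {
    val op = new OperatorSketch
    op.nameToConfig("metricA") // "provider created" printed only once
    op.nameToConfig("metricB") // reuses the cached map
  }
}
```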