micheal-o commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1950201910
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,19 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+   buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+     .internal()
+     .doc(
+       "Number of state store instance metrics to include in state store progress reporting. " +
+         "The default limit is 20% of partitions clamped between 1 and 10 (please refer to " +
+         "numStateStoreInstanceMetricsToReport for details)."
Review Comment:
This doc comment is user facing; we shouldn't refer the user to a function in the code.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,19 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+   buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+     .internal()
+     .doc(
+       "Number of state store instance metrics to include in state store progress reporting. " +
Review Comment:
nit: streaming progress report, not state store progress report
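For illustration, a rewording along those lines (my wording, not the PR's) could be:
```scala
.doc("Number of state store instance metrics to include in streaming progress reporting.")
```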
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -504,10 +536,19 @@ trait StateStoreProvider {
* (specifically, same names) through `StateStore.metrics`.
*/
def supportedCustomMetrics: Seq[StateStoreCustomMetric] = Nil
+
+ /**
+  * Optional custom state store instance metrics that the implementation may want to report.
+  * @note The StateStore objects created by this provider must report the same custom metrics
+  *       (specifically, same names) through `StateStore.metrics`.
+  */
+ def supportedInstanceMetrics: Seq[StateStoreInstanceMetric] = Seq.empty
}
object StateStoreProvider {
+ val INSTANCE_METRIC_SUFFIX = ".partition_"
Review Comment:
nit: this is only used in one place right now, so there's no need for a separate const. Just inline it where it's used.
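For example, inlined in the `name` override it would read roughly like:
```scala
override def name: String = {
  partitionId
    .map(id => s"$metricPrefix.partition_${id}_$storeName")
    .getOrElse(metricPrefix)
}
```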
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -447,6 +447,9 @@ class SymmetricHashJoinStateManager(
keyToNumValuesMetrics.memoryUsedBytes +
keyWithIndexToValueMetrics.memoryUsedBytes,
keyWithIndexToValueMetrics.customMetrics.map {
case (metric, value) => (metric.withNewDesc(desc = newDesc(metric.desc)), value)
+ } ++ keyToNumValuesMetrics.customMetrics.collect {
+ case (metric: StateStoreInstanceMetric, value) =>
Review Comment:
nit: add a comment, e.g. `For instance metrics, we want to collect metrics from both state stores, unlike other custom metrics that are only collected for keyWithIndexToValue.`
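Roughly like this, assuming the collect body mirrors the map above:
```scala
} ++ keyToNumValuesMetrics.customMetrics.collect {
  // For instance metrics, we want to collect metrics from both state stores,
  // unlike other custom metrics that are only collected for keyWithIndexToValue.
  case (metric: StateStoreInstanceMetric, value) =>
    (metric.withNewDesc(desc = newDesc(metric.desc)), value)
}
```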
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -837,8 +842,16 @@ object RocksDBStateStoreProvider {
CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES,
CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE,
CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES,
CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES)
+
+ val CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED = StateStoreInstanceMetric(
+ "rocksdbSnapshotLastUploaded",
+ "RocksDB: the last uploaded version of the snapshot for a specific state
store instance"
+ )
+
+ val ALL_INSTANCE_METRICS = Seq(CUSTOM_INSTANCE_METRIC_SNAPSHOT_LAST_UPLOADED)
}
+
Review Comment:
nit: why the extra blank line here?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -125,6 +126,10 @@ trait StatefulOperator extends SparkPlan {
def validateAndMaybeEvolveStateSchema(
hadoopConf: Configuration, batchId: Long, stateSchemaVersion: Int): List[StateSchemaValidationResult]
+
+ // Used to determine state store names used when allocation metric names,
+ // since join operations use multiple state stores with non-default names.
+ def isJoinOperator: Boolean = false
Review Comment:
ditto
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,19 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
+   buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
+     .internal()
+     .doc(
+       "Number of state store instance metrics to include in state store progress reporting. " +
+         "The default limit is 20% of partitions clamped between 1 and 10 (please refer to " +
Review Comment:
see my other comment below about removing this 20%
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -5733,6 +5746,12 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)
+ def numStateStoreInstanceMetricsToReport: Int = {
+ getConf(STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT).getOrElse(
+ Math.min(10, Math.max(1, defaultNumShufflePartitions / 5))
Review Comment:
Why are we doing this 20% heuristic? I don't see a need for it. Why not just `min(10, defaultNumShufflePartitions)`? Actually, why not just make 10 the default conf value?
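For illustration, the simpler variant could look roughly like this (sketch only, assuming a plain `intConf` default):
```scala
val STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT =
  buildConf("spark.sql.streaming.stateStore.numStateStoreInstanceMetricsToReport")
    .internal()
    .doc("Number of state store instance metrics to include in streaming progress reporting.")
    .intConf
    .createWithDefault(10)

def numStateStoreInstanceMetricsToReport: Int =
  getConf(STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
```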
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,38 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}
+case class StateStoreInstanceMetric(
+ metricPrefix: String,
+ descPrefix: String,
+ partitionId: Option[Int] = None,
+ storeName: String = StateStoreId.DEFAULT_STORE_NAME)
+ extends StateStoreCustomMetric {
+ override def name: String = {
+ partitionId
+ .map { id =>
+ s"$metricPrefix${StateStoreProvider.INSTANCE_METRIC_SUFFIX}${id}_${storeName}"
+ }
+ .getOrElse(metricPrefix)
+ }
+
+ override def desc: String = {
+ partitionId
+ .map { id =>
+ s"$descPrefix (partitionId = $id, storeName = ${storeName})"
+ }
+ .getOrElse(descPrefix)
+ }
+
+ override def withNewDesc(desc: String): StateStoreInstanceMetric = copy(descPrefix = desc)
+
+ override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+ SQLMetrics.createSizeMetric(sparkContext, desc)
+
+ def withNewPartition(partitionId: Int, storeName: String): StateStoreInstanceMetric = {
Review Comment:
nit: rename to `withNewId` instead? Since we are not setting only partitionId
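For example (assuming the body matches what `withNewPartition` does today):
```scala
def withNewId(partitionId: Int, storeName: String): StateStoreInstanceMetric =
  copy(partitionId = Some(partitionId), storeName = storeName)
```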
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -812,4 +825,8 @@ case class StreamingSymmetricHashJoinExec(
Some((leftStateWatermark ++ rightStateWatermark ++ Some(inputWatermarkMs)).min)
}
+
+ // Used to determine state store names used when allocation metric names,
+ // since join operations use multiple state stores with non-default names.
+ override def isJoinOperator: Boolean = true
Review Comment:
why?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -398,6 +427,25 @@ trait StateStoreWriter
}.toMap
}
+ private def stateStoreInstanceMetrics: Map[String, SQLMetric] = {
+ val provider = StateStoreProvider.create(conf.stateStoreProviderClass)
+ val maxPartitions = conf.defaultNumShufflePartitions
+
+ (0 until maxPartitions).flatMap { partitionId =>
+ provider.supportedInstanceMetrics.flatMap { metric =>
+ val storeNames = if (isJoinOperator) {
+ SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
Review Comment:
There is a better way to do this. Let's not expose symmetric hash join implementation details here; this is a higher level. One option is to add a func to `StateStoreWriter` that lets implementers provide their state store names, e.g. a `stateStoreNames` func that returns `DEFAULT_STORE_NAME` by default, which the join can override to provide its own names. That is just one option; a rough sketch is below.
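(Sketch only; the name and placement are illustrative, not prescriptive.)
```scala
// In StateStoreWriter: default store names for most stateful operators.
def stateStoreNames: Seq[String] = Seq(StateStoreId.DEFAULT_STORE_NAME)

// In StreamingSymmetricHashJoinExec: override with the join's store names,
// keeping the SymmetricHashJoinStateManager detail inside the join operator.
override def stateStoreNames: Seq[String] =
  SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
```
`stateStoreInstanceMetrics` can then iterate over `stateStoreNames` directly and `isJoinOperator` goes away.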
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -373,8 +389,21 @@ trait StateStoreWriter
val storeMetrics = store.metrics
longMetric("numTotalStateRows") += storeMetrics.numKeys
longMetric("stateMemory") += storeMetrics.memoryUsedBytes
- storeMetrics.customMetrics.foreach { case (metric, value) =>
- longMetric(metric.name) += value
+ storeMetrics.customMetrics.foreach {
+ // Set the max for instance metrics
+ case (metric: StateStoreInstanceMetric, value) =>
Review Comment:
this code is a duplicate of the same code in
`StreamingSymmetricHashJoinExec`. Should we just have a common func for this
logic, instead of duplicating?
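For example, a shared helper on `StateStoreWriter` could look roughly like this (the name is just a suggestion):
```scala
protected def setStoreCustomMetrics(customMetrics: Map[StateStoreCustomMetric, Long]): Unit = {
  customMetrics.foreach {
    case (metric: StateStoreInstanceMetric, value) =>
      val sqlMetric = longMetric(metric.name)
      // Metrics like the last uploaded snapshot version can start at -1, and reading
      // `.value` from an unset SQLMetric would coerce that to 0, so only fold in the
      // existing value (via max) once the metric has actually been set.
      sqlMetric.set(if (sqlMetric.isZero) value else Math.max(value, sqlMetric.value))
    case (metric, value) =>
      longMetric(metric.name) += value
  }
}
```
Both call sites could then just pass their `customMetrics` map to it.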
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -527,8 +527,21 @@ case class StreamingSymmetricHashJoinExec(
(leftSideJoiner.numUpdatedStateRows +
rightSideJoiner.numUpdatedStateRows)
numTotalStateRows += combinedMetrics.numKeys
stateMemory += combinedMetrics.memoryUsedBytes
- combinedMetrics.customMetrics.foreach { case (metric, value) =>
- longMetric(metric.name) += value
+ combinedMetrics.customMetrics.foreach {
+ // Set the max for instance metrics
+ case (metric: StateStoreInstanceMetric, value) =>
+ // Check for cases where value < 0 and .value converts metric to 0.
+ // Metrics like last uploaded snapshot version can have an init value of -1,
+ // which need special handling to avoid setting the metric to 0 using `.value`.
+ longMetric(metric.name).set(
+ if (longMetric(metric.name).isZero) {
+ value
+ } else {
+ Math.max(value, longMetric(metric.name).value)
Review Comment:
nit: let's add a comment here explaining why we are using max.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,38 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}
+case class StateStoreInstanceMetric(
+ metricPrefix: String,
+ descPrefix: String,
+ partitionId: Option[Int] = None,
Review Comment:
why is `partitionId` optional?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -812,4 +825,8 @@ case class StreamingSymmetricHashJoinExec(
Some((leftStateWatermark ++ rightStateWatermark ++ Some(inputWatermarkMs)).min)
}
+
+ // Used to determine state store names used when allocation metric names,
+ // since join operations use multiple state stores with non-default names.
+ override def isJoinOperator: Boolean = true
Review Comment:
I explained why we shouldn't do this in my comment below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.