micheal-o commented on code in PR #49816:
URL: https://github.com/apache/spark/pull/49816#discussion_r1946026553
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -146,6 +146,7 @@ class RocksDB(
@volatile private var changelogWriter: Option[StateStoreChangelogWriter] =
None
private val enableChangelogCheckpointing: Boolean =
conf.enableChangelogCheckpointing
@volatile protected var loadedVersion: Long = -1L // -1 = nothing valid is
loaded
+ @volatile protected var lastUploadedVersion = -1L // -1 = nothing has been
uploaded
Review Comment:
Let's also add a comment noting that it is updated by whoever uploaded the
checkpoint, which could be the task, maintenance, or both.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,22 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_PARTITION_METRICS_REPORT_LIMIT =
+ buildConf("spark.sql.streaming.stateStore.numPartitionMetricsToReport")
+ .internal()
+ .doc(
+ "Maximum number of partition-level metrics to include in state store
progress " +
+ "reporting. The default limit is 20% of the number of cores (with a
minimum of 1 " +
Review Comment:
nit: This entire message can be simplified
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -321,6 +321,37 @@ case class StateStoreCustomTimingMetric(name: String,
desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}
+case class StateStoreCustomPartitionMetric(
+ metricPrefix: String,
+ descPrefix: String,
+ partitionId: Option[Int] = None)
+ extends StateStoreCustomMetric {
+ override def name: String = {
+ partitionId
+ .map { id =>
+ s"$metricPrefix${StateStoreProvider.PARTITION_METRIC_SUFFIX}$id"
+ }
+ .getOrElse(metricPrefix)
+ }
+
+ override def desc: String = {
+ partitionId
+ .map { id =>
+ s"$descPrefix (partitionId = $id)"
+ }
+ .getOrElse(descPrefix)
+ }
+
+ override def withNewDesc(desc: String): StateStoreCustomPartitionMetric =
copy(descPrefix = desc)
+
+ override def createSQLMetric(sparkContext: SparkContext): SQLMetric =
+ SQLMetrics.createPartitionMetric(sparkContext, desc)
+
+ def withPartition(partitionId: Int): StateStoreCustomPartitionMetric = {
Review Comment:
nit: follow the same naming convention: `withNewPartition`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala:
##########
@@ -198,6 +198,12 @@ object SQLMetrics {
acc
}
+ def createPartitionMetric(sc: SparkContext, name: String, initValue: Long =
-1L): SQLMetric = {
Review Comment:
The function is named `createPartitionMetric`, but it uses `SIZE_METRIC` and has
exactly the same code as `createSizeMetric`. Why have a function with a different
name but the same code?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2251,6 +2251,22 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STATE_STORE_PARTITION_METRICS_REPORT_LIMIT =
+ buildConf("spark.sql.streaming.stateStore.numPartitionMetricsToReport")
+ .internal()
+ .doc(
+ "Maximum number of partition-level metrics to include in state store
progress " +
+ "reporting. The default limit is 20% of the number of cores (with a
minimum of 1 " +
+ "partition) and with a cap of 10. This limits the metrics to the N
partitions with " +
+ "the smallest values to prevent the progress report from becoming
too large."
+ )
+ .version("4.0.0")
+ .intConf
+ .checkValue(k => k >= 0, "Must be greater than or equal to 0")
+ .createWithDefault(
+ Math.min(10, Math.min(1,
SHUFFLE_PARTITIONS.defaultValue.getOrElse(200) / 5))
Review Comment:
Not sure we need the `getOrElse` with 200 here. The shuffle partition conf is an
INT here and is always set to a value > 0. If someone updates the default of
shuffle partition, they will likely not update this hardcoded fallback.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -328,7 +328,9 @@ private[sql] class RocksDBStateStoreProvider
CUSTOM_METRIC_FLUSH_WRITTEN_BYTES ->
nativeOpsMetrics("totalBytesWrittenByFlush"),
CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE ->
rocksDBMetrics.pinnedBlocksMemUsage,
CUSTOM_METRIC_NUM_EXTERNAL_COL_FAMILIES -> internalColFamilyCnt(),
- CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES -> externalColFamilyCnt()
+ CUSTOM_METRIC_NUM_INTERNAL_COL_FAMILIES -> externalColFamilyCnt(),
+ CUSTOM_METRIC_SNAPSHOT_LAST_UPLOADED.withPartition(id.partitionId) ->
Review Comment:
The stream join operator has 4 state stores per partition. How are we handling
that?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]