This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 25c624f269b [SPARK-45084][SS] StateOperatorProgress to use accurate effective shuffle partition number 25c624f269b is described below commit 25c624f269bfec027ad889c1764d2904f19a2506 Author: Siying Dong <siying.d...@databricks.com> AuthorDate: Fri Sep 15 10:36:44 2023 +0900 [SPARK-45084][SS] StateOperatorProgress to use accurate effective shuffle partition number ### What changes were proposed in this pull request? Make StateOperatorProgress.numShufflePartitions report the effective number of shuffle partitions. The metric StateStoreWriter.numShufflePartitions is dropped at the same time, as it is no longer a metric. ### Why are the changes needed? Currently, there is a numShufflePartitions "metric" reported in the StateOperatorProgress part of the progress report. However, the number is computed by aggregating over executors, so in the case of task retries or speculative execution, the metric is higher than the number of shuffle partitions for the query plan. We change the metric to use the effective value to make it more usable. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? StreamingAggregationSuite contains a unit test that validates the value. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42822 from siying/numShufflePartitionsMetric. 
Authored-by: Siying Dong <siying.d...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../apache/spark/sql/execution/streaming/statefulOperators.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index b31f6151fce..67d89c7f40f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -143,7 +143,6 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"), "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"), "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state"), - "numShufflePartitions" -> SQLMetrics.createMetric(sparkContext, "number of shuffle partitions"), "numStateStoreInstances" -> SQLMetrics.createMetric(sparkContext, "number of state store instances") ) ++ stateStoreCustomMetrics ++ pythonMetrics @@ -159,6 +158,8 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] = new java.util.HashMap(customMetrics.mapValues(long2Long).toMap.asJava) + // We now don't report number of shuffle partitions inside the state operator. 
Instead, + // it will be filled when the stream query progress is reported new StateOperatorProgress( operatorName = shortName, numRowsTotal = longMetric("numTotalStateRows").value, @@ -169,7 +170,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp commitTimeMs = longMetric("commitTimeMs").value, memoryUsedBytes = longMetric("stateMemory").value, numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value, - numShufflePartitions = longMetric("numShufflePartitions").value, + numShufflePartitions = stateInfo.map(_.numPartitions.toLong).getOrElse(-1L), numStateStoreInstances = longMetric("numStateStoreInstances").value, javaConvertedCustomMetrics ) @@ -183,7 +184,6 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp assert(numStateStoreInstances >= 1, s"invalid number of stores: $numStateStoreInstances") // Shuffle partitions capture the number of tasks that have this stateful operator instance. // For each task instance this number is incremented by one. - longMetric("numShufflePartitions") += 1 longMetric("numStateStoreInstances") += numStateStoreInstances } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org