This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50e668c780da [SPARK-46294][SQL] Clean up semantics of init vs zero value
50e668c780da is described below

commit 50e668c780dadaa802c5e0adf936bc4a675886c1
Author: Davin Tjong <davin.tj...@databricks.com>
AuthorDate: Thu Dec 14 15:39:09 2023 -0800

    [SPARK-46294][SQL] Clean up semantics of init vs zero value
    
    ### What changes were proposed in this pull request?
    
    Cleaning up the semantics of init and zero value as follows; this also helps define what an "invalid" metric is.
    
    initValue is the starting value for a SQLMetric. If a metric has value equal to its initValue, then it can/should be filtered out before aggregating with SQLMetrics.stringValue().
    
    zeroValue defines the lowest value considered valid. If a SQLMetric is invalid, it is set to zeroValue upon receiving any updates, and it also reports zeroValue as its value to avoid exposing it to the user programmatically (a concern previously addressed in [SPARK-41442](https://issues.apache.org/jira/browse/SPARK-41442)).
    
    For many SQLMetrics, we use initValue = -1 and zeroValue = 0 to indicate that the metric is by default invalid. Whenever an invalid metric is updated, it sets itself to zeroValue and becomes valid. Invalid metrics will be filtered out when calculating min, max, etc. as a workaround for [SPARK-11013](https://issues.apache.org/jira/browse/SPARK-11013).
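
    As a minimal sketch of the new lifecycle (the metric name and values here are illustrative only, not taken from the patch or its tests):

        import org.apache.spark.sql.execution.metric.SQLMetric

        // initValue = -1 marks the metric as invalid until it receives an update;
        // zeroValue = 0 is the lowest value considered valid.
        val m = new SQLMetric("size", initValue = -1, zeroValue = 0)
        assert(!m.isValid)   // still at initValue, so invalid
        assert(m.isZero)     // value == initValue: filtered out before aggregation
        assert(m.value == 0) // reports zeroValue rather than the internal -1

        m.add(5)             // any update first snaps the value to zeroValue
        assert(m.isValid)    // the metric is now valid
        assert(!m.isZero)    // value no longer equals initValue
        assert(m.value == 5)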
    
    ### Why are the changes needed?
    
    The semantics of initValue and _zeroValue in SQLMetrics are a little confusing, since they effectively mean the same thing. The new semantics described above are clearer, especially in terms of defining what an "invalid" metric is.
    
    ### Does this PR introduce _any_ user-facing change?
    No. This shouldn't change any behavior.
    
    ### How was this patch tested?
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44222 from davintjong-db/sqlmetric-initvalue-refactor.
    
    Authored-by: Davin Tjong <davin.tj...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/metric/SQLMetrics.scala    | 51 +++++++++++++---------
 1 file changed, 31 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index f0e58766dc63..9488c890a448 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -37,36 +37,50 @@ import org.apache.spark.util.AccumulatorContext.internOption
  * the executor side are automatically propagated and shown in the SQL UI through metrics. Updates
  * on the driver side must be explicitly posted using [[SQLMetrics.postDriverMetricUpdates()]].
  */
-class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
-  // This is a workaround for SPARK-11013.
-  // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
-  // update it at the end of task and the value will be at least 0. Then we can filter out the -1
-  // values before calculate max, min, etc.
-  private[this] var _value = initValue
-  private var _zeroValue = initValue
+class SQLMetric(
+    val metricType: String,
+    initValue: Long = 0L,
+    zeroValue: Long = 0L) extends AccumulatorV2[Long, Long] {
+  // initValue defines the initial value of the metric. zeroValue defines the lowest value
+  // considered valid. If a SQLMetric is invalid, it is set to zeroValue upon receiving any
+  // updates, and it also reports zeroValue as its value to avoid exposing it to the user
+  // programmatically.
+  //
+  // For many SQLMetrics, we use initValue = -1 and zeroValue = 0 to indicate that the metric is
+  // by default invalid. At the end of a task, we will update the metric making it valid, and the
+  // invalid metrics will be filtered out when calculating min, max, etc. as a workaround for
+  // SPARK-11013.
+  private var _value = initValue
 
   override def copy(): SQLMetric = {
-    val newAcc = new SQLMetric(metricType, _value)
-    newAcc._zeroValue = initValue
+    val newAcc = new SQLMetric(metricType, initValue, zeroValue)
+    newAcc._value = _value
     newAcc
   }
 
-  override def reset(): Unit = _value = _zeroValue
+  override def reset(): Unit = _value = initValue
 
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
     case o: SQLMetric =>
-      if (o.value > 0) {
-        if (_value < 0) _value = 0
+      if (o.isValid) {
+        if (!isValid) _value = zeroValue
         _value += o.value
       }
     case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
       this.getClass.getName, other.getClass.getName)
   }
 
-  override def isZero: Boolean = _value == _zeroValue
+  // This is used to filter out metrics. Metrics with value equal to initValue should
+  // be filtered out, since they are either invalid or safe to filter without changing
+  // the aggregation defined in [[SQLMetrics.stringValue]].
+  // Note that we don't use zeroValue here since we may want to collect zeroValue metrics
+  // for calculating min, max, etc. See SPARK-11013.
+  override def isZero: Boolean = _value == initValue
+
+  def isValid: Boolean = _value >= zeroValue
 
   override def add(v: Long): Unit = {
-    if (_value < 0) _value = 0
+    if (!isValid) _value = zeroValue
     _value += v
   }
 
@@ -78,12 +92,9 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
 
   def +=(v: Long): Unit = add(v)
 
-  // We may use -1 as initial value of the accumulator, so that the SQL UI can filter out
-  // invalid accumulator values (0 is a valid metric value) when calculating min, max, etc.
-  // However, users can also access the SQL metrics values programmatically via this method.
-  // We should be consistent with the SQL UI and don't expose -1 to users.
-  // See `SQLMetrics.stringValue`. When there is no valid accumulator values, 0 is the metric value.
-  override def value: Long = if (_value < 0) 0 else _value
+  // _value may be invalid, in many cases being -1. We should not expose it to the user
+  // and instead return zeroValue.
+  override def value: Long = if (!isValid) zeroValue else _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {

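To illustrate the new merge behavior in the hunk above, a short sketch under the same assumptions (the metric instances and values are hypothetical):

    import org.apache.spark.sql.execution.metric.SQLMetric

    val driver = new SQLMetric("size", initValue = -1, zeroValue = 0)
    val task1 = new SQLMetric("size", initValue = -1, zeroValue = 0)
    val task2 = new SQLMetric("size", initValue = -1, zeroValue = 0)

    task1.add(10)        // task1 becomes valid with value 10; task2 stays invalid
    driver.merge(task1)  // driver snaps to zeroValue, then adds task1's value
    driver.merge(task2)  // no-op: invalid metrics are skipped when merging
    assert(driver.value == 10)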

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
