cloud-fan commented on code in PR #56550:
URL: https://github.com/apache/spark/pull/56550#discussion_r3449185950


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
##########
@@ -775,9 +862,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
    * are used.
    *
    * Note that this method records task output metrics. It assumes the method 
is
-   * running in a task. For now, we only records the number of rows being 
written
-   * because there's no good way to measure the total bytes being written. Only
-   * effective outputs are taken into account: for example, metric will not be 
updated
+   * running in a task. Records both the number of rows being written and an 
estimate

Review Comment:
   This scaladoc now covers both `recordsWritten` and the new byte metric, but 
the "updated even with error if it doesn't support transaction" guarantee only 
holds for `recordsWritten`. That one is an internal task output metric 
(`countFailedValues=true`), so it's reported even when the task throws. The 
byte metric rides on `bytesAccumulator` — a plain 
`sparkContext.longAccumulator` with `countFailedValues=false` — and 
`Task.collectAccumulatorUpdates(taskFailed=true)` drops such external 
accumulators from failed tasks. So on the non-transactional error path the row 
count reports the dirty rows while the byte metric reports nothing. Worth 
narrowing the comment (the byte metric is only reported for successful tasks). 
May be moot depending on how the metric is reworked per the design comment.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCWriteBuilder.scala:
##########
@@ -35,14 +36,40 @@ case class JDBCWriteBuilder(schema: StructType, options: 
JdbcOptionsInWrite) ext
   }
 
   override def build(): V1Write = new V1Write {
+    // Accumulator to collect estimated bytes written across all partitions.
+    @transient private lazy val bytesAccumulator = 
SparkSession.active.sparkContext.longAccumulator
+
     override def toInsertableRelation: InsertableRelation = (data: DataFrame, 
_: Boolean) => {
       // TODO (SPARK-32595): do truncate and append atomically.
       if (isTruncate) {
         val dialect = JdbcDialects.get(options.url)
         val conn = dialect.createConnectionFactory(options)(-1)
         JdbcUtils.truncateTable(conn, options)
       }
-      JdbcUtils.saveTable(data, Some(schema), 
SQLConf.get.caseSensitiveAnalysis, options)
+      JdbcUtils.saveTable(
+        data, Some(schema), SQLConf.get.caseSensitiveAnalysis, options, 
Some(bytesAccumulator))
+    }
+
+    override def supportedCustomMetrics(): Array[CustomMetric] = {
+      Array(new JDBCEstimatedDataSizeMetric)
+    }
+
+    override def reportDriverMetrics(): Array[CustomTaskMetric] = {
+      Array(new CustomTaskMetric {
+        override def name(): String = JDBCEstimatedDataSizeMetric.NAME
+        override def value(): Long = bytesAccumulator.value
+      })
     }
   }
 }
+
+/** CustomMetric declaration for JDBC estimated write data size. */
+class JDBCEstimatedDataSizeMetric extends CustomSumMetric {
+  override def name(): String = JDBCEstimatedDataSizeMetric.NAME
+  override def description(): String =
+    "Estimated decoded row size written (strings measured by char count, not 
UTF-8 bytes)"

Review Comment:
   I'd like to make a principle explicit here: **Spark SQL operator metrics 
should report actual measured values, not estimates.** `filesSize` is the real 
"size of files read", `numOutputRows` is the real row count, and so on — an 
estimate dressed up as a byte/size metric is misleading, because users read the 
SQL UI value as a measured quantity and make capacity/cost decisions on it.
   
   This metric is an estimate by construction: the write side measures 
`String.length` (char count) as a byte proxy, so it under-counts any non-ASCII 
UTF-8 string, and both metrics are named/described "estimated". But the actual 
decoded size is computable — on both paths we already hold the materialized 
rows, and variable-length fields are already measured exactly. So the estimate 
isn't necessary.
   
   Concretely:
   - Write side: measure real UTF-8 bytes (`UTF8String.numBytes()`), not 
`String.length`.
   - Read side: accumulate the size inside the existing per-column decode loop 
in `resultSetToSparkInternalRows` (each getter already visits every value), 
rather than the separate second `O(numColumns)` pass — since we have the row, 
there's no reason to walk it twice.
   - Drop "estimated…" from the names/descriptions (e.g. "JDBC data size" / 
"data size written").
   
   If there's a specific type whose exact decoded size genuinely can't be 
computed cheaply (deep nested complex types), let's discuss that one case — but 
the default should be an actual value.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala:
##########
@@ -233,6 +233,16 @@ class JDBCRDD(
     // Message that user sees does not have to leak details about conversion
     name = "JDBC remote data fetch and translation time")
 
+  // Estimates decoded Spark-side row size using UTF8String.numBytes() (actual 
UTF-8 bytes).
+  // The write-side metric uses String.length (char count) to stay 
allocation-free, so
+  // read vs write estimates may differ slightly for multi-byte strings.
+  // createSizeMetric defaults initValue to -1 as a 'task did not update' 
sentinel; we use 0
+  // here because every task accumulates into this metric and -1 would offset 
the summed total.

Review Comment:
   `-1` doesn't actually offset the sum: `SQLMetric.add` resets an `isZero` 
metric to 0 on the first update and `merge` skips `isZero` operands, so the 
sentinel never contributes to the total. Its real effect is keeping no-update 
partitions out of the min/med/max breakdown (SPARK-11013). `initValue=0` is the 
right call; just the stated reason is off.
   ```suggestion
     // here so no-update partitions report 0 instead of the -1 'invalid' 
sentinel.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to