cloud-fan commented on code in PR #56550:
URL: https://github.com/apache/spark/pull/56550#discussion_r3449185950
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
##########
@@ -775,9 +862,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
* are used.
*
* Note that this method records task output metrics. It assumes the method is
- * running in a task. For now, we only records the number of rows being written
- * because there's no good way to measure the total bytes being written. Only
- * effective outputs are taken into account: for example, metric will not be updated
+ * running in a task. Records both the number of rows being written and an estimate
Review Comment:
This scaladoc now covers both `recordsWritten` and the new byte metric, but
the "updated even with error if it doesn't support transaction" guarantee only
holds for `recordsWritten`. That one is an internal task output metric
(`countFailedValues=true`), so it's reported even when the task throws. The
byte metric rides on `bytesAccumulator` — a plain
`sparkContext.longAccumulator` with `countFailedValues=false` — and
`Task.collectAccumulatorUpdates(taskFailed=true)` drops such external
accumulators from failed tasks. So on the non-transactional error path the row
count reports the dirty rows while the byte metric reports nothing. It's worth
narrowing the comment to say that the byte metric is only reported for
successful tasks. This may be moot depending on how the metric is reworked per
the design comment.
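To make the failure-path asymmetry concrete, here is a minimal sketch using a toy model. `ToyAccum` and `collectUpdates` are hypothetical stand-ins, not Spark's `AccumulatorV2` or `Task.collectAccumulatorUpdates`, and the values are made up. The real code collects internal task metrics separately; the filter below only mirrors the `countFailedValues` rule described above.

```scala
// Toy model only: ToyAccum and collectUpdates are hypothetical stand-ins,
// not Spark's AccumulatorV2 or Task.collectAccumulatorUpdates.
final case class ToyAccum(name: String, value: Long, countFailedValues: Boolean)

object FailedTaskMetricsDemo {
  // Keep every accumulator when the task succeeded; on failure keep only
  // those that opted in with countFailedValues = true.
  def collectUpdates(accums: Seq[ToyAccum], taskFailed: Boolean): Seq[ToyAccum] =
    accums.filter(a => !taskFailed || a.countFailedValues)

  // Illustrative values, not measurements.
  val taskAccums: Seq[ToyAccum] = Seq(
    ToyAccum("recordsWritten", value = 1000L, countFailedValues = true),
    ToyAccum("bytesAccumulator", value = 48000L, countFailedValues = false)
  )

  def main(args: Array[String]): Unit = {
    val onSuccess = collectUpdates(taskAccums, taskFailed = false).map(_.name)
    val onFailure = collectUpdates(taskAccums, taskFailed = true).map(_.name)
    println(s"reported on success: ${onSuccess.mkString(", ")}")
    println(s"reported on failure: ${onFailure.mkString(", ")}")
  }
}
```

In this model a failed task reports only `recordsWritten`, which is the gap the scaladoc should not paper over.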
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCWriteBuilder.scala:
##########
@@ -35,14 +36,40 @@ case class JDBCWriteBuilder(schema: StructType, options: JdbcOptionsInWrite) ext
}
override def build(): V1Write = new V1Write {
+ // Accumulator to collect estimated bytes written across all partitions.
+ @transient private lazy val bytesAccumulator = SparkSession.active.sparkContext.longAccumulator
+
override def toInsertableRelation: InsertableRelation = (data: DataFrame, _: Boolean) => {
// TODO (SPARK-32595): do truncate and append atomically.
if (isTruncate) {
val dialect = JdbcDialects.get(options.url)
val conn = dialect.createConnectionFactory(options)(-1)
JdbcUtils.truncateTable(conn, options)
}
- JdbcUtils.saveTable(data, Some(schema), SQLConf.get.caseSensitiveAnalysis, options)
+ JdbcUtils.saveTable(
+ data, Some(schema), SQLConf.get.caseSensitiveAnalysis, options, Some(bytesAccumulator))
+ }
+
+ override def supportedCustomMetrics(): Array[CustomMetric] = {
+ Array(new JDBCEstimatedDataSizeMetric)
+ }
+
+ override def reportDriverMetrics(): Array[CustomTaskMetric] = {
+ Array(new CustomTaskMetric {
+ override def name(): String = JDBCEstimatedDataSizeMetric.NAME
+ override def value(): Long = bytesAccumulator.value
+ })
}
}
}
+
+/** CustomMetric declaration for JDBC estimated write data size. */
+class JDBCEstimatedDataSizeMetric extends CustomSumMetric {
+ override def name(): String = JDBCEstimatedDataSizeMetric.NAME
+ override def description(): String =
+ "Estimated decoded row size written (strings measured by char count, not UTF-8 bytes)"
Review Comment:
I'd like to make a principle explicit here: **Spark SQL operator metrics
should report actual measured values, not estimates.** `filesSize` is the real
"size of files read", `numOutputRows` is the real row count, and so on — an
estimate dressed up as a byte/size metric is misleading, because users read the
SQL UI value as a measured quantity and make capacity/cost decisions on it.
This metric is an estimate by construction: the write side measures
`String.length` (char count) as a byte proxy, so it under-counts any non-ASCII
UTF-8 string, and both metrics are named/described "estimated". But the actual
decoded size is computable — on both paths we already hold the materialized
rows, and variable-length fields are already measured exactly. So the estimate
isn't necessary.
Concretely:
- Write side: measure real UTF-8 bytes (`UTF8String.numBytes()`), not
`String.length`.
- Read side: accumulate the size inside the existing per-column decode loop
in `resultSetToSparkInternalRows` (each getter already visits every value),
rather than the separate second `O(numColumns)` pass — since we have the row,
there's no reason to walk it twice.
- Drop "estimated…" from the names/descriptions (e.g. "JDBC data size" /
"data size written").
If there's a specific type whose exact decoded size genuinely can't be
computed cheaply (deep nested complex types), let's discuss that one case — but
the default should be an actual value.
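A small sketch of why `String.length` is a poor byte proxy, assuming plain JDK strings only. It uses `String.getBytes`, which allocates, purely for illustration; it is not what the write path should call, and `Utf8SizeDemo` and its helpers are hypothetical names.

```scala
import java.nio.charset.StandardCharsets

object Utf8SizeDemo {
  // What the write side measures today: UTF-16 code units, not bytes.
  def charCount(s: String): Long = s.length.toLong

  // Actual encoded size. getBytes allocates, so this is for illustration only.
  def utf8Bytes(s: String): Long = s.getBytes(StandardCharsets.UTF_8).length.toLong

  // Unicode escapes keep the sample independent of source-file encoding.
  val samples: Seq[String] = Seq(
    "spark",               // ASCII only
    "na\u00efve",          // one 2-byte character
    "\u6570\u636e\u5e93",  // three 3-byte characters
    "\ud83d\ude00"         // one supplementary character (surrogate pair)
  )

  def main(args: Array[String]): Unit =
    samples.foreach { s =>
      println(s"chars=${charCount(s)} utf8Bytes=${utf8Bytes(s)} value=$s")
    }
}
```

Only the ASCII sample agrees (5 and 5). The others under-count: 5 vs 6, 3 vs 9, and 2 vs 4 bytes respectively.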
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala:
##########
@@ -233,6 +233,16 @@ class JDBCRDD(
// Message that user sees does not have to leak details about conversion
name = "JDBC remote data fetch and translation time")
+ // Estimates decoded Spark-side row size using UTF8String.numBytes() (actual UTF-8 bytes).
+ // The write-side metric uses String.length (char count) to stay allocation-free, so
+ // read vs write estimates may differ slightly for multi-byte strings.
+ // createSizeMetric defaults initValue to -1 as a 'task did not update' sentinel; we use 0
+ // here because every task accumulates into this metric and -1 would offset the summed total.
Review Comment:
`-1` doesn't actually offset the sum: `SQLMetric.add` resets an `isZero`
metric to 0 on the first update and `merge` skips `isZero` operands, so the
sentinel never contributes to the total. Its real effect is keeping no-update
partitions out of the min/med/max breakdown (SPARK-11013). `initValue=0` is the
right call; just the stated reason is off.
```suggestion
// here so no-update partitions report 0 instead of the -1 'invalid' sentinel.
```
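For reference, a toy model of the sentinel behavior described above. `ToyMetric` and `SentinelDemo` are hypothetical and only encode the two rules stated in this comment (`add` resets an `isZero` metric, `merge` skips `isZero` operands); they are not Spark's `SQLMetric`. The `>= 0` filter for the breakdown is an assumption about how invalid values are excluded.

```scala
// Toy model of the behavior described in the review comment; ToyMetric is a
// hypothetical stand-in, not Spark's SQLMetric.
final class ToyMetric(initValue: Long) {
  private var current: Long = initValue

  def isZero: Boolean = current == initValue
  def value: Long = current

  // The first update replaces the sentinel with 0 before accumulating.
  def add(v: Long): Unit = {
    if (isZero) current = 0L
    current += v
  }

  // Operands that were never updated are skipped entirely.
  def merge(other: ToyMetric): Unit =
    if (!other.isZero) add(other.value)
}

object SentinelDemo {
  // Each inner Seq holds one task's updates; an empty Seq is a no-update partition.
  def runTasks(initValue: Long, updates: Seq[Seq[Long]]): Seq[ToyMetric] =
    updates.map { us =>
      val m = new ToyMetric(initValue)
      us.foreach(m.add)
      m
    }

  // Driver-side sum across all task metrics.
  def total(initValue: Long, tasks: Seq[ToyMetric]): Long = {
    val driver = new ToyMetric(initValue)
    tasks.foreach(driver.merge)
    driver.value
  }

  // Assumption: the min/med/max breakdown only considers non-negative task values.
  def breakdownInputs(tasks: Seq[ToyMetric]): Seq[Long] =
    tasks.map(_.value).filter(_ >= 0)

  def main(args: Array[String]): Unit = {
    val updates = Seq(Seq(100L), Seq(20L, 30L), Seq.empty[Long])
    Seq(-1L, 0L).foreach { init =>
      val tasks = runTasks(init, updates)
      println(s"init=$init total=${total(init, tasks)} breakdown=${breakdownInputs(tasks)}")
    }
  }
}
```

In this model both initial values give the same total (150). The only difference is whether the idle partition shows up in the breakdown inputs as 0 or is left out.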