yadavay-amzn commented on code in PR #56550:
URL: https://github.com/apache/spark/pull/56550#discussion_r3503881442
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCWriteBuilder.scala:
##########
@@ -35,14 +36,40 @@ case class JDBCWriteBuilder(schema: StructType, options: JdbcOptionsInWrite) ext
}
override def build(): V1Write = new V1Write {
+ // Accumulator to collect estimated bytes written across all partitions.
+ @transient private lazy val bytesAccumulator = SparkSession.active.sparkContext.longAccumulator
+
override def toInsertableRelation: InsertableRelation = (data: DataFrame, _: Boolean) => {
// TODO (SPARK-32595): do truncate and append atomically.
if (isTruncate) {
val dialect = JdbcDialects.get(options.url)
val conn = dialect.createConnectionFactory(options)(-1)
JdbcUtils.truncateTable(conn, options)
}
- JdbcUtils.saveTable(data, Some(schema), SQLConf.get.caseSensitiveAnalysis, options)
+ JdbcUtils.saveTable(
+ data, Some(schema), SQLConf.get.caseSensitiveAnalysis, options, Some(bytesAccumulator))
+ }
+
+ override def supportedCustomMetrics(): Array[CustomMetric] = {
+ Array(new JDBCEstimatedDataSizeMetric)
+ }
+
+ override def reportDriverMetrics(): Array[CustomTaskMetric] = {
+ Array(new CustomTaskMetric {
+ override def name(): String = JDBCEstimatedDataSizeMetric.NAME
+ override def value(): Long = bytesAccumulator.value
+ })
}
}
}
+
+/** CustomMetric declaration for JDBC estimated write data size. */
+class JDBCEstimatedDataSizeMetric extends CustomSumMetric {
+ override def name(): String = JDBCEstimatedDataSizeMetric.NAME
+ override def description(): String =
+ "Estimated decoded row size written (strings measured by char count, not UTF-8 bytes)"
Review Comment:
Reworked so the metrics report actual measured values. The write side now
measures real UTF-8 bytes via UTF8String.fromString(s).numBytes() (not
String.length), and the read side accumulates the actual decoded size inside
the existing per-column decode loop (removed the separate second pass). Dropped
the estimate framing from the metric names/keys/descriptions accordingly
(dataSizeBytes / "JDBC data size" / "data size written"). Verified that a
non-ASCII string now counts real bytes (e.g. an e-acute plus a CJK character is
5 bytes, not 2 chars).
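For readers following along, here is a plain-Scala sketch of the two ideas above: counting UTF-8 bytes rather than chars, and adding to the size total inside the same per-column decode loop instead of a second pass. It is an illustration under stated assumptions, not the PR's code. `String.getBytes(StandardCharsets.UTF_8).length` stands in for Spark's `UTF8String.fromString(s).numBytes()`, which is assumed to give the same count for non-null strings. `decodeRow` and the fixed 8-byte size for longs are hypothetical choices made for the sketch.

```scala
import java.nio.charset.StandardCharsets

object Utf8SizeSketch {
  // UTF-8 encoded length of a string. Stand-in (assumed equivalent for
  // non-null input) for UTF8String.fromString(s).numBytes().
  def utf8Bytes(s: String): Long = s.getBytes(StandardCharsets.UTF_8).length.toLong

  // Hypothetical single-pass decode: each column value is converted and its
  // size is added to a running total in the same loop (no second pass).
  def decodeRow(raw: Array[Any]): (Array[Any], Long) = {
    val out = new Array[Any](raw.length)
    var size = 0L
    var i = 0
    while (i < raw.length) {
      raw(i) match {
        case null =>
          out(i) = null // nulls contribute nothing in this sketch
        case s: String =>
          out(i) = s
          size += utf8Bytes(s) // real UTF-8 bytes, not s.length
        case l: Long =>
          out(i) = l
          size += 8L // assumption: fixed-width 8 bytes for longs
        case other =>
          out(i) = other // other types are not sized in this sketch
      }
      i += 1
    }
    (out, size)
  }

  def main(args: Array[String]): Unit = {
    val str = "\u00e9\u4e2d" // e-acute (2 bytes) + a CJK character (3 bytes)
    println(s"chars=${str.length} utf8Bytes=${utf8Bytes(str)}") // chars=2 utf8Bytes=5
    val (_, size) = decodeRow(Array[Any]("abc", 42L, null))
    println(s"row size=$size") // row size=11
  }
}
```

Keeping the size update inside the conversion loop avoids re-reading every column a second time, which is the design point the comment describes.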
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
##########
@@ -775,9 +862,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
* are used.
*
* Note that this method records task output metrics. It assumes the method is
- * running in a task. For now, we only records the number of rows being written
- * because there's no good way to measure the total bytes being written. Only
- * effective outputs are taken into account: for example, metric will not be updated
+ * running in a task. Records both the number of rows being written and an estimate
Review Comment:
Narrowed the scaladoc: the updated-even-on-error guarantee applies only to
recordsWritten (internal task output metric, countFailedValues=true). The
data-size metric rides on a plain accumulator and is reported only for
successful tasks.
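The distinction above can be pictured with a toy model. This is a hypothetical sketch, not Spark's accumulator machinery. It assumes that the driver merges a failed attempt's updates only when a metric is flagged `countFailedValues`, which is the behavior the comment attributes to `recordsWritten` as opposed to the plain data-size accumulator. `TaskAttempt`, `merge`, and the numbers are illustrative only.

```scala
object FailedTaskMetricSketch {
  // Hypothetical model of one task attempt and the values it accumulated locally.
  final case class TaskAttempt(succeeded: Boolean, rowsWritten: Long, bytesWritten: Long)

  // Driver-side merge in this model: when countFailedValues is true, updates from
  // failed attempts are kept too; otherwise only successful attempts are summed.
  def merge(attempts: Seq[TaskAttempt], countFailedValues: Boolean)(f: TaskAttempt => Long): Long =
    attempts.filter(a => a.succeeded || countFailedValues).map(f).sum

  def main(args: Array[String]): Unit = {
    val attempts = Seq(
      TaskAttempt(succeeded = true, rowsWritten = 100L, bytesWritten = 4000L),
      TaskAttempt(succeeded = false, rowsWritten = 30L, bytesWritten = 1200L) // failed mid-write
    )
    val rows = merge(attempts, countFailedValues = true)(_.rowsWritten)
    val bytes = merge(attempts, countFailedValues = false)(_.bytesWritten)
    println(s"recordsWritten=$rows dataSize=$bytes") // recordsWritten=130 dataSize=4000
  }
}
```

In this model, the row count includes the partial work of the failed attempt, and the data size does not. That is why the scaladoc guarantee is narrowed to `recordsWritten` only.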
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala:
##########
@@ -233,6 +233,16 @@ class JDBCRDD(
// Message that user sees does not have to leak details about conversion
name = "JDBC remote data fetch and translation time")
+ // Estimates decoded Spark-side row size using UTF8String.numBytes() (actual UTF-8 bytes).
+ // The write-side metric uses String.length (char count) to stay allocation-free, so
+ // read vs write estimates may differ slightly for multi-byte strings.
+ // createSizeMetric defaults initValue to -1 as a 'task did not update' sentinel; we use 0
+ // here because every task accumulates into this metric and -1 would offset the summed total.
Review Comment:
Fixed the comment per your wording.
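The initValue reasoning in the quoted comment comes down to simple arithmetic. The sketch below is a hypothetical naive model, not Spark's `SQLMetric`. In this model, each task's value starts at `initValue` and adds its own bytes, and the driver sums the per-task values as-is. Spark's actual update and merge logic may treat the -1 sentinel specially, so read this only as an illustration of the comment's claim. The byte counts are made up.

```scala
object SentinelOffsetSketch {
  // Naive model: per-task value = initValue + bytes written by that task,
  // and the driver-side total is the plain sum of the per-task values.
  def naiveTotal(perTaskBytes: Seq[Long], initValue: Long): Long =
    perTaskBytes.map(b => initValue + b).sum

  def main(args: Array[String]): Unit = {
    val perTask = Seq(100L, 250L, 50L) // illustrative byte counts for three tasks
    println(naiveTotal(perTask, initValue = 0L))  // 400
    println(naiveTotal(perTask, initValue = -1L)) // 397: off by one per task
  }
}
```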
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.