szehon-ho commented on code in PR #55967:
URL: https://github.com/apache/spark/pull/55967#discussion_r3270320890
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -518,6 +518,21 @@ case class MergeRowsExec(
private val notMatchedBySourceInstructions: Seq[InstructionExec])
extends Iterator[InternalRow] {
+ // Resolve metrics once per partition; longMetric(name) does a map lookup on each call.
+ // See SPARK-56933.
+ private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied")
+ private val numTargetRowsInserted = MergeRowsExec.this.longMetric("numTargetRowsInserted")
+ private val numTargetRowsDeleted = MergeRowsExec.this.longMetric("numTargetRowsDeleted")
+ private val numTargetRowsUpdated = MergeRowsExec.this.longMetric("numTargetRowsUpdated")
+ private val numTargetRowsMatchedUpdated =
+ MergeRowsExec.this.longMetric("numTargetRowsMatchedUpdated")
+ private val numTargetRowsMatchedDeleted =
+ MergeRowsExec.this.longMetric("numTargetRowsMatchedDeleted")
+ private val numTargetRowsNotMatchedBySourceUpdated =
+ MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceUpdated")
+ private val numTargetRowsNotMatchedBySourceDeleted =
+ MergeRowsExec.this.longMetric("numTargetRowsNotMatchedBySourceDeleted")
Review Comment:
Done. The metrics are now `lazy val`s, so each partition resolves (and caches)
only the metrics it actually increments.
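
For readers following along, here is a minimal sketch of the `lazy val` idea, not the PR's actual code. `Counter`, `Operator`, and `RowCounter` are hypothetical stand-ins for the metric type, `MergeRowsExec`, and the per-partition iterator; only the metric names come from the diff above.

```scala
// Hypothetical stand-in for a SQL metric: a plain mutable counter.
final class Counter {
  var value: Long = 0L
  def add(n: Long): Unit = value += n
}

// Hypothetical stand-in for the operator. It counts name-based lookups
// so the effect of caching is observable.
final class Operator(names: Seq[String]) {
  private val metrics: Map[String, Counter] = names.map(_ -> new Counter).toMap
  var lookups: Int = 0
  def longMetric(name: String): Counter = {
    lookups += 1
    metrics(name)
  }
}

// Per-partition helper: each metric is looked up on first use, then cached.
final class RowCounter(op: Operator) {
  private lazy val copied = op.longMetric("numTargetRowsCopied")
  private lazy val deleted = op.longMetric("numTargetRowsDeleted")
  def copy(): Unit = copied.add(1)
  def delete(): Unit = deleted.add(1)
}

object LazyMetricSketch {
  // Returns (lookups performed by the row counter, final copied count).
  def run(): (Int, Long) = {
    val op = new Operator(Seq("numTargetRowsCopied", "numTargetRowsDeleted"))
    val rows = new RowCounter(op)
    (1 to 1000).foreach(_ => rows.copy()) // delete() is never called
    val lookupsByRows = op.lookups        // read before the extra lookup below
    (lookupsByRows, op.longMetric("numTargetRowsCopied").value)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this sketch a partition that only copies rows performs a single lookup, and the unused `deleted` metric is never resolved. The trade-off is that a `lazy val` carries a small initialization check on each access.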
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -518,6 +518,21 @@ case class MergeRowsExec(
private val notMatchedBySourceInstructions: Seq[InstructionExec])
extends Iterator[InternalRow] {
+ // Resolve metrics once per partition; longMetric(name) does a map lookup on each call.
+ // See SPARK-56933.
+ private val numTargetRowsCopied = MergeRowsExec.this.longMetric("numTargetRowsCopied")
Review Comment:
Done. `MergeRowIterator` now calls `longMetric()` directly.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala:
##########
@@ -101,6 +105,33 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark
with ClassicConversions
Dataset.ofRows(spark, mergeRows)
}
+ /**
+ * Like [[codegenBenchmark]], but with JIT warm-up and a longer timed window so interpreted
+ * (whole-stage off) results are more stable when comparing metric caching changes.
+ */
+ private def mergeRowsCodegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { f }
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { f }
Review Comment:
Agreed. I removed the redundant pre-run calls. `MergeRowsExecBenchmark` now
uses only the extended `codegenBenchmark` helper (`warmupTime` / `minTime`),
since the `Benchmark` class already warms up each case before the timed iterations.
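
To illustrate why a separate pre-run is redundant when the harness already has a warm-up phase, here is a rough, self-contained sketch of a warm-up-then-measure loop. It is not Spark's `Benchmark` implementation. `WarmupSketch`, `measure`, and `Result` are hypothetical names, and the `warmupTime` / `minTime` parameters only mirror the names mentioned above.

```scala
import scala.concurrent.duration._

object WarmupSketch {
  // Outcome of one measured case: untimed warm-up runs, timed runs, best timed run.
  final case class Result(warmupRuns: Int, timedRuns: Int, bestNanos: Long)

  def measure(warmupTime: FiniteDuration, minTime: FiniteDuration, minIters: Int)(
      f: => Unit): Result = {
    // Warm-up phase: run untimed until the deadline so the JIT can compile hot paths.
    val warmupDeadline = System.nanoTime() + warmupTime.toNanos
    var warmupRuns = 0
    while (System.nanoTime() < warmupDeadline) {
      f
      warmupRuns += 1
    }

    // Timed phase: at least minIters runs, continuing until minTime has elapsed.
    val start = System.nanoTime()
    var timedRuns = 0
    var best = Long.MaxValue
    while (timedRuns < minIters || System.nanoTime() - start < minTime.toNanos) {
      val t0 = System.nanoTime()
      f
      best = math.min(best, System.nanoTime() - t0)
      timedRuns += 1
    }
    Result(warmupRuns, timedRuns, best)
  }

  def main(args: Array[String]): Unit = {
    var sink = 0L // keeps the workload from being optimized away
    val r = measure(50.millis, 50.millis, minIters = 2) { sink += (1 to 10000).sum }
    println(s"warm-up runs=${r.warmupRuns}, timed runs=${r.timedRuns}, " +
      s"best=${r.bestNanos} ns (sink=$sink)")
  }
}
```

With a loop of this shape, calling `f` by hand before the benchmark only repeats what the warm-up phase already does. Lengthening `warmupTime` and `minTime` is the more direct way to stabilize interpreted-mode numbers.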
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MergeRowsExecBenchmark.scala:
##########
@@ -101,6 +105,33 @@ object MergeRowsExecBenchmark extends SqlBasedBenchmark
with ClassicConversions
Dataset.ofRows(spark, mergeRows)
}
+ /**
+ * Like [[codegenBenchmark]], but with JIT warm-up and a longer timed window so interpreted
+ * (whole-stage off) results are more stable when comparing metric caching changes.
+ */
+ private def mergeRowsCodegenBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
Review Comment:
Updated the PR description with a new A/B run on the **same harness** for
both sides: `MergeRowsExec` at `1ad4fa420cd` (before the cache) vs this PR
(with cache), with only that file swapped between runs. Both runs used a 7s
warmup and a 7s timed window via the extended `codegenBenchmark`. The earlier
table compared against `origin/master` with mixed harness settings and has
been replaced.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]