juliuszsompolski opened a new pull request, #55711:
URL: https://github.com/apache/spark/pull/55711

   ### What changes were proposed in this pull request?
   
   Switches the DSv2 row-level operation metrics that count rows produced by
executor tasks to use `SQLLastAttemptMetric` (SLAM) instead of plain
`SQLMetric`, so the values surfaced in `UpdateSummary` and `MergeSummary` are
stable across stage retries.
   
   Specifically:
   - In `RowLevelWriteExec.sparkMetrics`, the UPDATE branch now constructs
`numUpdatedRows` and `numCopiedRows` via `SQLLastAttemptMetrics.createMetric`.
The increment paths are unchanged because
`SQLLastAttemptMetric extends SQLMetric`.
   - In `MergeRowsExec.metrics`, all 8 row counters (`numTargetRowsCopied`, 
`numTargetRowsInserted`, `numTargetRowsDeleted`, `numTargetRowsUpdated`, and 
the matched / not-matched-by-source splits) are switched to SLAM. Both the 
interpreted (`longMetric("...") += 1`) and codegen (`metricTerm(...).add(1)`) 
increment paths work unchanged.
   - `RowLevelWriteExec.getMetricValue` now reads a `SQLLastAttemptMetric` via
`lastAttemptValueForHighestRDDId()`, falling back to `slam.value` if SLAM bailed
out and could not produce a last-attempt value. This affects both the UPDATE
summary path and the MERGE summary path (which reads `MergeRowsExec.metrics`).
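
   The read logic described above can be sketched as follows. This is a minimal, self-contained model, not Spark code: `Metric` and `LastAttemptMetric` are hypothetical stand-ins for `SQLMetric` and `SQLLastAttemptMetric`, and `lastAttemptValueOpt()` is a hypothetical stand-in for `lastAttemptValueForHighestRDDId()`, assumed here to yield `None` when SLAM bails out. The real Spark signatures may differ.

```scala
// Hypothetical stand-ins only: NOT the real SQLMetric / SQLLastAttemptMetric API.
class Metric {
  private var acc: Long = 0L
  // Increment path shared by both metric kinds.
  def add(v: Long): Unit = acc += v
  // Raw accumulator value (in Spark, summed over every task attempt).
  def value: Long = acc
}

// Still a Metric, so existing `add` call sites need no changes.
// `lastAttempt` supplies the last-attempt total, or None when it is
// unavailable (this sketch's model of SLAM "bailing out").
class LastAttemptMetric(lastAttempt: () => Option[Long]) extends Metric {
  def lastAttemptValueOpt(): Option[Long] = lastAttempt()
}

object SummaryReader {
  // Prefer the last-attempt value for last-attempt metrics and fall back to
  // the raw value; plain metrics always report the raw value.
  def getMetricValue(m: Metric): Long = m match {
    case slam: LastAttemptMetric => slam.lastAttemptValueOpt().getOrElse(slam.value)
    case plain                   => plain.value
  }

  def main(args: Array[String]): Unit = {
    val plain = new Metric
    plain.add(20)
    val slam = new LastAttemptMetric(() => Some(10L))
    slam.add(20)
    val bailedOut = new LastAttemptMetric(() => None)
    bailedOut.add(20)
    println(getMetricValue(plain))     // 20
    println(getMetricValue(slam))      // 10
    println(getMetricValue(bailedOut)) // 20
  }
}
```

   Because the last-attempt metric is a subclass of the plain one, only the reader has to know which value to prefer; writers keep calling `add`.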
   
   DELETE is left for another PR.
   
   ### Why are the changes needed?
   
   `SQLMetric.value` aggregates increments from every task attempt that ever
ran, so when a stage is retried, rows processed by the earlier attempt are
counted again and the row counts are inflated (e.g. doubled when every task
reruns). The values flow into the connector-visible `WriteSummary` (and
downstream into operator-metric consumers such as Delta's invariant checks), so
an inflated count misreports what the operation actually did.
`SQLLastAttemptMetric` reports only the last attempt's contribution, so its
value matches the rows that were actually committed.
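
   To make the overcounting concrete, here is a toy Scala model (not Spark's accumulator code) in which two partitions each update 5 rows and the whole stage runs twice. It assumes last-attempt semantics can be approximated as "for each partition, keep only the value from the highest stage attempt"; `TaskUpdate`, `rawSum` and `lastAttemptSum` are hypothetical names.

```scala
// Toy model only: NOT Spark's SQLMetric or SQLLastAttemptMetric implementation.
final case class TaskUpdate(partition: Int, stageAttempt: Int, rows: Long)

object RetryCounting {
  // Raw accumulator semantics: sum the updates from every attempt that ran.
  def rawSum(updates: Seq[TaskUpdate]): Long =
    updates.map(_.rows).sum

  // Assumed last-attempt semantics: per partition, keep only the update from
  // the highest stage attempt, then sum across partitions.
  def lastAttemptSum(updates: Seq[TaskUpdate]): Long =
    updates
      .groupBy(_.partition)
      .values
      .map(_.maxBy(_.stageAttempt).rows)
      .sum

  def main(args: Array[String]): Unit = {
    // Two partitions update 5 rows each; attempt 0 fails after counting its
    // rows, and attempt 1 reruns both partitions and succeeds.
    val updates = Seq(
      TaskUpdate(partition = 0, stageAttempt = 0, rows = 5),
      TaskUpdate(partition = 1, stageAttempt = 0, rows = 5),
      TaskUpdate(partition = 0, stageAttempt = 1, rows = 5),
      TaskUpdate(partition = 1, stageAttempt = 1, rows = 5)
    )
    println(s"raw sum:      ${rawSum(updates)}")        // 20
    println(s"last attempt: ${lastAttemptSum(updates)}") // 10
  }
}
```

   If the input contains only the attempt-0 updates, both functions return 10: without retries the two aggregation strategies agree.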
   
   ### Does this PR introduce _any_ user-facing change?
   
   The values surfaced via `UpdateSummary.numUpdatedRows / numCopiedRows` and 
`MergeSummary.numTargetRows*` (i.e. what the connector receives from 
`BatchWrite.commit(messages, summary)`) will, in the presence of stage retries, 
be the row counts from the last attempt rather than the sum across all 
attempts. With no retries, behavior is unchanged. The metric names, display 
strings, and presence in the SQL UI are unchanged.
   
   Note: the SQL UI still shows the raw accumulator value (`SQLMetric.value`), 
which on stage retries is the sum across all task attempts and therefore 
overcounts. Only the values passed to the connector via `WriteSummary` are 
SLAM-corrected. Making the SQL UI also display the last-attempt value is a 
bigger follow-up that would touch the SQL UI's metric collection pipeline.
   
   ### How was this patch tested?
   
   Existing tests in `UpdateTableSuiteBase` subclasses and the merge suites 
exercise the metric values; they continue to pass since SLAM and `SQLMetric` 
report the same value when there are no stage retries.
   
   Two new tests cover the retry behavior directly:
   - `UpdateTableSuiteBase`, "metric values are stable across stage retries":
runs an UPDATE with an `IN` subquery to force a shuffle (with
`spark.sql.autoBroadcastJoinThreshold = -1` so the join is not broadcast away;
AQE is left at its default), then enables
`spark.test.injectShuffleFetchFailures`. The DAGScheduler corrupts the first
attempt of each shuffle map stage, the writer stage retries, and the test
asserts that `UpdateSummary.numUpdatedRows / numCopiedRows` still match the
actual rows updated/copied. Runs across all four UPDATE variants.
   - `MergeIntoTableSuiteBase`, "metric values are stable across stage retries":
same idea, with the join introducing the shuffle. Asserts all eight
`MergeSummary` fields. Runs across the merge variants.
   
   The injected retries are visible in the test logs as `FetchFailedException` 
/ `MetadataFetchFailedException`. The summary-value assertions only pass 
because the new SLAM-aware reader returns the last-attempt value rather than 
the doubled raw accumulator value.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Opus 4.7


-- 
This is an automated message from the Apache Git Service.