juliuszsompolski opened a new pull request, #55738:
URL: https://github.com/apache/spark/pull/55738

   ### What changes were proposed in this pull request?
   
   Extends the test-only fetch-failure injection in `DAGScheduler` with two new 
knobs and changes the semantics of the existing master switch. Three flags 
total (all in `org.apache.spark.internal.config.Tests`):
   
   - `INJECT_SHUFFLE_FETCH_FAILURES` (existing). Semantics changed: previously 
it corrupted every map task of stage attempt 0, so only leaf shuffle map stages 
were ever affected, since non-leaf stages typically hit a fetch failure on 
attempt 0. It now corrupts the partition-0 task of the first SUCCESSFUL attempt 
of every shuffle map stage, including non-leaf stages whose attempt 0 fails on 
fetch from upstream.
   
   - `INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY` (int, default `1`, new). 
Defers the producer's mapper-0 corruption until N task-success events have 
arrived from stages that consume the shuffle. The DAGScheduler event loop 
processes task-completion events serially, so this guarantees that N consumer 
tasks have fully completed BEFORE the FetchFailed cascade kicks in. Tasks 
dispatched to free slots after the corruption see the invalid `MapStatus` and 
fail with `FetchFailed`. With `spark.sql.shuffle.partitions` much larger than 
the number of executor cores, this gives a deterministic "partial first-attempt 
+ recompute" shape. Set to `0` to corrupt inline at registration.
   
   - `INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE` (boolean, default 
`false`, new). After a downstream `FetchFailed` forces the producer's partition 0 
to be recomputed, the recomputed task's `MapStatus` registration is 
artificially flagged as a checksum mismatch. The DAGScheduler then runs 
`rollbackSucceedingStages`, which clears the downstream `ShuffleMapStage`'s 
outputs and forces a full retry of that stage. `ResultStage` downstreams are 
aborted (OSS Spark does not support rolling them back, SPARK-25342).
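
   The delay semantics of the second flag can be illustrated with a toy model. 
This is plain Python, not Spark code: `simulate_delay`, its parameters, and the 
slot model are hypothetical. It assumes tasks finish in launch order and that a 
task launched before the corruption has already fetched its input.

```python
from collections import deque
from typing import List, Optional, Tuple


def simulate_delay(num_tasks: int, slots: int, delay: int) -> Tuple[List[int], Optional[int]]:
    """Toy model of consumer tasks racing against the delayed corruption.

    Returns the partitions that succeeded and the first partition that hit
    FetchFailed (None if no task was launched after the corruption).
    """
    corrupted = delay == 0            # delay 0: corrupt inline at registration
    pending = deque(range(num_tasks))
    running = deque()                 # entries: (partition, launched_after_corruption)
    succeeded: List[int] = []

    def fill_slots() -> None:
        # Launch pending tasks into free slots, remembering what each one will see.
        while pending and len(running) < slots:
            running.append((pending.popleft(), corrupted))

    fill_slots()
    while running:
        partition, saw_corruption = running.popleft()
        if saw_corruption:
            return succeeded, partition   # the FetchFailed cascade starts here
        succeeded.append(partition)       # one task-success event, handled serially
        if len(succeeded) >= delay:
            corrupted = True              # producer's mapper-0 output is now invalid
        fill_slots()
    return succeeded, None


print(simulate_delay(num_tasks=20, slots=2, delay=1))  # ([0, 1], 2)
print(simulate_delay(num_tasks=20, slots=2, delay=0))  # ([], 0)
print(simulate_delay(num_tasks=2, slots=2, delay=1))   # ([0, 1], None)
```

   In this sketch the last call shows why the partition count needs to exceed 
the slot count: when every consumer task is already running before the 
corruption, no task is left to observe it.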
   
   ### Why are the changes needed?
   
   The existing `INJECT_SHUFFLE_FETCH_FAILURES` flag corrupts attempt 0 of 
every shuffle map stage, but only leaf shuffle map stages succeed on attempt 0. 
Non-leaf stages hit a fetch failure from the corrupted upstream on attempt 0 
and are never themselves corrupted, so unit tests cannot exercise the full 
range of stage-retry shapes that production hits (metric stability under 
non-determinism, rollback for indeterminate producers, and the semantics of 
SLAM, the SQL last-attempt metric, across retries). The new knobs let tests 
deterministically reach those shapes.
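
   The difference between the old and new selection rules can be sketched as 
follows. This is a toy model in plain Python, not the `DAGScheduler` code: 
`old_rule` and `new_rule` are hypothetical names, and each stage attempt is 
simplified to a single succeeded/failed flag.

```python
from typing import List, Optional, Tuple

# Result: (attempt number that gets corrupted, corrupted partitions), or None.
Corruption = Optional[Tuple[int, List[int]]]


def old_rule(attempt_succeeded: List[bool], num_partitions: int) -> Corruption:
    """Previous behavior as described above: every map task of attempt 0."""
    if attempt_succeeded and attempt_succeeded[0]:
        return 0, list(range(num_partitions))
    return None  # attempt 0 failed on fetch, so the stage is never corrupted


def new_rule(attempt_succeeded: List[bool], num_partitions: int) -> Corruption:
    """New behavior as described above: partition 0 of the first successful attempt."""
    for attempt, ok in enumerate(attempt_succeeded):
        if ok:
            return attempt, [0]
    return None


leaf = [True]             # a leaf stage succeeds on attempt 0
non_leaf = [False, True]  # attempt 0 fails on fetch, attempt 1 succeeds

print(old_rule(leaf, 4), old_rule(non_leaf, 4))  # (0, [0, 1, 2, 3]) None
print(new_rule(leaf, 4), new_rule(non_leaf, 4))  # (0, [0]) (1, [0])
```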
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. All flags are test-only (gated by `Utils.isTesting`) and are under 
`spark.testing.*`.
   
   ### How was this patch tested?
   
   - Existing tests in `MetricsFailureInjectionSuite`, 
`SQLLastAttemptMetricPlanShapesSuite`, and 
`SQLLastAttemptMetricIntegrationSuite` continue to pass with the new default 
semantics. The non-deterministic-stage test now sees a stage-2 raw metric 
overcount under the new default (because checksum-mismatch rollback now fires 
on the non-determinism); SLAM remains stable, which is the point of that test.
   
   - New test `Three stage metrics force-checksum-mismatch with delayed 
corruption`: with `shuffle.partitions=20` (much greater than the test's 
`local[2]` cores) and `delay=1`, the rollback re-plays at least one 
already-completed stage-2 partition on top of the full re-run, putting the raw 
metric strictly above the recompute-only baseline.
   
   - New test `Force checksum mismatch aborts a downstream ResultStage`: 
2-stage `groupBy().count()` query where stage 2 is a `ResultStage`. With 
`INJECT_SHUFFLE_FETCH_FAILURES` + 
`INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE` and the default 
`delay=1`, one result task succeeds before the FetchFailed cascade; the forced 
checksum mismatch on stage 1's mapper-0 recompute then fires 
`rollbackSucceedingStages`, which sees `numMissingPartitions < numTasks` on the 
result stage and aborts. The query throws a `SparkException` with 
`"indeterminate"` in the message.
   
   - `core/scalastyle`, `sql/scalastyle`, and 149 `DAGSchedulerSuite` tests 
pass.
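
   The rollback outcome that the two new tests check can be summarized with a 
small decision sketch. This is a toy model in plain Python, not the actual 
`rollbackSucceedingStages` implementation: `ToyStage` and `rollback_action` are 
hypothetical, and the last branch (a result stage with no finished task needs 
no rollback) is an assumption that the description above does not state.

```python
from dataclasses import dataclass


@dataclass
class ToyStage:
    kind: str                    # "shuffle_map" or "result"
    num_tasks: int
    num_missing_partitions: int  # partitions without a finished task


def rollback_action(stage: ToyStage) -> str:
    """Decide what happens to a stage downstream of a checksum mismatch."""
    if stage.kind == "shuffle_map":
        # Outputs are cleared and the whole stage is retried.
        return "clear_and_retry"
    if stage.num_missing_partitions < stage.num_tasks:
        # Some result tasks already finished; they cannot be rolled back.
        return "abort"
    # Assumption: nothing has finished yet, so there is nothing to roll back.
    return "no_rollback_needed"


# One of 20 tasks finished before the FetchFailed cascade (delay=1).
print(rollback_action(ToyStage("shuffle_map", 20, 19)))  # clear_and_retry
print(rollback_action(ToyStage("result", 20, 19)))       # abort
```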
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code, Opus 4.7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

