juliuszsompolski opened a new pull request, #55738: URL: https://github.com/apache/spark/pull/55738
### What changes were proposed in this pull request?

Extends the test-only fetch-failure injection in `DAGScheduler` with two new knobs and changes the semantics of the existing master switch.

Three flags total (all in `org.apache.spark.internal.config.Tests`):

- `INJECT_SHUFFLE_FETCH_FAILURES` (existing). Semantics changed: previously it corrupted every map task of stage attempt 0, so only leaf shuffle map stages were ever affected, since non-leaf stages typically hit a fetch failure on attempt 0. It now corrupts the partition-0 task of the first SUCCESSFUL attempt of every shuffle map stage, including non-leaf stages whose attempt 0 fails on fetch from upstream.
- `INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY` (int, default `1`, new). Defers the producer's mapper-0 corruption until N task-success events have arrived from stages that consume the shuffle. The DAGScheduler event loop processes task-completion events serially, so this guarantees that N consumer tasks have fully completed BEFORE the FetchFailed cascade kicks in. Tasks dispatched to free slots after the corruption see the invalid `MapStatus` and fail with `FetchFailed`. With `spark.sql.shuffle.partitions` much larger than the number of executor cores, this gives a deterministic "partial first-attempt + recompute" shape. Set to `0` to corrupt inline at registration.
- `INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE` (boolean, default `false`, new). After a downstream `FetchFailed` forces the producer's partition 0 to be recomputed, the recomputed task's `MapStatus` registration is artificially flagged as a checksum mismatch. The DAGScheduler then runs `rollbackSucceedingStages`, which clears the downstream `ShuffleMapStage`'s outputs and forces a full retry of that stage. `ResultStage` downstreams are aborted (OSS Spark does not support rolling them back, SPARK-25342).

### Why are the changes needed?

The existing `INJECT_SHUFFLE_FETCH_FAILURES` flag corrupts attempt 0 of every shuffle map stage, but only leaf shuffle map stages succeed on attempt 0. Non-leaf stages hit a fetch failure from the corrupted upstream on attempt 0 and are never themselves corrupted, so unit tests cannot exercise the full range of stage-retry shapes that production hits (metric stability under non-determinism, rollback for indeterminate producers, SLAM semantics across retries). The new knobs let tests reach those shapes deterministically.

### Does this PR introduce _any_ user-facing change?

No. All flags are test-only (gated by `Utils.isTesting`) and are under `spark.testing.*`.

### How was this patch tested?

- Existing tests in `MetricsFailureInjectionSuite`, `SQLLastAttemptMetricPlanShapesSuite`, and `SQLLastAttemptMetricIntegrationSuite` continue to pass with the new default semantics. The non-deterministic-stage test sees a stage-2 raw metric overcount under the new default (because checksum-mismatch rollback now fires on the non-determinism); SLAM remains stable, which is the point of that test.
- New test `Three stage metrics force-checksum-mismatch with delayed corruption`: with `shuffle.partitions=20` (much greater than the test's `local[2]` cores) and `delay=1`, the rollback replays at least one already-completed stage-2 partition on top of the full re-run, putting the raw metric strictly above the recompute-only baseline.
- New test `Force checksum mismatch aborts a downstream ResultStage`: a 2-stage `groupBy().count()` query where stage 2 is a `ResultStage`. With `INJECT_SHUFFLE_FETCH_FAILURES` + `INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE` and the default `delay=1`, one result task succeeds before the FetchFailed cascade. The forced checksum mismatch on stage 1's mapper-0 recompute then fires `rollbackSucceedingStages`, which sees `numMissingPartitions < numTasks` on the result stage and aborts. The query throws a `SparkException` with `"indeterminate"` in the message.
- `core/scalastyle`, `sql/scalastyle`, and 149 `DAGSchedulerSuite` tests pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code, Opus 4.7
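
For reviewers who want a concrete picture of the `INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY` semantics described above, here is a minimal toy model in plain Scala. It is not the `DAGScheduler` code from this PR: the names `DelayedCorruptionModel`, `Outcome`, and `simulate` are hypothetical, and it assumes a single task slot, so it ignores consumer tasks that are already in flight when the corruption happens (with `local[2]` the real count of first-attempt successes can be higher than N). It only illustrates how serial processing of task-success events yields a "partial first attempt" of N completed consumer tasks, with the rest left for a later attempt.

```scala
// Toy model of the delayed-corruption knob. NOT the DAGScheduler implementation.
// All names here (DelayedCorruptionModel, Outcome, simulate) are hypothetical.
object DelayedCorruptionModel {

  /** Consumer tasks that finished before vs. were left over after the corruption. */
  final case class Outcome(completedBeforeCorruption: Int, remainingAfterCorruption: Int)

  /**
   * @param consumerTasks number of consumer tasks, e.g. the shuffle partition count
   * @param delay         modeled value of INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY
   */
  def simulate(consumerTasks: Int, delay: Int): Outcome = {
    require(consumerTasks >= 0 && delay >= 0, "arguments must be non-negative")

    // delay = 0 models "corrupt inline at registration".
    var corrupted = delay == 0
    var completed = 0
    var remaining = 0

    // Task-success events are handled one at a time, like a serial event loop.
    for (_ <- 0 until consumerTasks) {
      if (corrupted) {
        // In this model, a task dispatched after the corruption cannot succeed.
        remaining += 1
      } else {
        completed += 1
        // The N-th success event triggers the producer's mapper-0 corruption.
        if (completed >= delay) corrupted = true
      }
    }
    Outcome(completed, remaining)
  }
}
```

Under these assumptions, `DelayedCorruptionModel.simulate(20, 1)` models the `shuffle.partitions=20`, `delay=1` configuration used by the new three-stage test: one consumer task completes in the first attempt and the other 19 are left for the retry. In the model, a `delay` larger than the number of consumer tasks means the corruption never fires; whether the real injection behaves the same way is not stated in this PR description.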
