zhengruifeng opened a new pull request, #56399:
URL: https://github.com/apache/spark/pull/56399

   ### What changes were proposed in this pull request?
   
   `zipWithIndex` builds a `ZippedWithIndexRDD`, which precomputes the start 
index of every partition by launching a small counting job over all but the 
last partition. This PR makes that step reuse the work an ancestor 
`ZippedWithIndexRDD` has already done.
   
   `getAncestorWithSamePartitionSizes` walks the lineage across size-preserving 
operators (`map`, size-preserving `mapPartitions`, size-preserving zips, ...). 
It now stops at an ancestor `ZippedWithIndexRDD` **as long as that ancestor's 
`startIndices` is still populated**, and the `startIndices` computation reuses 
those indices directly instead of running another counting job. Because every 
operator on the path preserves partition sizes, the ancestor's per-partition 
start indices are identical to ours.
   
   `startIndices` is `@transient`, so a deserialized/recovered ancestor can 
have a null `startIndices`. Both the lineage-walk stop and the reuse path guard 
on `startIndices != null`, so such an ancestor is not reused; the code falls 
back to a normal counting job instead of propagating a null array.
   
   As a result, a chain such as `rdd.zipWithIndex().map(f).zipWithIndex()` runs 
the counting job only once.
   
   ### Why are the changes needed?
   
   The counting job submitted by `zipWithIndex` is pure overhead when the 
per-partition indices have already been computed upstream. Pipelines that 
re-`zipWithIndex` after size-preserving transforms paid for a redundant job 
every time. Reusing the ancestor's `startIndices` removes that extra job.
   
   ### Does this PR introduce any user-facing change?
   
   No. The computed indices are identical; only the redundant counting job is 
avoided.
   
   ### How was this patch tested?
   
   New unit tests in `RDDSuite` that use a `SparkListener` to count submitted 
jobs:
   - reuse through a single size-preserving `map`;
   - direct `zipWithIndex().zipWithIndex()` chaining;
   - reuse through multiple chained `map`s;
   - a negative case where a `filter` breaks the chain, confirming a fresh 
counting job is still submitted and the indices are recomputed correctly.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (model: claude-opus-4-8)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to