Hi,

  I am not yet very familiar with Celeborn, so will restrict my notes on
the proposal in context to Apache Spark:

a) For Option 1, there is SPARK-25299 - which was started a few years back.
Unfortunately, the work there has stalled: but if there is interest in
pushing that forward, I can help shepard the contributions !
Full disclosure, the earlier proposal might be fairly outdated, and will
involve a bit of investment to restart that work.

b) On the ability to reuse a previous mapper output/minimize cost - that
depends on a stage's DeterministicLevel.
DETERMINATE mapper stage output can be reused, and not others - and there
is a lot of nuance around how DAGScheduler handles it.
Lot of it has to do with data correctness (See SPARK-23243 and the PR's
linked there for more indepth analysis of this) - and this has kept
evolving in the years since.
DAGScheduler directly updates MapOutputTracker for a few cases - which
includes for this.

c) As a follow up to (b) above, even though MapOutputTracker is part of
SparkEnv, and so 'accessible', I would be careful modifying its state
directly outside of DAGScheduler.

d) The computation for "celeborn shuffleId" would not work - since
spark.stage.maxConsecutiveAttempts is for consecutive failures for a single
stage in a job.
The same shuffle id can be computed by different stages across jobs (for
example: very common with Spark SQL AQE btw).
A simple example here [1]


Other than Option 1, the rest look like a tradeoff to varying degrees.
I am not familiar enough with Celeborn to give good suggestions yet though.


All the best in trying to solve this issue - looking forward to updates !

Regards,
Mridul

[1]
Run with './bin/spark-shell  --master 'local-cluster[4, 3, 1024]'' or
yarn/k8s

import org.apache.spark.TaskContext

val rdd1 = sc.parallelize(0 until 10000, 20).map(v => (v, v)).groupByKey()
val rdd2 = rdd1.mapPartitions { iter =>
  val tc = TaskContext.get()
  if (0 == tc.partitionId() && tc.stageAttemptNumber() < 1) {
    System.exit(0)
  }
  iter
}

rdd2.count()
rdd2.map(v => (v._1, v._2)).groupByKey().count()

For both the jobs, the same shuffle id is used for the first shuffle.



On Fri, Sep 22, 2023 at 10:48 AM Erik fang <fme...@gmail.com> wrote:

> Hi folks,
>
> I have a proposal to implement Spark stage resubmission to handle shuffle
> fetch failure in Celeborn
>
>
> https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8
>
> please have a look and let me know what you think
>
> Regards,
> Erik
>

Reply via email to