Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8180#discussion_r38916650
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -45,17 +45,65 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
      * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
      * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
      * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
    - * TaskScheduler implementation that runs them on the cluster.
    + * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
    + * tasks that can run right away based on the data that's already on the cluster (e.g. map output
    + * files from previous stages), though it may fail if this data becomes unavailable.
      *
    - * In addition to coming up with a DAG of stages, this class also determines the preferred
    + * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
    + * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
    + * in each stage, but operations with shuffle dependencies require multiple stages (one to write a
    + * set of map output files, and another to read those files after a barrier). In the end, every
    + * stage will have only shuffle dependencies on other stages, and may compute multiple operations
    + * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
    + * various RDDs (MappedRDD, FilteredRDD, etc).
    + *
    + * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
      * locations to run each task on, based on the current cache status, and passes these to the
      * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
      * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
      * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
      * a small number of times before cancelling the whole stage.
      *
    + * When looking through this code, there are several key concepts:
    + *
    + *  - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
    + *    For example, when the user calls an action, like count(), a job will be submitted through
    + *    submitJob. Each Job may require the execution of multiple stages to build intermediate data.
    + *
    + *  - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
    + *    task computes the same function on partitions of the same RDD. Stages are separated at shuffle
    + *    boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
    + *    fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
    + *    executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
    + *    Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
    + *
    + *  - Tasks are individual units of work, each sent to one machine.
    + *
    + *  - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
    + *    and likewise remembers which shuffle map stages have already produced output files to avoid
    + *    redoing the map side of a shuffle.
    + *
    + *  - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
    + *    on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
    + *
    + *  - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
    + *    to prevent memory leaks in a long-running application.
    + *
    + * To recover from failures, the same stage might need to run multiple times, which are called
    + * "attempts". If the TaskScheduler reports that a task failed because a map output file from a
    + * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a
    + * through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait
    --- End diff --
    
    nit: `through a` is duplicated ("This is detected through a through a CompletionEvent").

