GitHub user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/186#discussion_r10951890
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -511,107 +514,121 @@ class DAGScheduler(
* @return `true` if we should stop the event loop.
*/
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean =
{
- event match {
- case JobSubmitted(jobId, rdd, func, partitions, allowLocal,
callSite, listener, properties) =>
- var finalStage: Stage = null
- try {
- // New stage creation may throw an exception if, for example,
jobs are run on a HadoopRDD
- // whose underlying HDFS files have been deleted.
- finalStage = newStage(rdd, partitions.size, None, jobId,
Some(callSite))
- } catch {
- case e: Exception =>
- logWarning("Creating new stage failed due to exception - job:
" + jobId, e)
- listener.jobFailed(e)
- return false
- }
- val job = new ActiveJob(jobId, finalStage, func, partitions,
callSite, listener, properties)
- clearCacheLocs()
- logInfo("Got job " + job.jobId + " (" + callSite + ") with " +
partitions.length +
- " output partitions (allowLocal=" + allowLocal + ")")
- logInfo("Final stage: " + finalStage + " (" + finalStage.name +
")")
- logInfo("Parents of final stage: " + finalStage.parents)
- logInfo("Missing parents: " + getMissingParentStages(finalStage))
- if (allowLocal && finalStage.parents.size == 0 &&
partitions.length == 1) {
- // Compute very short actions like first() or take() with no
parent stages locally.
- listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](),
properties))
- runLocally(job)
- } else {
- stageIdToActiveJob(jobId) = job
- activeJobs += job
- resultStageToJob(finalStage) = job
- listenerBus.post(
- SparkListenerJobStart(job.jobId,
jobIdToStageIds(jobId).toArray, properties))
- submitStage(finalStage)
- }
+ try {
+ event match {
+ case JobSubmitted(jobId, rdd, func, partitions, allowLocal,
callSite, listener,
+ properties) =>
+ var finalStage: Stage = null
+ try {
+ // New stage creation may throw an exception if, for example,
jobs are run on a
+ // HadoopRDD whose underlying HDFS files have been deleted.
+ finalStage = newStage(rdd, partitions.size, None, jobId,
Some(callSite))
+ } catch {
+ case e: Exception =>
+ logWarning("Creating new stage failed due to exception -
job: " + jobId, e)
+ listener.jobFailed(e)
+ return false
+ }
+ val job = new ActiveJob(jobId, finalStage, func, partitions,
callSite, listener,
+ properties)
+ clearCacheLocs()
+ logInfo("Got job " + job.jobId + " (" + callSite + ") with " +
partitions.length +
+ " output partitions (allowLocal=" + allowLocal + ")")
+ logInfo("Final stage: " + finalStage + " (" + finalStage.name +
")")
+ logInfo("Parents of final stage: " + finalStage.parents)
+ logInfo("Missing parents: " + getMissingParentStages(finalStage))
+ if (allowLocal && finalStage.parents.size == 0 &&
partitions.length == 1) {
+ // Compute very short actions like first() or take() with no
parent stages locally.
+ listenerBus.post(SparkListenerJobStart(job.jobId,
Array[Int](), properties))
+ runLocally(job)
+ } else {
+ stageIdToActiveJob(jobId) = job
+ activeJobs += job
+ resultStageToJob(finalStage) = job
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId,
jobIdToStageIds(jobId).toArray, properties))
+ submitStage(finalStage)
+ }
- case JobCancelled(jobId) =>
- handleJobCancellation(jobId)
-
- case JobGroupCancelled(groupId) =>
- // Cancel all jobs belonging to this job group.
- // First finds all active jobs with this group id, and then kill
stages for them.
- val activeInGroup = activeJobs.filter(activeJob =>
- groupId ==
activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
- val jobIds = activeInGroup.map(_.jobId)
- jobIds.foreach(handleJobCancellation)
-
- case AllJobsCancelled =>
- // Cancel all running jobs.
- runningStages.map(_.jobId).foreach(handleJobCancellation)
- activeJobs.clear() // These should already be empty by this
point,
- stageIdToActiveJob.clear() // but just in case we lost track of
some jobs...
-
- case ExecutorAdded(execId, host) =>
- handleExecutorAdded(execId, host)
-
- case ExecutorLost(execId) =>
- handleExecutorLost(execId)
-
- case BeginEvent(task, taskInfo) =>
- for (
- job <- stageIdToActiveJob.get(task.stageId);
- stage <- stageIdToStage.get(task.stageId);
- stageInfo <- stageToInfos.get(stage)
- ) {
- if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
+ case JobCancelled(jobId) =>
+ handleJobCancellation(jobId)
+
+ case JobGroupCancelled(groupId) =>
+ // Cancel all jobs belonging to this job group.
+ // First finds all active jobs with this group id, and then kill
stages for them.
+ val activeInGroup = activeJobs.filter(activeJob =>
+ groupId ==
activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+ val jobIds = activeInGroup.map(_.jobId)
+ jobIds.foreach(handleJobCancellation)
+
+ case AllJobsCancelled =>
+ // Cancel all running jobs.
+ runningStages.map(_.jobId).foreach(handleJobCancellation)
+ activeJobs.clear() // These should already be empty by this
point,
+ stageIdToActiveJob.clear() // but just in case we lost track of
some jobs...
+
+ case ExecutorAdded(execId, host) =>
+ handleExecutorAdded(execId, host)
+
+ case ExecutorLost(execId) =>
+ handleExecutorLost(execId)
+
+ case BeginEvent(task, taskInfo) =>
+ for (
+ job <- stageIdToActiveJob.get(task.stageId);
+ stage <- stageIdToStage.get(task.stageId);
+ stageInfo <- stageToInfos.get(stage)
+ ) {
+ if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
!stageInfo.emittedTaskSizeWarning) {
- stageInfo.emittedTaskSizeWarning = true
- logWarning(("Stage %d (%s) contains a task of very large " +
- "size (%d KB). The maximum recommended task size is %d
KB.").format(
- task.stageId, stageInfo.name, taskInfo.serializedSize /
1024, TASK_SIZE_TO_WARN))
+ stageInfo.emittedTaskSizeWarning = true
+ logWarning(("Stage %d (%s) contains a task of very large " +
+ "size (%d KB). The maximum recommended task size is %d
KB.").format(
+ task.stageId, stageInfo.name, taskInfo.serializedSize /
1024, TASK_SIZE_TO_WARN))
+ }
}
- }
- listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+ listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
- case GettingResultEvent(task, taskInfo) =>
- listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
+ case GettingResultEvent(task, taskInfo) =>
+ listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
- case completion @ CompletionEvent(task, reason, _, _, taskInfo,
taskMetrics) =>
- val stageId = task.stageId
- val taskType = Utils.getFormattedClassName(task)
- listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason,
taskInfo, taskMetrics))
- handleTaskCompletion(completion)
+ case completion@CompletionEvent(task, reason, _, _, taskInfo,
taskMetrics) =>
+ val stageId = task.stageId
+ val taskType = Utils.getFormattedClassName(task)
+ listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason,
taskInfo, taskMetrics))
+ handleTaskCompletion(completion)
- case TaskSetFailed(taskSet, reason) =>
- stageIdToStage.get(taskSet.stageId).foreach { abortStage(_,
reason) }
+ case TaskSetFailed(taskSet, reason) =>
+ stageIdToStage.get(taskSet.stageId).foreach {
+ abortStage(_, reason)
+ }
- case ResubmitFailedStages =>
- if (failedStages.size > 0) {
- // Failed stages may be removed by job cancellation, so failed
might be empty even if
- // the ResubmitFailedStages event has been scheduled.
- resubmitFailedStages()
- }
+ case ResubmitFailedStages =>
+ if (failedStages.size > 0) {
+ // Failed stages may be removed by job cancellation, so failed
might be empty even if
+ // the ResubmitFailedStages event has been scheduled.
+ resubmitFailedStages()
+ }
- case StopDAGScheduler =>
- // Cancel any active jobs
- for (job <- activeJobs) {
- val error = new SparkException("Job cancelled because
SparkContext was shut down")
- job.listener.jobFailed(error)
- listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error,
-1)))
- }
- return true
+ case StopDAGScheduler =>
+ // Cancel any active jobs
+ for (job <- activeJobs) {
+ val error = new SparkException("Job cancelled because
SparkContext was shut down")
+ job.listener.jobFailed(error)
+ listenerBus.post(SparkListenerJobEnd(job.jobId,
JobFailed(error, -1)))
+ }
+ return true
+ }
+ false
+ } catch {
+ case e: Exception => {
+ logError("Exception was thrown when DAGScheduler processes " +
event +
--- End diff --
Nit: "processes" should be "processed" in this log message.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---