jerrypeng commented on code in PR #56055: URL: https://github.com/apache/spark/pull/56055#discussion_r3315401860
########## core/src/main/scala/org/apache/spark/scheduler/ConcurrentStageDAGScheduler.scala: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.Properties + +import scala.collection.mutable + +import org.apache.spark.{MapOutputTrackerMaster, SparkContext, SparkEnv, SparkException, SparkRuntimeException, Success} +import org.apache.spark.internal.LogKeys +import org.apache.spark.internal.config.{SPECULATION_ENABLED, STREAMING_REALTIME_MODE_SLOTS_CHECK_DISABLED} +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.storage.BlockManagerMaster +import org.apache.spark.util.Clock +import org.apache.spark.util.SystemClock + +/** + * A [[DAGScheduler]] that runs all the stages in a job without waiting for its parents + * complete. This combined with streaming shuffle between the stages, allows for low latency + * execution of streaming queries in real-time mode. + */ +class ConcurrentStageDAGScheduler( + sc: SparkContext, + taskScheduler: TaskScheduler, + listenerBus: LiveListenerBus, + mapOutputTracker: MapOutputTrackerMaster, + blockManagerMaster: BlockManagerMaster, + env: SparkEnv, + clock: Clock = new SystemClock()) + extends DAGScheduler( + sc, taskScheduler, listenerBus, mapOutputTracker, blockManagerMaster, env, clock) { + + import ConcurrentStageDAGScheduler._ + + def this(sc: SparkContext, taskScheduler: TaskScheduler) = { + this( + sc, + taskScheduler, + sc.listenerBus, + sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], + sc.env.blockManager.master, + sc.env + ) + } + + def this(sc: SparkContext) = this(sc, sc.taskScheduler) + + // This contains all the concurrent states that are yet to be scheduled across all the jobs. + private[spark] val concurrentStages = new mutable.HashSet[Stage] + + private[scheduler] case class DependentStageInfo( + parents: mutable.HashSet[Stage] = mutable.HashSet.empty, + delayedTaskCompletionEvents: mutable.ListBuffer[CompletionEvent] = mutable.ListBuffer.empty) + + // This map holds parents of concurrently scheduled stages. When tasks for such a stage complete, + // and if any of the parents are still running, we delay processing of such events until parent + // stages are complete. We save these events in this map until then. + private[spark] val dependentStageMap = new mutable.HashMap[Stage, DependentStageInfo] + + private def totalNumCoreForStage(stage: Stage): Int = { + val numTask = stage match { + case r: ResultStage => r.partitions.length + case m: ShuffleMapStage => m.numPartitions + } + val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(stage.resourceProfileId) + val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, sc.conf) + taskCpus * numTask + } + + /** + * Hook invoked after the final stage is created. Registers stages reachable from + * the final stage as concurrent so they can be submitted in parallel. + */ + override def onFinalStageCreated(finalStage: Stage, properties: Properties): Unit = { + + val queryBatchId = getStreamingBatchIdFromProperties(properties) + + if (queryBatchId.nonEmpty && isConcurrentStagesEnabled(properties)) { + if (properties.getProperty(SPECULATION_ENABLED.key) == "true") { + // Speculation is not supported with concurrent stages. + throw new SparkException( + "Speculative execution is not supported with concurrent stages " + + s"(streaming query: $queryBatchId). Please disable ${SPECULATION_ENABLED.key} config." + ) + } + + logInfo(log"Concurrent stages is enabled for [query ${MDC(LogKeys.STREAMING_QUERY_ID, + queryBatchId.get.queryId)} batch ${MDC(LogKeys.BATCH_ID, queryBatchId.get.batchId)}]") + + // Mark current stage and all its ancestors as concurrent + var totalCoresNeeded = 0 + def visit(stage: Stage): Unit = { + if (!concurrentStages.contains(stage)) { + logInfo(log"Marking stage '${MDC(LogKeys.STAGE, stage)}' concurrent for [query ${MDC( + LogKeys.STREAMING_QUERY_ID, queryBatchId.get.queryId)} batch ${MDC( + LogKeys.BATCH_ID, queryBatchId.get.batchId)}]") + concurrentStages += stage + totalCoresNeeded += totalNumCoreForStage(stage) + stage.parents.foreach(visit) + } + } + visit(finalStage) + + if (!sc.conf.get(STREAMING_REALTIME_MODE_SLOTS_CHECK_DISABLED)) { + try { + val totalSlots = sc.schedulerBackend.defaultParallelism() + val coresInUse = runningStages.toArray.map(totalNumCoreForStage(_)).sum + if (totalSlots - coresInUse < totalCoresNeeded) { + throw new SparkRuntimeException( Review Comment: fixed by accumulating into a local visitedStages set during the DAG walk and only committing to concurrentStages after the slot check passes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
