SCHJonathan commented on code in PR #51050:
URL: https://github.com/apache/spark/pull/51050#discussion_r2116267127
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import java.util.concurrent.{ConcurrentHashMap, Semaphore}
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.util.Try
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution._
+import org.apache.spark.sql.pipelines.util.ExponentialBackoffStrategy
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+
+sealed trait StreamState
+
+object StreamState {
+  case object QUEUED extends StreamState
+  case object RUNNING extends StreamState
+  case object EXCLUDED extends StreamState
+  case object IDLE extends StreamState
+  case object SKIPPED extends StreamState
+  case object TERMINATED_WITH_ERROR extends StreamState
+  case object CANCELED extends StreamState
+  case object SUCCESSFUL extends StreamState
+}
+
+class TriggeredGraphExecution(
+    graphForExecution: DataflowGraph,
+    env: PipelineUpdateContext,
+    onCompletion: UpdateTerminationReason => Unit = _ => (),
+    clock: Clock = new SystemClock()
+) extends GraphExecution(graphForExecution, env) {
+
+  /**
+   * [Visible for testing] A map to store the stream state of all flows which should be
+   * materialized. This includes flows whose streams have not yet been started, i.e. they are
+   * queued or have been marked as skipped.
+   */
+  private[pipelines] val pipelineState = {
+    new ConcurrentHashMap[TableIdentifier, StreamState]().asScala
+  }
+
+  /**
+   * Keeps track of flow failure information required for retry logic.
+   * This only contains values for flows that either failed previously or are currently in the
+   * failed state.
+   */
+  private val failureTracker = {
+    new ConcurrentHashMap[TableIdentifier, TriggeredFailureInfo]().asScala
+  }
+
+  /** Back-off strategy used to determine the duration between retries. */
+  private val backoffStrategy = ExponentialBackoffStrategy(
+    maxTime = (pipelineConf.watchdogMaxRetryTimeInSeconds * 1000).millis,
+    stepSize = (pipelineConf.watchdogMinRetryTimeInSeconds * 1000).millis
+  )
+
+  override def streamTrigger(flow: Flow): Trigger = {
+    Trigger.AvailableNow()
+  }
+
+  /** The control thread responsible for topologically executing flows. */
+  private var topologicalExecutionThread: Option[Thread] = None
+
+  private def buildTopologicalExecutionThread(): Thread = {
+    new Thread("Topological Execution") {
+      override def run(): Unit = {
+        try {
+          topologicalExecution()
+        } finally {
+          TriggeredGraphExecution.super.stop()
+        }
+      }
+    }
+  }
+
+  override def start(): Unit = {
+    super.start()
+    // If tablesToUpdate is empty, queue all flows; otherwise, queue only flows whose
+    // destination tables are specified in tablesToUpdate.
+    env.refreshFlows
+      .filter(graphForExecution.materializedFlows)
+      .foreach { f =>
+        env.flowProgressEventLogger.recordQueued(f)
+        pipelineState.put(f.identifier, StreamState.QUEUED)
+      }
+    env.refreshFlows
+      .filterNot(graphForExecution.materializedFlows)
+      .foreach { f =>
+        env.flowProgressEventLogger.recordExcluded(f)
+        pipelineState.put(f.identifier, StreamState.EXCLUDED)
+      }
+    val thread = buildTopologicalExecutionThread()
+    UncaughtExceptionHandler.addHandler(
+      thread, {
+        case _: InterruptedException => // Stop from the UI leads to InterruptedException. Do nothing.
+        case _ =>
+          try {
+            stopInternal(stopTopologicalExecutionThread = false)
+          } catch {
+            case ex: Throwable =>
+              logError(s"Exception thrown while stopping the update...", ex)
+          } finally {
+            onCompletion(UnexpectedUpdateFailure())
+          }
+      }
+    )
+    thread.start()
+    topologicalExecutionThread = Option(thread)
+  }
+
+  /** Used to control how many flows are executing at once. */
+  private val concurrencyLimit: Semaphore = new Semaphore(pipelineConf.maxConcurrentFlows)
+
+  /**
+   * Runs the pipeline in topological order.
+   *
+   * Non-accepting states: Queued, Running
+   * Accepting states: Successful, TerminatedWithError, Skipped, Cancelled, Excluded
+   *
+   * All [[Flow]]s which can write to a stream begin in the Queued state. The following state
+   * transitions describe the topological execution of a [[DataflowGraph]]:
+   *
+   * Queued -> Running if the [[Flow]] has no parents, or the parent tables of the queued
+   * [[Flow]] have run successfully.
+   * Running -> Successful if the stream associated with the [[Flow]] succeeds.
+   * Running -> TerminatedWithError if the stream associated with the [[Flow]] stops with an
+   * exception.
+   *
+   * Non-fatally failed flows are retried with exponential back-off a bounded number of times.
+   * If a flow cannot be retried, all downstream flows of the failed flow are moved to the
+   * Skipped state.
+   * Running -> Cancelled if the stream associated with the [[Flow]] is stopped mid-run by
+   * calling `stop`. All remaining [[Flow]]s in the queue are moved to the Skipped state.
+   *
+   * The execution is over once no [[Flow]]s are left running or in the queue.
+   */
+  private def topologicalExecution(): Unit = {
+    // Done executing once no flows remain running or in the queue.
+    def allFlowsDone = {
+      flowsWithState(StreamState.QUEUED).isEmpty && flowsWithState(StreamState.RUNNING).isEmpty &&
+      flowsQueuedForRetry().isEmpty
+    }
+
+    // LinkedHashSet returns elements in insertion order. This ensures that flows queued but
+    // unable to run because we are at the max concurrent execution limit get priority on the
+    // next round.
+    val runnableFlows: mutable.LinkedHashSet[TableIdentifier] = new mutable.LinkedHashSet()
+
+    while (!Thread.interrupted() && !allFlowsDone) {
+      // Queries are managed by PhysicalFlows, so update state based on [[PhysicalFlow]]s.
+      flowsWithState(StreamState.RUNNING).foreach { flowIdentifier =>
+        physicalFlows(flowIdentifier) match {
+          case f if !f.isCompleted => // Nothing to be done; let this stream continue.
+          case f if f.isCompleted && f.exception.isEmpty =>
+            recordSuccess(flowIdentifier)
+          case f =>
+            recordFailed(flowIdentifier = flowIdentifier, e = f.exception.get)
+        }
+      }
+
+      // Log whether we're leaking Semaphore permits. Synchronize here so we don't double-count
+      // or mis-count because a batch flow is finishing asynchronously.
+      val (runningFlows, availablePermits) = concurrencyLimit.synchronized {
+        (flowsWithState(StreamState.RUNNING).size, concurrencyLimit.availablePermits)
+      }
+      if ((runningFlows + availablePermits) < pipelineConf.maxConcurrentFlows) {
+        val errorStr =
+          s"The max concurrency is ${pipelineConf.maxConcurrentFlows}, but there are only " +
+            s"$availablePermits permits available with $runningFlows flows running. If this " +
+            s"happens consistently, it's possible we're leaking permits."
+        logError(errorStr)
+        if (Utils.isTesting) {
+          throw new IllegalStateException(errorStr)
+        }
+      }
+
+      // All flows which can potentially be run now, i.e. those whose parent tables have
+      // successfully completed or have been excluded.
+      val queuedForRetry =
+        flowsQueuedForRetry().filter(nextRetryTime(_) <= clock.getTimeMillis())
+      // Take flows that have terminated but have retry attempts left, plus flows that are
+      // queued, and keep the ones whose parents have all completed successfully, been excluded,
+      // or idled because they are ONCE flows which already ran.
+      runnableFlows ++= (queuedForRetry ++ flowsWithState(StreamState.QUEUED)).filter { id =>
+        graphForExecution
+          .upstreamFlows(id)
+          .intersect(graphForExecution.materializedFlowIdentifiers)
+          .forall { id =>
+            pipelineState(id) == StreamState.SUCCESSFUL ||
+            pipelineState(id) == StreamState.EXCLUDED ||
+            pipelineState(id) == StreamState.IDLE
+          }
+      }
+
+      // Collect flows that are ready to start.
+      val flowsToStart = mutable.ArrayBuffer[ResolvedFlow]()
+      while (runnableFlows.nonEmpty && concurrencyLimit.tryAcquire()) {
+        val flowIdentifier = runnableFlows.head
+        runnableFlows.remove(flowIdentifier)
+        flowsToStart.append(graphForExecution.resolvedFlow(flowIdentifier))
+      }
+
+      val (batchFlowsToStart, otherFlowsToStart) = flowsToStart.partition { f =>
+        graphForExecution.resolvedFlow(f.identifier).isInstanceOf[CompleteFlow]
+      }
+
+      def startFlowWithPlanningMode(flow: ResolvedFlow, mode: String): Unit = {

Review Comment:
   Yeah, it looks like there are a lot of outdated comments around this: we always start flows serially in triggered graph execution, and there's also some outdated logic claiming we start MV flows in parallel when we actually don't. Let's also refactor and clean up the misleading code around it.
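To make the serial-start point concrete, here is a minimal, self-contained sketch of the pattern the comment describes. The names (`concurrencyLimit`, `runnableFlows`, `flowsToStart`, `maxConcurrentFlows`) mirror fields in `TriggeredGraphExecution`, but this is a simplification written for illustration, not code from the PR: a `Semaphore` bounds how many flows may be running at once, while flows are started one at a time in queue order, with no separate parallel branch for batch/MV flows.

```scala
import java.util.concurrent.Semaphore

import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical sketch of the serial-start pattern; not the PR's actual code.
object SerialStartSketch {
  def main(args: Array[String]): Unit = {
    val maxConcurrentFlows = 2
    val concurrencyLimit = new Semaphore(maxConcurrentFlows)

    // LinkedHashSet preserves insertion order, so a flow that could not start
    // in an earlier round keeps its priority on the next one.
    val runnableFlows = mutable.LinkedHashSet("flow_a", "flow_b", "flow_c")

    // Dequeue at most maxConcurrentFlows flows, taking one permit per flow.
    val flowsToStart = mutable.ArrayBuffer[String]()
    while (runnableFlows.nonEmpty && concurrencyLimit.tryAcquire()) {
      val flow = runnableFlows.head
      runnableFlows.remove(flow)
      flowsToStart.append(flow)
    }

    // Serial start: flows are launched one at a time, in queue order. A failed
    // start returns its permit so another flow can use it on a later round.
    flowsToStart.foreach { flow =>
      try {
        println(s"starting $flow") // stand-in for actually starting the flow's stream
      } catch {
        case NonFatal(_) => concurrencyLimit.release()
      }
    }
    // flow_c stays queued until a running flow finishes and releases its permit.
  }
}
```

Under this reading, the Semaphore only limits how many flows are in flight; the starts themselves never happen in parallel, which is why the batch-vs-other partition above is misleading.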