sryza commented on code in PR #51050: URL: https://github.com/apache/spark/pull/51050#discussion_r2114655891
########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph + +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.SparkSession +import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers +import org.apache.spark.sql.pipelines.util.SparkSessionUtils +import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} +import org.apache.spark.util.ThreadUtils + +/** + * A flow's execution may complete for two reasons: + * 1. it may finish performing all of its necessary work, or + * 2. it may be interrupted by a request from a user to stop it. + * + * We use this result to disambiguate these two cases, using [[ExecutionResult.FINISHED]] + * for the former and [[ExecutionResult.STOPPED]] for the latter. + */ +sealed trait ExecutionResult +object ExecutionResult { + case object FINISHED extends ExecutionResult + case object STOPPED extends ExecutionResult +} + +/** A [[FlowExecution]] specifies how to execute a flow and manages its execution. */ +trait FlowExecution { + + /** Identifier of this physical flow */ + def identifier: TableIdentifier + + /** + * Returns a user-visible name for the flow. + */ + final def displayName: String = identifier.unquotedString + + /** + * SparkSession to execute this physical flow with. + * + * The default value for streaming flows is the pipeline's spark session because the source + * dataframe is resolved using the pipeline's spark session, and a new session will be started + * implicitly by the streaming query. + * + * The default value for batch flows is a cloned spark session from the pipeline's spark session. + * + * Please make sure that the execution thread runs in a different spark session than the + * pipeline's spark session. + */ + protected def spark: SparkSession = updateContext.spark + + /** + * Origin to use when recording events for this flow. + */ + def getOrigin: QueryOrigin + + /** Returns true iff this PhysicalFlow has been completed with either success or an exception. */ + def isCompleted: Boolean = _future.exists(_.isCompleted) + + /** Returns true iff this PhysicalFlow executes using Spark Structured Streaming. */ + def isStreaming: Boolean + + /** Retrieves the future that can be used to track execution status. 
*/ + def getFuture: Future[ExecutionResult] = { + _future.getOrElse( + throw new IllegalStateException(s"PhysicalFlow $identifier has not been executed.") + ) + } + + /** Tracks the currently running future. */ + private final var _future: Option[Future[ExecutionResult]] = None + + /** Context about this pipeline update. */ + def updateContext: PipelineUpdateContext + + implicit val executionContext: ExecutionContext = + ExecutionContext.fromExecutor(FlowExecution.threadPool) + + /** + * Stops execution of this [[FlowExecution]]. If you override this, please be sure to + * call `super.stop()` at the beginning of your method, so we can properly handle errors + * when a user tries to stop a flow. + */ + def stop(): Unit = { + stopped.set(true) + } + + def exception: Option[Throwable] = _future.flatMap(_.value).flatMap(_.failed.toOption) + + def executeInternal(): Future[Unit] + + /** + * Executes this PhysicalFlow asynchronously to perform its intended update. A future that can be + * used to track execution status is saved, and can be retrieved with `getFuture`. + */ + final def executeAsync(): Unit = { + if (_future.isDefined) { + throw new IllegalStateException( + s"PhysicalFlow ${identifier.unquotedString} has already been executed." + ) + } + + val queryOrigin = QueryOrigin(filePath = getOrigin.filePath) + + _future = try { + Option( + executeInternal() + .transform { + case Success(_) => Success(ExecutionResult.FINISHED) + // Add origin to exceptions raised while executing a flow i.e. inside the `Future` + // created by the `executeInternal` method. + case Failure(e) => + Failure(e) + } + .map(_ => ExecutionResult.FINISHED) + .recover { + case _: Throwable if stopped.get() => + ExecutionResult.STOPPED + } + ) + } catch { + case NonFatal(e) => + // Add query origin to exceptions raised while starting a flow + throw e.addOrigin(queryOrigin) + } + } + + /** The destination that this [[FlowExecution]] is writing to. */ + def destination: Output + + /** Has this [[FlowExecution]] been stopped? Set by [[FlowExecution.stop()]]. */ + private val stopped: AtomicBoolean = new AtomicBoolean(false) +} + +object FlowExecution { + private val threadPool: ThreadPoolExecutor = + ThreadUtils.newDaemonCachedThreadPool("FlowExecution") +} + +/** A [[FlowExecution]] that processes data statefully using Structured Streaming. */ +trait StreamingFlowExecution extends FlowExecution with Logging { + + /** The [[ResolvedFlow]] that this [[StreamingFlowExecution]] is executing. */ + def flow: ResolvedFlow + + /** Structured Streaming checkpoint. */ + def checkpointPath: String + + /** Structured Streaming trigger. */ + def trigger: Trigger + + def isStreaming: Boolean = true + + /** Spark confs that must be set when starting this flow. */ + protected def sqlConf: Map[String, String] + + /** Starts a stream and returns its streaming query. */ + protected def startStream(): StreamingQuery + + /** + * Executes this StreamingPhysicalFlow by starting its stream with the correct scheduling pool + * and confs. + */ + override final def executeInternal(): Future[Unit] = { + logInfo(s"Starting ${identifier.unquotedString} with checkpoint location $checkpointPath") + val streamingQuery = SparkSessionUtils.withSqlConf(spark, sqlConf.toList: _*)(startStream()) + Future(streamingQuery.awaitTermination()) + } +} + +class StreamingTableWrite( Review Comment: We should add some docstring here, e.g. 
"A flow execution that writes a streaming `DataFrame` to a table" ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala: ########## @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph + +import java.util.concurrent.{ConcurrentHashMap, Semaphore} + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution._ +import org.apache.spark.sql.pipelines.util.ExponentialBackoffStrategy +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} + +sealed trait StreamState + +object StreamState { + case object QUEUED extends StreamState + case object RUNNING extends StreamState + case object EXCLUDED extends StreamState + case object IDLE extends StreamState + case object SKIPPED extends StreamState + case object TERMINATED_WITH_ERROR extends StreamState + case object CANCELED extends StreamState + case object SUCCESSFUL extends StreamState +} + +class TriggeredGraphExecution( + graphForExecution: DataflowGraph, + env: PipelineUpdateContext, + onCompletion: UpdateTerminationReason => Unit = _ => (), + clock: Clock = new SystemClock() +) extends GraphExecution(graphForExecution, env) { + + /** + * [Visible for testing] A map to store stream state of all flows which should be materialized. + * This includes flows whose streams have not yet been started, ie they are queued or have been + * marked as skipped. + */ + private[pipelines] val pipelineState = { + new ConcurrentHashMap[TableIdentifier, StreamState]().asScala + } + + /** + * Keeps track of flow failure information required for retry logic. + * This only contains values for flows that either failed previously or are currently in the + * failed state. + */ + private val failureTracker = { + new ConcurrentHashMap[TableIdentifier, TriggeredFailureInfo]().asScala + } + + /** Back-off strategy used to determine duration between retries. */ + private val backoffStrategy = ExponentialBackoffStrategy( + maxTime = (pipelineConf.watchdogMaxRetryTimeInSeconds * 1000).millis, + stepSize = (pipelineConf.watchdogMinRetryTimeInSeconds * 1000).millis + ) + + override def streamTrigger(flow: Flow): Trigger = { + Trigger.AvailableNow() + } + + /** The control thread responsible for topologically executing flows. 
*/ + private var topologicalExecutionThread: Option[Thread] = None + + private def buildTopologicalExecutionThread(): Thread = { + new Thread("Topological Execution") { + override def run(): Unit = { + try { + topologicalExecution() + } finally { + TriggeredGraphExecution.super.stop() + } + } + } + } + + override def start(): Unit = { + super.start() + // If tablesToUpdate is empty, queue all flows; Otherwise, queue flows for which the + // destination tables are specified in tablesToUpdate. + env.refreshFlows + .filter(graphForExecution.materializedFlows) + .foreach { f => + env.flowProgressEventLogger.recordQueued(f) + pipelineState.put(f.identifier, StreamState.QUEUED) + } + env.refreshFlows + .filterNot(graphForExecution.materializedFlows) + .foreach { f => + env.flowProgressEventLogger.recordExcluded(f) + pipelineState.put(f.identifier, StreamState.EXCLUDED) + } + val thread = buildTopologicalExecutionThread() + UncaughtExceptionHandler.addHandler( + thread, { + case _: InterruptedException => // stop from UI leads to InterruptedException. Do nothing. + case _ => + try { + stopInternal(stopTopologicalExecutionThread = false) + } catch { + case ex: Throwable => + logError(s"Exception thrown while stopping the update...", ex) + } finally { + onCompletion(UnexpectedUpdateFailure()) + } + } + ) + thread.start() + topologicalExecutionThread = Option(thread) + } + + /** Used to control how many flows are executing at once. */ + private val concurrencyLimit: Semaphore = new Semaphore(pipelineConf.maxConcurrentFlows) + + /** + * Runs the pipeline in a topological order. + * + * Non-accepting states: Queued, Running + * Accepting states: Successful, TerminatedWithError, Skipped, Cancelled, Excluded + * All [[Flow]]s which can write to a stream begin in a queued state. The following state + * transitions describe the topological execution of a [[DataflowGraph]]. + * + * Queued -> Running if Flow has no parents or the parent tables of the queued [[Flow]] + * have run successfully. + * Running -> Successful if the stream associated with the [[Flow]] succeeds. + * Running -> TerminatedWithError if the stream associated with the [[Flow]] stops with an + * exception. + * + * Non-fatally failed flows are retried with exponential back-off a bounded no. of times. + * If a flow cannot be retried, all downstream flows of the failed flow are moved to Skipped + * state. + * Running -> Cancelled if the stream associated with the [[Flow]] is stopped mid-run by + * calling `stop`. All remaining [[Flow]]s in queue are moved to state Skipped. + * + * The execution is over once there are no [[Flow]]s left running or in the queue. + */ + private def topologicalExecution(): Unit = { + // Done executing once no flows remain running or in queue + def allFlowsDone = { + flowsWithState(StreamState.QUEUED).isEmpty && flowsWithState(StreamState.RUNNING).isEmpty && + flowsQueuedForRetry().isEmpty + } + + // LinkedHashSet returns elements in the order inserted. This ensures that flows queued but + // unable to run because we are at max concurrent execution will get priority on the next round. + val runnableFlows: mutable.LinkedHashSet[TableIdentifier] = new mutable.LinkedHashSet() + + while (!Thread.interrupted() && !allFlowsDone) { + // Since queries are managed by PhysicalFlows, so update state based on [[PhysicalFlow]]s. + flowsWithState(StreamState.RUNNING).foreach { flowIdentifier => + physicalFlows(flowIdentifier) match { + case f if !f.isCompleted => // Nothing to be done; let this stream continue. 
+ case f if f.isCompleted && f.exception.isEmpty => + recordSuccess(flowIdentifier) + case f => + recordFailed(flowIdentifier = flowIdentifier, e = f.exception.get) + } + } + + // Log info on if we're leaking Semaphore permits. Synchronize here so we don't double-count + // or mis-count because a batch flow is finishing asynchronously. + val (runningFlows, availablePermits) = concurrencyLimit.synchronized { + (flowsWithState(StreamState.RUNNING).size, concurrencyLimit.availablePermits) + } + if ((runningFlows + availablePermits) < pipelineConf.maxConcurrentFlows) { + val errorStr = + s"The max concurrency is ${pipelineConf.maxConcurrentFlows}, but there are only " + + s"$availablePermits permits available with $runningFlows flows running. If this " + + s"happens consistently, it's possible we're leaking permits." + logError(errorStr) + if (Utils.isTesting) { + throw new IllegalStateException(errorStr) + } + } + + // All flows which can potentially be run now if their parent tables have successfully + // completed or have been excluded. + val queuedForRetry = + flowsQueuedForRetry().filter(nextRetryTime(_) <= clock.getTimeMillis()) + // Take flows that have terminated but have retry attempts left and flows that are queued, and + // filter the ones whose parents have all successfully completed, excluded, or idled because + // they are ONCE flows which already ran. + runnableFlows ++= (queuedForRetry ++ flowsWithState(StreamState.QUEUED)).filter { id => + graphForExecution + .upstreamFlows(id) + .intersect(graphForExecution.materializedFlowIdentifiers) + .forall { id => + pipelineState(id) == StreamState.SUCCESSFUL || + pipelineState(id) == StreamState.EXCLUDED || + pipelineState(id) == StreamState.IDLE + } + } + + // collect flow that are ready to start + val flowsToStart = mutable.ArrayBuffer[ResolvedFlow]() + while (runnableFlows.nonEmpty && concurrencyLimit.tryAcquire()) { + val flowIdentifier = runnableFlows.head + runnableFlows.remove(flowIdentifier) + flowsToStart.append(graphForExecution.resolvedFlow(flowIdentifier)) + } + + val (batchFlowsToStart, otherFlowsToStart) = flowsToStart.partition { f => + graphForExecution.resolvedFlow(f.identifier).isInstanceOf[CompleteFlow] + } + + def startFlowWithPlanningMode(flow: ResolvedFlow, mode: String): Unit = { + val flowIdentifier = flow.identifier + logInfo(s"Starting flow ${flow.identifier} in $mode mode") + env.flowProgressEventLogger.recordPlanningForBatchFlow(flow) + try { + val flowStarted = startFlow(flow) + if (flowStarted.nonEmpty) { + pipelineState.put(flowIdentifier, StreamState.RUNNING) + logInfo(s"Flow $flowIdentifier started.") + } else { + if (flow.once) { + // ONCE flows are marked as IDLE in the event buffer for consistency with continuous + // execution where all unstarted flows are IDLE. 
+ env.flowProgressEventLogger.recordIdle(flow) + pipelineState.put(flowIdentifier, StreamState.IDLE) + concurrencyLimit.release() + } else { + env.flowProgressEventLogger.recordSkipped(flow) + concurrencyLimit.release() + pipelineState.put(flowIdentifier, StreamState.SKIPPED) + } + } + } catch { + case NonFatal(ex) => recordFailed(flowIdentifier, ex) + } + } + + // start non-batch flows serially because the configs will be attached to the pipeline's spark + // session (source dataframe's spark session) + otherFlowsToStart.foreach(startFlowWithPlanningMode(_, SERIAL_PLANNING)) + + // only start MV flows in parallel if enabled + startPlanning(batchFlowsToStart.toSeq) { (flow, mode) => + startFlowWithPlanningMode(flow, mode) + } + + try { + Thread.sleep(pipelineConf.streamStatePollingInterval * 1000) Review Comment: What's the reason that we sleep here? ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala: ########## @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph +import java.util.concurrent.{ConcurrentHashMap, TimeoutException} + +import scala.annotation.unused +import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success} + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.pipelines.logging.{BatchListener, StreamListener} +import org.apache.spark.sql.streaming.Trigger + +abstract class GraphExecution( + val graphForExecution: DataflowGraph, + env: PipelineUpdateContext +) extends Logging { + + /** The [[Trigger]] configuration for a streaming flow. */ + def streamTrigger(flow: Flow): Trigger + + protected val pipelineConf: PipelineConf = env.pipelineConf + + /** Maps flowName to count of consecutive failures. Used to manage flow retries */ + private val flowToNumConsecutiveFailure = new ConcurrentHashMap[TableIdentifier, Int].asScala + + /** Maps flow identifier to count of successful runs. Used to populate batch id. */ + private val flowToNumSuccess = new ConcurrentHashMap[TableIdentifier, Long].asScala + + /** + * [[FlowExecution]]s currently being executed and tracked by this FlowExecution. + * + * `BatchListener` needs to use this map to find the `PhysicalFlow` given a flow name in order to + * update the observed metrics for the corresponding `PhysicalFlow`. However, as a listener must + * not keep a strong reference to any `SparkSession` instance to avoid memory leaks, we use a + * `WeakReference` to hold the map in `BatchListener`. 
This requires that the map must be alive + * when an update is not finished yet. The requirement is satisfied because `FlowExecution` has + * a strong reference to the map. + */ + val physicalFlows = new collection.concurrent.TrieMap[TableIdentifier, FlowExecution] + + /** Increments flow execution retry count for `flow`. */ + private def incrementFlowToNumConsecutiveFailure(flowIdentifier: TableIdentifier): Unit = { + flowToNumConsecutiveFailure.put(flowIdentifier, flowToNumConsecutiveFailure(flowIdentifier) + 1) + } + + /** Protected so tests can override. */ + private val flowPlanner = new FlowPlanner( + graph = graphForExecution, + updateContext = env, + triggerFor = streamTrigger + ) + + val SERIAL_PLANNING = "SERIAL" + + // Listeners to process events and metrics. + private val batchListener = new BatchListener() + private val streamListener = new StreamListener(env, graphForExecution) + + /** + * Run the given planning function `f` for each flow in `flows`. + */ + protected def startPlanning(flows: Seq[ResolvedFlow])( + f: (ResolvedFlow, String) => Unit + ): Unit = { + flows.foreach(f(_, SERIAL_PLANNING)) + } + + /** + * Plans the logical [[ResolvedFlow]] into a [[FlowExecution]] and then starts executing it. + * Implementation note: Thread safe + * + * @return None if the flow planner decided that there is no actual update required here. + * Otherwise returns the corresponding physical flow. + */ + def startFlow(flow: ResolvedFlow): Option[FlowExecution] = { + try { + val physicalFlow = flowPlanner.plan( + flow = graphForExecution.resolvedFlow(flow.identifier) + ) + + env.flowProgressEventLogger.recordStart(physicalFlow) + + physicalFlow.executeAsync() + physicalFlows.put(flow.identifier, physicalFlow) + implicit val ec: ExecutionContext = physicalFlow.executionContext + + // Note: The asynchronous handling here means that completed events might be recorded after + // initializing events for the next retry of this flow. + physicalFlow.getFuture.onComplete { + case Failure(ex) if !physicalFlow.isStreaming => + incrementFlowToNumConsecutiveFailure(flow.identifier) + env.flowProgressEventLogger.recordFailed( + flow = flow, + exception = ex, + // Log as warn if flow has retries left + logAsWarn = { + flowToNumConsecutiveFailure(flow.identifier) < + 1 + maxRetryAttemptsForFlow(flow.identifier) + } + ) + case Success(ExecutionResult.STOPPED) => + // We already recorded a STOPPED event in [[FlowExecution.stopFlow()]]. + // We don't need to log another one here. + case Success(ExecutionResult.FINISHED) if !physicalFlow.isStreaming => + // Reset consecutive failure count on success + flowToNumConsecutiveFailure.put(flow.identifier, 0) + flowToNumSuccess.put( + flow.identifier, + flowToNumSuccess.getOrElse(flow.identifier, 0L) + 1L + ) + env.flowProgressEventLogger.recordCompletion(flow) + case _ => // Handled by StreamListener + } + Option(physicalFlow) + } catch { + // This is if the flow fails to even start. + case ex: Throwable => + logError(s"Unhandled exception while starting flow:${flow.displayName}", ex) + // InterruptedException is thrown when the thread executing `startFlow` is interrupted. + if (ex.isInstanceOf[InterruptedException]) { + env.flowProgressEventLogger.recordStop(flow) + } else { + env.flowProgressEventLogger.recordFailed( + flow = flow, + exception = ex, + logAsWarn = false + ) + } + throw ex + } + } + + /** + * Starts the execution of flows in [[graphForExecution]]. Does not block. 
+ */ + def start(): Unit = { + env.spark.listenerManager.clear() + env.spark.listenerManager.register(batchListener) + env.spark.streams.addListener(streamListener) + } + + /** + * Stops this execution by stopping all streams and terminating any other resources. + * + * This method may be called multiple times due to race conditions and must be idempotent. + */ + def stop(): Unit = { + // Note: unregistering `batchListener` from `env.spark` is not sufficient to clean up. Each + // cloned `SparkSession` from `env.spark` will also register `batchListener` to its + // `listenerManager` which is kept in a global queue. It's hard to track all cloned + // `SparkSession`s, so we use a weak reference in `batchListener` to avoid keeping a strong + // reference to any `SparkSession`. Then, a `SparkSession` can be GCed when it's not used + // anywhere, and Spark will automatically clean up its `listenerManager` which holds a reference + // to `batchListener`. + env.spark.listenerManager.unregister(batchListener) + // Unlike `batchListener`, removing `streamListener` from `env.spark` is sufficient as cloned + // `SparkSession`s don't copy registered `streamListener`s from the parent `SparkSession`. + env.spark.streams.removeListener(streamListener) + } + + /** Stops execution of a [[FlowExecution]]. */ + def stopFlow(pf: FlowExecution): Unit = { + if (!pf.isCompleted) { + val flow = graphForExecution.resolvedFlow(pf.identifier) + try { + logInfo(s"Stopping ${pf.identifier}") + pf.stop() + } catch { + case e: Throwable => + val message = s"Error stopping flow ${pf.identifier}" + logError(message, e) + env.flowProgressEventLogger.recordFailed( + flow = flow, + exception = e, + logAsWarn = false, + messageOpt = Option(s"Flow '${pf.displayName}' has failed to stop.") + ) + throw e + } + env.flowProgressEventLogger.recordStop(flow) + logInfo(s"Stopped ${pf.identifier}") + } else { + logWarning( + s"Flow ${pf.identifier} was not stopped because it was already completed. " + + s"Exception: ${pf.exception}" + ) + } + } + + /** + * Blocks the current thread while any flows are queued or running. Returns when all flows that + * could be run have completed. When this returns, all flows are either SUCCESSFUL, + * TERMINATED_WITH_ERROR, SKIPPED, CANCELED, or EXCLUDED. + */ + def awaitCompletion(): Unit + + /** + * Returns the reason why this flow execution has terminated. + * If the function is called before the flow has not terminated yet, the behavior is undefined, + * and may return [[UnexpectedUpdateFailure]]. + */ + def getUpdateTerminationReason: UpdateTerminationReason + + def maxRetryAttemptsForFlow(flowName: TableIdentifier): Int = { + val flow = graphForExecution.flow(flowName) + flow.sqlConf + .get(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key) + .map(_.toInt) // Flow-level conf + // Pipeline-level conf, else default flow retry limit + .getOrElse(pipelineConf.maxFlowRetryAttempts) + } + + /** + * Stop a thread timeout. + */ + def stopThread(thread: Thread): Unit = { + // Don't wait to join if current thread is the thread to stop + if (thread.getId != Thread.currentThread().getId) { + thread.join(env.pipelineConf.timeoutMsForTerminationJoinAndLock) + // thread is alive after we join. + if (thread.isAlive) { + throw new TimeoutException("Failed to stop the update due to a hanging control thread.") + } + } + } +} + +object GraphExecution extends Logging { + + // Set of states after checking the exception for flow execution retryability analysis. 
+ sealed trait FlowExecutionAction + case object RetryFlowExecution extends FlowExecutionAction + case class StopFlowExecution(reason: FlowExecutionStopReason) extends FlowExecutionAction + + sealed trait FlowExecutionStopReason { + def cause: Throwable + def flowDisplayName: String + def updateTerminationReason: UpdateTerminationReason + def failureMessage: String + // If true, we record this flow execution as STOPPED with a WARNING instead a FAILED with ERROR. + def warnInsteadOfError: Boolean = false + } + + @unused + case class ReanalyzeFlowSchema(originalCause: Throwable, flowDisplayName: String) + extends FlowExecutionStopReason { + override lazy val updateTerminationReason: UpdateTerminationReason = { + UpdateSchemaChange(flowDisplayName, Option(originalCause)) + } + // Schema change can be automatically retried to handle + override val warnInsteadOfError: Boolean = true + override lazy val failureMessage: String = { + s"Flow '$flowDisplayName' has encountered a schema change during execution and " + + s"terminated. A new update using the new schema will be automatically started." + } + // Override the cause to make it more friendly for tracking purpose + override lazy val cause: Throwable = { + new SparkException( + errorClass = "FLOW_SCHEMA_CHANGED", + messageParameters = Map("flowName" -> flowDisplayName), + cause = originalCause + ) + } + } + + private case class MaxRetryExceeded( + cause: Throwable, + flowDisplayName: String, + maxAllowedRetries: Int + ) extends FlowExecutionStopReason { + override lazy val updateTerminationReason: UpdateTerminationReason = { + QueryExecutionFailure(flowDisplayName, maxAllowedRetries, Option(cause)) + } + override lazy val failureMessage: String = { + s"Flow '$flowDisplayName' has FAILED more than $maxAllowedRetries times and will not be " + + s"restarted." + } + } + + @unused Review Comment: Since this is marked unused, should we take it out? ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.graph + +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.SparkSession +import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers +import org.apache.spark.sql.pipelines.util.SparkSessionUtils +import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} +import org.apache.spark.util.ThreadUtils + +/** + * A flow's execution may complete for two reasons: + * 1. it may finish performing all of its necessary work, or + * 2. it may be interrupted by a request from a user to stop it. + * + * We use this result to disambiguate these two cases, using [[ExecutionResult.FINISHED]] + * for the former and [[ExecutionResult.STOPPED]] for the latter. + */ +sealed trait ExecutionResult +object ExecutionResult { + case object FINISHED extends ExecutionResult + case object STOPPED extends ExecutionResult +} + +/** A [[FlowExecution]] specifies how to execute a flow and manages its execution. */ +trait FlowExecution { + + /** Identifier of this physical flow */ + def identifier: TableIdentifier + + /** + * Returns a user-visible name for the flow. + */ + final def displayName: String = identifier.unquotedString + + /** + * SparkSession to execute this physical flow with. + * + * The default value for streaming flows is the pipeline's spark session because the source + * dataframe is resolved using the pipeline's spark session, and a new session will be started + * implicitly by the streaming query. + * + * The default value for batch flows is a cloned spark session from the pipeline's spark session. + * + * Please make sure that the execution thread runs in a different spark session than the + * pipeline's spark session. + */ + protected def spark: SparkSession = updateContext.spark + + /** + * Origin to use when recording events for this flow. + */ + def getOrigin: QueryOrigin + + /** Returns true iff this PhysicalFlow has been completed with either success or an exception. */ + def isCompleted: Boolean = _future.exists(_.isCompleted) + + /** Returns true iff this PhysicalFlow executes using Spark Structured Streaming. */ + def isStreaming: Boolean + + /** Retrieves the future that can be used to track execution status. */ + def getFuture: Future[ExecutionResult] = { + _future.getOrElse( + throw new IllegalStateException(s"PhysicalFlow $identifier has not been executed.") + ) + } + + /** Tracks the currently running future. */ + private final var _future: Option[Future[ExecutionResult]] = None + + /** Context about this pipeline update. */ + def updateContext: PipelineUpdateContext + + implicit val executionContext: ExecutionContext = + ExecutionContext.fromExecutor(FlowExecution.threadPool) + + /** + * Stops execution of this [[FlowExecution]]. If you override this, please be sure to + * call `super.stop()` at the beginning of your method, so we can properly handle errors + * when a user tries to stop a flow. + */ + def stop(): Unit = { + stopped.set(true) + } + + def exception: Option[Throwable] = _future.flatMap(_.value).flatMap(_.failed.toOption) + + def executeInternal(): Future[Unit] + + /** + * Executes this PhysicalFlow asynchronously to perform its intended update. 
A future that can be + * used to track execution status is saved, and can be retrieved with `getFuture`. + */ + final def executeAsync(): Unit = { + if (_future.isDefined) { + throw new IllegalStateException( + s"PhysicalFlow ${identifier.unquotedString} has already been executed." + ) + } + + val queryOrigin = QueryOrigin(filePath = getOrigin.filePath) + + _future = try { + Option( + executeInternal() + .transform { + case Success(_) => Success(ExecutionResult.FINISHED) + // Add origin to exceptions raised while executing a flow i.e. inside the `Future` Review Comment: It looks like we're not adding the origin here – should we be? Or should we take out this comment? ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala: ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph + +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.classic.SparkSession +import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers +import org.apache.spark.sql.pipelines.util.SparkSessionUtils +import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} +import org.apache.spark.util.ThreadUtils + +/** + * A flow's execution may complete for two reasons: + * 1. it may finish performing all of its necessary work, or + * 2. it may be interrupted by a request from a user to stop it. + * + * We use this result to disambiguate these two cases, using [[ExecutionResult.FINISHED]] + * for the former and [[ExecutionResult.STOPPED]] for the latter. + */ +sealed trait ExecutionResult +object ExecutionResult { + case object FINISHED extends ExecutionResult + case object STOPPED extends ExecutionResult +} + +/** A [[FlowExecution]] specifies how to execute a flow and manages its execution. */ +trait FlowExecution { + + /** Identifier of this physical flow */ + def identifier: TableIdentifier + + /** + * Returns a user-visible name for the flow. + */ + final def displayName: String = identifier.unquotedString + + /** + * SparkSession to execute this physical flow with. + * + * The default value for streaming flows is the pipeline's spark session because the source + * dataframe is resolved using the pipeline's spark session, and a new session will be started + * implicitly by the streaming query. + * + * The default value for batch flows is a cloned spark session from the pipeline's spark session. 
+ * + * Please make sure that the execution thread runs in a different spark session than the + * pipeline's spark session. + */ + protected def spark: SparkSession = updateContext.spark + + /** + * Origin to use when recording events for this flow. + */ + def getOrigin: QueryOrigin + + /** Returns true iff this PhysicalFlow has been completed with either success or an exception. */ + def isCompleted: Boolean = _future.exists(_.isCompleted) + + /** Returns true iff this PhysicalFlow executes using Spark Structured Streaming. */ + def isStreaming: Boolean + + /** Retrieves the future that can be used to track execution status. */ + def getFuture: Future[ExecutionResult] = { + _future.getOrElse( + throw new IllegalStateException(s"PhysicalFlow $identifier has not been executed.") + ) + } + + /** Tracks the currently running future. */ + private final var _future: Option[Future[ExecutionResult]] = None + + /** Context about this pipeline update. */ + def updateContext: PipelineUpdateContext + + implicit val executionContext: ExecutionContext = + ExecutionContext.fromExecutor(FlowExecution.threadPool) + + /** + * Stops execution of this [[FlowExecution]]. If you override this, please be sure to + * call `super.stop()` at the beginning of your method, so we can properly handle errors + * when a user tries to stop a flow. + */ + def stop(): Unit = { + stopped.set(true) + } + + def exception: Option[Throwable] = _future.flatMap(_.value).flatMap(_.failed.toOption) + + def executeInternal(): Future[Unit] + + /** + * Executes this PhysicalFlow asynchronously to perform its intended update. A future that can be + * used to track execution status is saved, and can be retrieved with `getFuture`. + */ + final def executeAsync(): Unit = { + if (_future.isDefined) { + throw new IllegalStateException( + s"PhysicalFlow ${identifier.unquotedString} has already been executed." + ) + } + + val queryOrigin = QueryOrigin(filePath = getOrigin.filePath) + + _future = try { + Option( + executeInternal() + .transform { + case Success(_) => Success(ExecutionResult.FINISHED) + // Add origin to exceptions raised while executing a flow i.e. inside the `Future` + // created by the `executeInternal` method. + case Failure(e) => + Failure(e) + } + .map(_ => ExecutionResult.FINISHED) + .recover { + case _: Throwable if stopped.get() => + ExecutionResult.STOPPED + } + ) + } catch { + case NonFatal(e) => + // Add query origin to exceptions raised while starting a flow + throw e.addOrigin(queryOrigin) + } + } + + /** The destination that this [[FlowExecution]] is writing to. */ + def destination: Output + + /** Has this [[FlowExecution]] been stopped? Set by [[FlowExecution.stop()]]. */ + private val stopped: AtomicBoolean = new AtomicBoolean(false) +} + +object FlowExecution { + private val threadPool: ThreadPoolExecutor = + ThreadUtils.newDaemonCachedThreadPool("FlowExecution") +} + +/** A [[FlowExecution]] that processes data statefully using Structured Streaming. */ +trait StreamingFlowExecution extends FlowExecution with Logging { + + /** The [[ResolvedFlow]] that this [[StreamingFlowExecution]] is executing. */ + def flow: ResolvedFlow + + /** Structured Streaming checkpoint. */ + def checkpointPath: String + + /** Structured Streaming trigger. */ + def trigger: Trigger + + def isStreaming: Boolean = true + + /** Spark confs that must be set when starting this flow. */ + protected def sqlConf: Map[String, String] + + /** Starts a stream and returns its streaming query. 
*/ + protected def startStream(): StreamingQuery + + /** + * Executes this StreamingPhysicalFlow by starting its stream with the correct scheduling pool + * and confs. + */ + override final def executeInternal(): Future[Unit] = { + logInfo(s"Starting ${identifier.unquotedString} with checkpoint location $checkpointPath") + val streamingQuery = SparkSessionUtils.withSqlConf(spark, sqlConf.toList: _*)(startStream()) + Future(streamingQuery.awaitTermination()) + } +} + +class StreamingTableWrite( + val identifier: TableIdentifier, + val flow: ResolvedFlow, + val graph: DataflowGraph, + val updateContext: PipelineUpdateContext, + val checkpointPath: String, + val trigger: Trigger, + val destination: Table, + val sqlConf: Map[String, String] +) extends StreamingFlowExecution { + + override def getOrigin: QueryOrigin = flow.origin + + def startStream(): StreamingQuery = { + val data = graph.reanalyzeFlow(flow).df + val dataStreamWriter = data.writeStream + .queryName(displayName) + .option("checkpointLocation", checkpointPath) + .trigger(trigger) + .outputMode(OutputMode.Append()) + if (destination.format.isDefined) { + dataStreamWriter.format(destination.format.get) + } + dataStreamWriter.toTable(destination.identifier.unquotedString) + } +} + +class BatchFlowExecution( Review Comment: We should add some docstring here, e.g. "A flow execution that writes a batch DataFrame to a table" ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala: ########## @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.pipelines.graph + +import java.util.concurrent.{ConcurrentHashMap, Semaphore} + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution._ +import org.apache.spark.sql.pipelines.util.ExponentialBackoffStrategy +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} + +sealed trait StreamState + +object StreamState { + case object QUEUED extends StreamState + case object RUNNING extends StreamState + case object EXCLUDED extends StreamState + case object IDLE extends StreamState + case object SKIPPED extends StreamState + case object TERMINATED_WITH_ERROR extends StreamState + case object CANCELED extends StreamState + case object SUCCESSFUL extends StreamState +} + +class TriggeredGraphExecution( + graphForExecution: DataflowGraph, + env: PipelineUpdateContext, + onCompletion: UpdateTerminationReason => Unit = _ => (), + clock: Clock = new SystemClock() +) extends GraphExecution(graphForExecution, env) { + + /** + * [Visible for testing] A map to store stream state of all flows which should be materialized. + * This includes flows whose streams have not yet been started, ie they are queued or have been + * marked as skipped. + */ + private[pipelines] val pipelineState = { + new ConcurrentHashMap[TableIdentifier, StreamState]().asScala + } + + /** + * Keeps track of flow failure information required for retry logic. + * This only contains values for flows that either failed previously or are currently in the + * failed state. + */ + private val failureTracker = { + new ConcurrentHashMap[TableIdentifier, TriggeredFailureInfo]().asScala + } + + /** Back-off strategy used to determine duration between retries. */ + private val backoffStrategy = ExponentialBackoffStrategy( + maxTime = (pipelineConf.watchdogMaxRetryTimeInSeconds * 1000).millis, + stepSize = (pipelineConf.watchdogMinRetryTimeInSeconds * 1000).millis + ) + + override def streamTrigger(flow: Flow): Trigger = { + Trigger.AvailableNow() + } + + /** The control thread responsible for topologically executing flows. */ + private var topologicalExecutionThread: Option[Thread] = None + + private def buildTopologicalExecutionThread(): Thread = { + new Thread("Topological Execution") { + override def run(): Unit = { + try { + topologicalExecution() + } finally { + TriggeredGraphExecution.super.stop() + } + } + } + } + + override def start(): Unit = { + super.start() + // If tablesToUpdate is empty, queue all flows; Otherwise, queue flows for which the + // destination tables are specified in tablesToUpdate. + env.refreshFlows + .filter(graphForExecution.materializedFlows) + .foreach { f => + env.flowProgressEventLogger.recordQueued(f) + pipelineState.put(f.identifier, StreamState.QUEUED) + } + env.refreshFlows + .filterNot(graphForExecution.materializedFlows) + .foreach { f => + env.flowProgressEventLogger.recordExcluded(f) + pipelineState.put(f.identifier, StreamState.EXCLUDED) + } + val thread = buildTopologicalExecutionThread() + UncaughtExceptionHandler.addHandler( + thread, { + case _: InterruptedException => // stop from UI leads to InterruptedException. Do nothing. 
+ case _ => + try { + stopInternal(stopTopologicalExecutionThread = false) + } catch { + case ex: Throwable => + logError(s"Exception thrown while stopping the update...", ex) + } finally { + onCompletion(UnexpectedUpdateFailure()) + } + } + ) + thread.start() + topologicalExecutionThread = Option(thread) + } + + /** Used to control how many flows are executing at once. */ + private val concurrencyLimit: Semaphore = new Semaphore(pipelineConf.maxConcurrentFlows) + + /** + * Runs the pipeline in a topological order. + * + * Non-accepting states: Queued, Running + * Accepting states: Successful, TerminatedWithError, Skipped, Cancelled, Excluded + * All [[Flow]]s which can write to a stream begin in a queued state. The following state + * transitions describe the topological execution of a [[DataflowGraph]]. + * + * Queued -> Running if Flow has no parents or the parent tables of the queued [[Flow]] + * have run successfully. + * Running -> Successful if the stream associated with the [[Flow]] succeeds. + * Running -> TerminatedWithError if the stream associated with the [[Flow]] stops with an + * exception. + * + * Non-fatally failed flows are retried with exponential back-off a bounded no. of times. + * If a flow cannot be retried, all downstream flows of the failed flow are moved to Skipped + * state. + * Running -> Cancelled if the stream associated with the [[Flow]] is stopped mid-run by + * calling `stop`. All remaining [[Flow]]s in queue are moved to state Skipped. + * + * The execution is over once there are no [[Flow]]s left running or in the queue. + */ + private def topologicalExecution(): Unit = { + // Done executing once no flows remain running or in queue + def allFlowsDone = { + flowsWithState(StreamState.QUEUED).isEmpty && flowsWithState(StreamState.RUNNING).isEmpty && + flowsQueuedForRetry().isEmpty + } + + // LinkedHashSet returns elements in the order inserted. This ensures that flows queued but + // unable to run because we are at max concurrent execution will get priority on the next round. + val runnableFlows: mutable.LinkedHashSet[TableIdentifier] = new mutable.LinkedHashSet() + + while (!Thread.interrupted() && !allFlowsDone) { + // Since queries are managed by PhysicalFlows, so update state based on [[PhysicalFlow]]s. + flowsWithState(StreamState.RUNNING).foreach { flowIdentifier => + physicalFlows(flowIdentifier) match { + case f if !f.isCompleted => // Nothing to be done; let this stream continue. + case f if f.isCompleted && f.exception.isEmpty => + recordSuccess(flowIdentifier) + case f => + recordFailed(flowIdentifier = flowIdentifier, e = f.exception.get) + } + } + + // Log info on if we're leaking Semaphore permits. Synchronize here so we don't double-count + // or mis-count because a batch flow is finishing asynchronously. + val (runningFlows, availablePermits) = concurrencyLimit.synchronized { + (flowsWithState(StreamState.RUNNING).size, concurrencyLimit.availablePermits) + } + if ((runningFlows + availablePermits) < pipelineConf.maxConcurrentFlows) { + val errorStr = + s"The max concurrency is ${pipelineConf.maxConcurrentFlows}, but there are only " + + s"$availablePermits permits available with $runningFlows flows running. If this " + + s"happens consistently, it's possible we're leaking permits." + logError(errorStr) + if (Utils.isTesting) { + throw new IllegalStateException(errorStr) + } + } + + // All flows which can potentially be run now if their parent tables have successfully + // completed or have been excluded. 
+ val queuedForRetry = + flowsQueuedForRetry().filter(nextRetryTime(_) <= clock.getTimeMillis()) + // Take flows that have terminated but have retry attempts left and flows that are queued, and + // filter the ones whose parents have all successfully completed, excluded, or idled because + // they are ONCE flows which already ran. + runnableFlows ++= (queuedForRetry ++ flowsWithState(StreamState.QUEUED)).filter { id => + graphForExecution + .upstreamFlows(id) + .intersect(graphForExecution.materializedFlowIdentifiers) + .forall { id => + pipelineState(id) == StreamState.SUCCESSFUL || + pipelineState(id) == StreamState.EXCLUDED || + pipelineState(id) == StreamState.IDLE + } + } + + // collect flow that are ready to start + val flowsToStart = mutable.ArrayBuffer[ResolvedFlow]() + while (runnableFlows.nonEmpty && concurrencyLimit.tryAcquire()) { + val flowIdentifier = runnableFlows.head + runnableFlows.remove(flowIdentifier) + flowsToStart.append(graphForExecution.resolvedFlow(flowIdentifier)) + } + + val (batchFlowsToStart, otherFlowsToStart) = flowsToStart.partition { f => + graphForExecution.resolvedFlow(f.identifier).isInstanceOf[CompleteFlow] + } + + def startFlowWithPlanningMode(flow: ResolvedFlow, mode: String): Unit = { Review Comment: It looks like we don't actually respect the `mode` property, so we should probably take it out? ########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala: ########## @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph + +import java.util.concurrent.{ConcurrentHashMap, Semaphore} + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution._ +import org.apache.spark.sql.pipelines.util.ExponentialBackoffStrategy +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} + +sealed trait StreamState + +object StreamState { + case object QUEUED extends StreamState + case object RUNNING extends StreamState + case object EXCLUDED extends StreamState + case object IDLE extends StreamState + case object SKIPPED extends StreamState + case object TERMINATED_WITH_ERROR extends StreamState + case object CANCELED extends StreamState + case object SUCCESSFUL extends StreamState +} + +class TriggeredGraphExecution( Review Comment: We should add docstring, e.g. ```scala /** * Executes all of the flows in the given graph in topological order. Each flow processes * all available data before downstream flows are triggered. 
* * @param onCompletion a callback to execute after all streams are done. The argument * describes the reason the update terminated. */ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org