SCHJonathan commented on code in PR #51050:
URL: https://github.com/apache/spark/pull/51050#discussion_r2116267127


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import java.util.concurrent.{ConcurrentHashMap, Semaphore}
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.util.Try
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution._
+import org.apache.spark.sql.pipelines.util.ExponentialBackoffStrategy
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+
+sealed trait StreamState
+
+object StreamState {
+  case object QUEUED extends StreamState
+  case object RUNNING extends StreamState
+  case object EXCLUDED extends StreamState
+  case object IDLE extends StreamState
+  case object SKIPPED extends StreamState
+  case object TERMINATED_WITH_ERROR extends StreamState
+  case object CANCELED extends StreamState
+  case object SUCCESSFUL extends StreamState
+}
+
+class TriggeredGraphExecution(
+    graphForExecution: DataflowGraph,
+    env: PipelineUpdateContext,
+    onCompletion: UpdateTerminationReason => Unit = _ => (),
+    clock: Clock = new SystemClock()
+) extends GraphExecution(graphForExecution, env) {
+
+  /**
+   * [Visible for testing] A map to store the stream state of all flows which should be
+   * materialized. This includes flows whose streams have not yet been started, i.e. they are
+   * queued or have been marked as skipped.
+   */
+  private[pipelines] val pipelineState = {
+    new ConcurrentHashMap[TableIdentifier, StreamState]().asScala
+  }
+
+  /**
+   * Keeps track of flow failure information required for retry logic.
+   * This only contains values for flows that either failed previously or are currently in the
+   * failed state.
+   */
+  private val failureTracker = {
+    new ConcurrentHashMap[TableIdentifier, TriggeredFailureInfo]().asScala
+  }
+
+  /** Back-off strategy used to determine duration between retries. */
+  private val backoffStrategy = ExponentialBackoffStrategy(
+    maxTime = (pipelineConf.watchdogMaxRetryTimeInSeconds * 1000).millis,
+    stepSize = (pipelineConf.watchdogMinRetryTimeInSeconds * 1000).millis
+  )
+
+  override def streamTrigger(flow: Flow): Trigger = {
+    Trigger.AvailableNow()
+  }
+
+  /** The control thread responsible for topologically executing flows. */
+  private var topologicalExecutionThread: Option[Thread] = None
+
+  private def buildTopologicalExecutionThread(): Thread = {
+    new Thread("Topological Execution") {
+      override def run(): Unit = {
+        try {
+          topologicalExecution()
+        } finally {
+          TriggeredGraphExecution.super.stop()
+        }
+      }
+    }
+  }
+
+  override def start(): Unit = {
+    super.start()
+    // If tablesToUpdate is empty, queue all flows; otherwise, queue only the flows whose
+    // destination tables are specified in tablesToUpdate.
+    env.refreshFlows
+      .filter(graphForExecution.materializedFlows)
+      .foreach { f =>
+        env.flowProgressEventLogger.recordQueued(f)
+        pipelineState.put(f.identifier, StreamState.QUEUED)
+      }
+    env.refreshFlows
+      .filterNot(graphForExecution.materializedFlows)
+      .foreach { f =>
+        env.flowProgressEventLogger.recordExcluded(f)
+        pipelineState.put(f.identifier, StreamState.EXCLUDED)
+      }
+    val thread = buildTopologicalExecutionThread()
+    UncaughtExceptionHandler.addHandler(
+      thread, {
+        case _: InterruptedException => // stop from UI leads to InterruptedException. Do nothing.
+        case _ =>
+          try {
+            stopInternal(stopTopologicalExecutionThread = false)
+          } catch {
+            case ex: Throwable =>
+              logError(s"Exception thrown while stopping the update...", ex)
+          } finally {
+            onCompletion(UnexpectedUpdateFailure())
+          }
+      }
+    )
+    thread.start()
+    topologicalExecutionThread = Option(thread)
+  }
+
+  /** Used to control how many flows are executing at once. */
+  private val concurrencyLimit: Semaphore = new Semaphore(pipelineConf.maxConcurrentFlows)
+
+  /**
+   * Runs the pipeline in a topological order.
+   *
+   * Non-accepting states: Queued, Running
+   * Accepting states: Successful, TerminatedWithError, Skipped, Canceled, Excluded
+   *
+   * All [[Flow]]s which can write to a stream begin in the Queued state. The following state
+   * transitions describe the topological execution of a [[DataflowGraph]].
+   *
+   * Queued -> Running if the [[Flow]] has no parents or the parent tables of the queued
+   *   [[Flow]] have run successfully.
+   * Running -> Successful if the stream associated with the [[Flow]] succeeds.
+   * Running -> TerminatedWithError if the stream associated with the [[Flow]] stops with an
+   *   exception.
+   * Running -> Canceled if the stream associated with the [[Flow]] is stopped mid-run by
+   *   calling `stop`. All remaining [[Flow]]s in the queue are moved to the Skipped state.
+   *
+   * Non-fatally failed flows are retried with exponential back-off a bounded number of times.
+   * If a flow cannot be retried, all downstream flows of the failed flow are moved to the
+   * Skipped state.
+   *
+   * The execution is over once there are no [[Flow]]s left running or in the queue.
+   */
+  private def topologicalExecution(): Unit = {
+    // Done executing once no flows remain running or in queue
+    def allFlowsDone = {
+      flowsWithState(StreamState.QUEUED).isEmpty && flowsWithState(StreamState.RUNNING).isEmpty &&
+      flowsQueuedForRetry().isEmpty
+    }
+
+    // LinkedHashSet returns elements in insertion order. This ensures that flows which were
+    // queued but could not run because we were at the max concurrency get priority on the
+    // next round.
+    val runnableFlows: mutable.LinkedHashSet[TableIdentifier] = new mutable.LinkedHashSet()
+
+    while (!Thread.interrupted() && !allFlowsDone) {
+      // Queries are managed by [[PhysicalFlow]]s, so update flow state based on them.
+      flowsWithState(StreamState.RUNNING).foreach { flowIdentifier =>
+        physicalFlows(flowIdentifier) match {
+          case f if !f.isCompleted => // Nothing to be done; let this stream continue.
+          case f if f.isCompleted && f.exception.isEmpty =>
+            recordSuccess(flowIdentifier)
+          case f =>
+            recordFailed(flowIdentifier = flowIdentifier, e = f.exception.get)
+        }
+      }
+
+      // Log an error if we appear to be leaking Semaphore permits. Synchronize here so we don't
+      // double-count or mis-count because a batch flow is finishing asynchronously.
+      val (runningFlows, availablePermits) = concurrencyLimit.synchronized {
+        (flowsWithState(StreamState.RUNNING).size, concurrencyLimit.availablePermits)
+      }
+      if ((runningFlows + availablePermits) < pipelineConf.maxConcurrentFlows) {
+        val errorStr =
+          s"The max concurrency is ${pipelineConf.maxConcurrentFlows}, but there are only " +
+          s"$availablePermits permits available with $runningFlows flows running. If this " +
+          s"happens consistently, it's possible we're leaking permits."
+        logError(errorStr)
+        if (Utils.isTesting) {
+          throw new IllegalStateException(errorStr)
+        }
+      }
+
+      // All flows which can potentially be run now, i.e. those whose parent tables have
+      // successfully completed or have been excluded.
+      val queuedForRetry =
+        flowsQueuedForRetry().filter(nextRetryTime(_) <= clock.getTimeMillis())
+      // Take flows that have terminated but have retry attempts left and flows that are queued,
+      // and keep the ones whose parents have all successfully completed, been excluded, or idled
+      // because they are ONCE flows which already ran.
+      runnableFlows ++= (queuedForRetry ++ flowsWithState(StreamState.QUEUED)).filter { id =>
+        graphForExecution
+          .upstreamFlows(id)
+          .intersect(graphForExecution.materializedFlowIdentifiers)
+          .forall { id =>
+            pipelineState(id) == StreamState.SUCCESSFUL ||
+            pipelineState(id) == StreamState.EXCLUDED ||
+            pipelineState(id) == StreamState.IDLE
+          }
+      }
+
+      // Collect flows that are ready to start.
+      val flowsToStart = mutable.ArrayBuffer[ResolvedFlow]()
+      while (runnableFlows.nonEmpty && concurrencyLimit.tryAcquire()) {
+        val flowIdentifier = runnableFlows.head
+        runnableFlows.remove(flowIdentifier)
+        flowsToStart.append(graphForExecution.resolvedFlow(flowIdentifier))
+      }
+
+      val (batchFlowsToStart, otherFlowsToStart) = flowsToStart.partition { f =>
+        graphForExecution.resolvedFlow(f.identifier).isInstanceOf[CompleteFlow]
+      }
+
+      def startFlowWithPlanningMode(flow: ResolvedFlow, mode: String): Unit = {

Review Comment:
   Yeah, it looks like there are a lot of outdated comments around this: we always start flows
   serially in triggered graph execution. There's also some outdated logic where we said we start
   MV flows in parallel, but we actually don't. Also need to refactor and clean up the misleading
   code around it.
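
   For reference, a minimal sketch of the serial behavior described above (placeholder names, not
   the actual helpers in this file): every flow is started one at a time, with no separate
   parallel path for MV/batch flows.

   ```scala
   // Rough sketch only: serial flow starts, assuming a single-flow start helper.
   object SerialStartSketch {
     def startSerially[F](flowsToStart: Seq[F])(startFlow: F => Unit): Unit = {
       // One flow at a time; no planning-mode fork or parallel MV path.
       flowsToStart.foreach(startFlow)
     }

     def main(args: Array[String]): Unit = {
       startSerially(Seq("flow_a", "flow_b", "flow_c"))(f => println(s"starting $f"))
     }
   }
   ```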



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

