LuciferYang commented on code in PR #56662:
URL: https://github.com/apache/spark/pull/56662#discussion_r3479646938


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -44,17 +45,23 @@ object FlowAnalysis {
       confs: Map[String, String],
       queryContext: QueryContext,
       queryOrigin: QueryOrigin) => {
+      // Flows are resolved in parallel on a shared session, so applying per-flow confs by mutating
+      // that session's conf would race across flows. Instead, give each flow a private SQLConf
+      // (a clone of the session's conf plus this flow's overrides) and install it for the analyzing
+      // thread via SQLConf.withExistingConf. Analysis still runs on the shared session, so its
+      // catalog and the resolved DataFrames are unaffected; only the confs the analyzer reads are
+      // isolated per flow.
+      val spark = SparkSession.active
       val ctx = FlowAnalysisContext(
         allInputs = allInputs,
         availableInputs = availableInputs,
         queryContext = queryContext,
-        spark = SparkSession.active
+        spark = spark,
+        flowConf = spark.sessionState.conf.clone()

Review Comment:
   Good question. I don't think either path needs the same treatment, for 
different reasons.
   
   `isPathIdentifier` reads the conf only to hand it to 
`DataSource.lookupDataSource`, which decides things like V1/V2 and the ORC 
impl, not whether the datasource name resolves. Its boolean result doesn't 
depend on any conf a flow would realistically set, so reading the baseline vs 
the per-flow conf there is a no-op in practice. I had a one-liner to switch it 
to `SQLConf.get` for consistency, but since it changes no behavior and there's 
nothing meaningful to test, I dropped it rather than ship untested churn. Are 
you OK leaving it as is? Happy to put the `SQLConf.get` read back if you'd 
rather have the in-scope conf used everywhere on principle.
   
   `AutoCdcMergeFlow.schema` and the Scd processors are outside the analysis 
scope this PR isolates. The schema is computed when `CoreDataflowNodeProcessor` 
builds the resolved flow from the `FlowFunctionResult`, after the flow function 
returns, and the Scd processors read `caseSensitiveAnalysis` at execution time 
on the run session. Both already read the session baseline before this PR (the 
old code restored the session conf before the flow function returned), so 
neither is a regression.
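
   To illustrate the timing argument, here is a minimal sketch in plain Scala.
   It does not use Spark: `ScopedConfSketch`, `activeConf`, and `withConf` are
   hypothetical stand-ins that only mimic the general shape of a thread-scoped
   conf, and the `caseSensitive` key is a placeholder. A read inside the scope
   sees the per-flow value, while a read made after the scope ends sees the
   baseline again.

   ```scala
   // Toy model only: these names are hypothetical and are not Spark APIs.
   object ScopedConfSketch {
     // Stand-in for the session's baseline conf.
     private val baseline = Map("caseSensitive" -> "false")

     // Per-thread override; None means "fall back to the baseline".
     private val current = new ThreadLocal[Option[Map[String, String]]] {
       override def initialValue(): Option[Map[String, String]] = None
     }

     // What a reader on the calling thread sees right now.
     def activeConf: Map[String, String] = current.get().getOrElse(baseline)

     // Install `conf` for the calling thread while `body` runs, then restore.
     def withConf[T](conf: Map[String, String])(body: => T): T = {
       val previous = current.get()
       current.set(Some(conf))
       try body finally current.set(previous)
     }

     // Returns (value read during "analysis", value read after it).
     def run(): (String, String) = {
       val flowConf = baseline ++ Map("caseSensitive" -> "true")
       val duringAnalysis = withConf(flowConf) { activeConf("caseSensitive") }
       val afterAnalysis = activeConf("caseSensitive")
       (duringAnalysis, afterAnalysis)
     }

     def main(args: Array[String]): Unit = println(run())
   }
   ```

   In this model, anything that reads the conf after `withConf` returns behaves
   like the schema computation and the Scd processors described above: it sees
   the baseline, not the flow's override.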
   



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########


Review Comment:
   Done in a76fcf4: reworded to describe the `withExistingConf` approach 
instead of the old "no thread-locals" warning.
   



##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala:
##########
@@ -561,4 +563,102 @@ class ConnectValidPipelineSuite extends PipelineTest with SharedSparkSession {
       s"Flow ${identifier.unquotedString} has the wrong schema"
     )
   }
+
+  test("per-flow confs are visible to the analyzer but do not leak onto the run session") {
+    val key = "pipelines.test.flowConfIsolation"
+    assert(spark.conf.getOption(key).isEmpty)
+
+    val inputId = TableIdentifier("conf_observer")
+    // (conf the analyzer reads via SQLConf.get, conf on the run session) captured during load().
+    var observed: (Option[String], Option[String]) = null
+    val runSession = spark
+    val observingInput = new Input {
+      override def identifier: TableIdentifier = inputId
+      override def origin: QueryOrigin = QueryOrigin()
+      override def load(asStreaming: Boolean): DataFrame = {
+        observed = (SQLConf.get.getAllConfs.get(key), runSession.conf.getOption(key))
+        runSession.range(1).toDF()
+      }
+    }
+
+    val result = FlowAnalysis
+      .createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq("conf_observer")))
+      .call(
+        allInputs = Set(inputId),
+        availableInputs = Seq(observingInput),
+        configuration = Map(key -> "flowValue"),
+        queryContext = QueryContext(currentCatalog = None, currentDatabase = None),
+        queryOrigin = QueryOrigin())
+
+    assert(result.dataFrame.isSuccess, s"flow analysis failed: ${result.dataFrame}")
+    assert(observed != null, "input.load was not invoked during analysis")
+    val (analyzerConf, runConf) = observed
+    // The per-flow conf is what the analyzer reads ...
+    assert(analyzerConf.contains("flowValue"))
+    // ... but it must not leak onto the session the pipeline is run from.
+    assert(
+      !runConf.contains("flowValue"),
+      "per-flow conf leaked onto the run session during flow analysis")
+    // ... and nothing is left behind on the run session afterwards.
+    assert(spark.conf.getOption(key).isEmpty)
+  }
+
+  test("per-flow confs stay isolated when flows are resolved in parallel") {
+    val key = "pipelines.test.flowConfIsolation"
+    assert(spark.conf.getOption(key).isEmpty)
+
+    val numFlows = 8
+    val runSession = spark
+    // The conf value each flow's analyzer reads for `key`.
+    val observed = new java.util.concurrent.ConcurrentHashMap[Int, String]()
+    val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+    // Rendezvous so every flow is mid-analysis - its per-flow conf already applied - at the same
+    // time. That is exactly when applying confs to a shared session would let one flow observe
+    // another flow's value.
+    val barrier = new java.util.concurrent.CyclicBarrier(numFlows)
+
+    def observingInput(i: Int): Input = new Input {
+      override def identifier: TableIdentifier = TableIdentifier(s"conf_observer_$i")
+      override def origin: QueryOrigin = QueryOrigin()
+      override def load(asStreaming: Boolean): DataFrame = {
+        barrier.await(60, java.util.concurrent.TimeUnit.SECONDS)
+        observed.put(i, SQLConf.get.getConfString(key, "<unset>"))
+        runSession.range(1).toDF()
+      }
+    }
+
+    val threads = (0 until numFlows).map { i =>
+      val t = new Thread(() => {
+        try {
+          val result = FlowAnalysis
+            .createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq(s"conf_observer_$i")))
+            .call(
+              allInputs = Set(TableIdentifier(s"conf_observer_$i")),
+              availableInputs = Seq(observingInput(i)),
+              configuration = Map(key -> s"flowValue_$i"),
+              queryContext = QueryContext(currentCatalog = None, currentDatabase = None),
+              queryOrigin = QueryOrigin())
+          result.dataFrame.failed.foreach(errors.add)
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      })
+      t.setName(s"flow-conf-isolation-$i")
+      t.start()
+      t
+    }
+    threads.foreach(_.join(120000))
+
+    assert(errors.isEmpty, s"flow analysis threads failed: ${errors.toArray.mkString(", ")}")
+    assert(
+      observed.size() == numFlows,
+      s"only ${observed.size()} of $numFlows flows recorded a conf")
+    (0 until numFlows).foreach { i =>
+      assert(
+        observed.get(i) == s"flowValue_$i",
+        s"flow $i observed '${observed.get(i)}' instead of its own per-flow conf")
+    }
+    // Nothing leaks onto the run session.
+    assert(spark.conf.getOption(key).isEmpty)
+  }

Review Comment:
   Added in a76fcf4: a test that resolves a graph through 
`resolveToDataflowGraph()` with a per-flow conf, so it exercises 
`DataflowGraphTransformer` rather than calling `FlowAnalysis` directly. It 
pairs with the existing barrier test for the concurrency side.
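
   For reference, the rendezvous idea behind the barrier test can be sketched
   without Spark. This is a simplified model, not the test itself:
   `BarrierIsolationSketch` and its thread-local `current` are hypothetical
   stand-ins for a per-thread conf. Each thread installs its own value, waits
   until all threads have done so, and only then reads the value back.

   ```scala
   import java.util.concurrent.{ConcurrentHashMap, CyclicBarrier, TimeUnit}

   // Toy model only: `current` is a hypothetical per-thread conf value.
   object BarrierIsolationSketch {
     private val current = new ThreadLocal[String] {
       override def initialValue(): String = "<unset>"
     }

     // Runs `numFlows` threads and returns the value each one observed.
     def run(numFlows: Int): Map[Int, String] = {
       val observed = new ConcurrentHashMap[Int, String]()
       val barrier = new CyclicBarrier(numFlows)
       val threads = (0 until numFlows).map { i =>
         val t = new Thread(() => {
           // Install this thread's value before the rendezvous.
           current.set(s"flowValue_$i")
           try {
             // Wait until every thread has installed its own value.
             barrier.await(60, TimeUnit.SECONDS)
             observed.put(i, current.get())
           } finally {
             current.remove()
           }
         })
         t.start()
         t
       }
       threads.foreach(_.join(120000))
       (0 until numFlows).map(i => i -> observed.get(i)).toMap
     }
   }
   ```

   If the value lived in one shared mutable slot instead of a thread-local,
   the reads after the barrier would all see whichever write happened last,
   which is the kind of cross-flow leak the barrier test is meant to catch.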
   



##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala:
##########
@@ -561,4 +563,102 @@ class ConnectValidPipelineSuite extends PipelineTest with SharedSparkSession {
       s"Flow ${identifier.unquotedString} has the wrong schema"
     )
   }
+
+  test("per-flow confs are visible to the analyzer but do not leak onto the run session") {
+    val key = "pipelines.test.flowConfIsolation"

Review Comment:
   Done in a76fcf4. The new `resolveToDataflowGraph()` test sets 
`spark.sql.caseSensitive=true` on one flow: `SELECT Foo FROM src` resolves 
under the default but fails for that flow, so it shows analysis actually 
consumes the isolated conf rather than just storing it. (It's the same test I 
added for your other comment about going through the real resolver.)
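
   As a rough illustration of why that query is a useful probe, here is a toy
   column resolver in plain Scala. It is not Spark's resolution logic:
   `CaseSensitivitySketch.resolve` is hypothetical, and the sketch assumes
   `src` has a single column named `foo`, which this thread does not spell out.

   ```scala
   // Toy model only: a hypothetical resolver, not Spark's analyzer.
   object CaseSensitivitySketch {
     // Returns the matching schema column, or None if the name does not resolve.
     def resolve(schema: Seq[String], name: String, caseSensitive: Boolean): Option[String] =
       if (caseSensitive) schema.find(_ == name)
       else schema.find(_.equalsIgnoreCase(name))

     def main(args: Array[String]): Unit = {
       val srcSchema = Seq("foo") // assumed schema for `src`
       // Case-insensitive lookup: `Foo` matches `foo`.
       println(resolve(srcSchema, "Foo", caseSensitive = false))
       // Case-sensitive lookup: `Foo` does not match `foo`.
       println(resolve(srcSchema, "Foo", caseSensitive = true))
     }
   }
   ```

   The outcome flips only if the resolver actually reads the flag, which is
   the property the new test relies on.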
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

