uros-b commented on code in PR #56662:
URL: https://github.com/apache/spark/pull/56662#discussion_r3472428311


##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala:
##########
@@ -561,4 +563,102 @@ class ConnectValidPipelineSuite extends PipelineTest with 
SharedSparkSession {
       s"Flow ${identifier.unquotedString} has the wrong schema"
     )
   }
+
+  test("per-flow confs are visible to the analyzer but do not leak onto the 
run session") {
+    val key = "pipelines.test.flowConfIsolation"
+    assert(spark.conf.getOption(key).isEmpty)
+
+    val inputId = TableIdentifier("conf_observer")
+    // (conf the analyzer reads via SQLConf.get, conf on the run session) 
captured during load().
+    var observed: (Option[String], Option[String]) = null
+    val runSession = spark
+    val observingInput = new Input {
+      override def identifier: TableIdentifier = inputId
+      override def origin: QueryOrigin = QueryOrigin()
+      override def load(asStreaming: Boolean): DataFrame = {
+        observed = (SQLConf.get.getAllConfs.get(key), 
runSession.conf.getOption(key))
+        runSession.range(1).toDF()
+      }
+    }
+
+    val result = FlowAnalysis
+      
.createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq("conf_observer")))
+      .call(
+        allInputs = Set(inputId),
+        availableInputs = Seq(observingInput),
+        configuration = Map(key -> "flowValue"),
+        queryContext = QueryContext(currentCatalog = None, currentDatabase = 
None),
+        queryOrigin = QueryOrigin())
+
+    assert(result.dataFrame.isSuccess, s"flow analysis failed: 
${result.dataFrame}")
+    assert(observed != null, "input.load was not invoked during analysis")
+    val (analyzerConf, runConf) = observed
+    // The per-flow conf is what the analyzer reads ...
+    assert(analyzerConf.contains("flowValue"))
+    // ... but it must not leak onto the session the pipeline is run from.
+    assert(
+      !runConf.contains("flowValue"),
+      "per-flow conf leaked onto the run session during flow analysis")
+    // ... and nothing is left behind on the run session afterwards.
+    assert(spark.conf.getOption(key).isEmpty)
+  }
+
+  test("per-flow confs stay isolated when flows are resolved in parallel") {
+    val key = "pipelines.test.flowConfIsolation"
+    assert(spark.conf.getOption(key).isEmpty)
+
+    val numFlows = 8
+    val runSession = spark
+    // The conf value each flow's analyzer reads for `key`.
+    val observed = new java.util.concurrent.ConcurrentHashMap[Int, String]()
+    val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+    // Rendezvous so every flow is mid-analysis - its per-flow conf already 
applied - at the same
+    // time. That is exactly when applying confs to a shared session would let 
one flow observe
+    // another flow's value.
+    val barrier = new java.util.concurrent.CyclicBarrier(numFlows)
+
+    def observingInput(i: Int): Input = new Input {
+      override def identifier: TableIdentifier = 
TableIdentifier(s"conf_observer_$i")
+      override def origin: QueryOrigin = QueryOrigin()
+      override def load(asStreaming: Boolean): DataFrame = {
+        barrier.await(60, java.util.concurrent.TimeUnit.SECONDS)
+        observed.put(i, SQLConf.get.getConfString(key, "<unset>"))
+        runSession.range(1).toDF()
+      }
+    }
+
+    val threads = (0 until numFlows).map { i =>
+      val t = new Thread(() => {
+        try {
+          val result = FlowAnalysis
+            
.createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq(s"conf_observer_$i")))
+            .call(
+              allInputs = Set(TableIdentifier(s"conf_observer_$i")),
+              availableInputs = Seq(observingInput(i)),
+              configuration = Map(key -> s"flowValue_$i"),
+              queryContext = QueryContext(currentCatalog = None, 
currentDatabase = None),
+              queryOrigin = QueryOrigin())
+          result.dataFrame.failed.foreach(errors.add)
+        } catch {
+          case t: Throwable => errors.add(t)
+        }
+      })
+      t.setName(s"flow-conf-isolation-$i")
+      t.start()
+      t
+    }
+    threads.foreach(_.join(120000))
+
+    assert(errors.isEmpty, s"flow analysis threads failed: 
${errors.toArray.mkString(", ")}")
+    assert(
+      observed.size() == numFlows,
+      s"only ${observed.size()} of $numFlows flows recorded a conf")
+    (0 until numFlows).foreach { i =>
+      assert(
+        observed.get(i) == s"flowValue_$i",
+        s"flow $i observed '${observed.get(i)}' instead of its own per-flow 
conf")
+    }
+    // Nothing leaks onto the run session.
+    assert(spark.conf.getOption(key).isEmpty)
+  }

Review Comment:
   The tests exercise FlowAnalysis directly, not DataflowGraphTransformer. The 
barrier test is a good stand-in for the real race. However, an optional 
integration test through resolveToDataflowGraph() with per-flow confs would 
increase confidence.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to