LuciferYang commented on code in PR #56662:
URL: https://github.com/apache/spark/pull/56662#discussion_r3479646938
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -44,17 +45,23 @@ object FlowAnalysis {
confs: Map[String, String],
queryContext: QueryContext,
queryOrigin: QueryOrigin) => {
+ // Flows are resolved in parallel on a shared session, so applying per-flow confs by mutating
+ // that session's conf would race across flows. Instead, give each flow a private SQLConf
+ // (a clone of the session's conf plus this flow's overrides) and install it for the analyzing
+ // thread via SQLConf.withExistingConf. Analysis still runs on the shared session, so its
+ // catalog and the resolved DataFrames are unaffected; only the confs the analyzer reads are
+ // isolated per flow.
+ val spark = SparkSession.active
val ctx = FlowAnalysisContext(
allInputs = allInputs,
availableInputs = availableInputs,
queryContext = queryContext,
- spark = SparkSession.active
+ spark = spark,
+ flowConf = spark.sessionState.conf.clone()
Review Comment:
Good question. I don't think either path needs the same treatment, for
different reasons.
`isPathIdentifier` reads the conf only to hand it to
`DataSource.lookupDataSource`, which decides things like V1 vs. V2 and which ORC
implementation to use, not whether the datasource name resolves. Its boolean result doesn't
depend on any conf a flow would realistically set, so reading the baseline rather than
the per-flow conf there makes no practical difference. I had a one-liner to switch it
to `SQLConf.get` for consistency, but since it changes no behavior and there's
nothing meaningful to test, I dropped it rather than ship untested churn. Are
you OK leaving it as is? Happy to put the `SQLConf.get` read back if you'd
rather have the in-scope conf used everywhere on principle.
`AutoCdcMergeFlow.schema` and the Scd processors are outside the analysis
scope this PR isolates. The schema is computed when `CoreDataflowNodeProcessor`
builds the resolved flow from the `FlowFunctionResult`, after the flow function
returns, and the Scd processors read `caseSensitiveAnalysis` at execution time
on the run session. Both read the session baseline before this PR too (the old
code restored the session conf before the flow function returned), so neither
is a regression.
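To make the isolation model concrete, here is a minimal plain-Scala sketch of the idea in the code comment above. It is not Spark's implementation and uses no Spark classes: `Conf`, `ActiveConf`, and `withConf` are hypothetical stand-ins for the session conf, the thread-scoped lookup, and `SQLConf.withExistingConf` respectively. It only illustrates that a private copy installed for the current thread is visible inside the block while the baseline stays untouched.

```scala
// Plain-Scala model of per-flow conf isolation. All names are hypothetical.
final class Conf(private val settings: Map[String, String]) {
  def get(key: String): Option[String] = settings.get(key)

  // Return a private copy with the overrides applied; the original is not mutated.
  def withOverrides(overrides: Map[String, String]): Conf =
    new Conf(settings ++ overrides)
}

object ActiveConf {
  // Stand-in for the shared session's conf.
  private val baseline = new Conf(Map("spark.sql.caseSensitive" -> "false"))

  // Conf installed for the current thread, if any.
  private val installed = new ThreadLocal[Option[Conf]] {
    override def initialValue(): Option[Conf] = None
  }

  def sessionConf: Conf = baseline

  // What a reader on this thread sees: the installed conf, else the baseline.
  def get: Conf = installed.get().getOrElse(baseline)

  // Install `conf` for the duration of `body` on this thread, then restore.
  def withConf[T](conf: Conf)(body: => T): T = {
    val previous = installed.get()
    installed.set(Some(conf))
    try body finally installed.set(previous)
  }
}

object FlowConfIsolationSketch {
  // Model of analyzing one flow: clone the baseline, apply overrides, read in scope.
  def analyze(overrides: Map[String, String]): Option[String] = {
    val flowConf = ActiveConf.sessionConf.withOverrides(overrides)
    ActiveConf.withConf(flowConf) {
      ActiveConf.get.get("spark.sql.caseSensitive")
    }
  }
}
```

In this model, anything that reads `ActiveConf.sessionConf` directly (the analogue of the `isPathIdentifier` read discussed above) keeps seeing the baseline, while anything that reads `ActiveConf.get` inside the block sees the per-flow copy.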
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
Review Comment:
Done in a76fcf4: reworded to describe the `withExistingConf` approach
instead of the old "no thread-locals" warning.
##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala:
##########
@@ -561,4 +563,102 @@ class ConnectValidPipelineSuite extends PipelineTest with SharedSparkSession {
s"Flow ${identifier.unquotedString} has the wrong schema"
)
}
+
+ test("per-flow confs are visible to the analyzer but do not leak onto the run session") {
+ val key = "pipelines.test.flowConfIsolation"
+ assert(spark.conf.getOption(key).isEmpty)
+
+ val inputId = TableIdentifier("conf_observer")
+ // (conf the analyzer reads via SQLConf.get, conf on the run session) captured during load().
+ var observed: (Option[String], Option[String]) = null
+ val runSession = spark
+ val observingInput = new Input {
+ override def identifier: TableIdentifier = inputId
+ override def origin: QueryOrigin = QueryOrigin()
+ override def load(asStreaming: Boolean): DataFrame = {
+ observed = (SQLConf.get.getAllConfs.get(key), runSession.conf.getOption(key))
+ runSession.range(1).toDF()
+ }
+ }
+
+ val result = FlowAnalysis
+ .createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq("conf_observer")))
+ .call(
+ allInputs = Set(inputId),
+ availableInputs = Seq(observingInput),
+ configuration = Map(key -> "flowValue"),
+ queryContext = QueryContext(currentCatalog = None, currentDatabase = None),
+ queryOrigin = QueryOrigin())
+
+ assert(result.dataFrame.isSuccess, s"flow analysis failed: ${result.dataFrame}")
+ assert(observed != null, "input.load was not invoked during analysis")
+ val (analyzerConf, runConf) = observed
+ // The per-flow conf is what the analyzer reads ...
+ assert(analyzerConf.contains("flowValue"))
+ // ... but it must not leak onto the session the pipeline is run from.
+ assert(
+ !runConf.contains("flowValue"),
+ "per-flow conf leaked onto the run session during flow analysis")
+ // ... and nothing is left behind on the run session afterwards.
+ assert(spark.conf.getOption(key).isEmpty)
+ }
+
+ test("per-flow confs stay isolated when flows are resolved in parallel") {
+ val key = "pipelines.test.flowConfIsolation"
+ assert(spark.conf.getOption(key).isEmpty)
+
+ val numFlows = 8
+ val runSession = spark
+ // The conf value each flow's analyzer reads for `key`.
+ val observed = new java.util.concurrent.ConcurrentHashMap[Int, String]()
+ val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+ // Rendezvous so every flow is mid-analysis - its per-flow conf already applied - at the same
+ // time. That is exactly when applying confs to a shared session would let one flow observe
+ // another flow's value.
+ val barrier = new java.util.concurrent.CyclicBarrier(numFlows)
+
+ def observingInput(i: Int): Input = new Input {
+ override def identifier: TableIdentifier = TableIdentifier(s"conf_observer_$i")
+ override def origin: QueryOrigin = QueryOrigin()
+ override def load(asStreaming: Boolean): DataFrame = {
+ barrier.await(60, java.util.concurrent.TimeUnit.SECONDS)
+ observed.put(i, SQLConf.get.getConfString(key, "<unset>"))
+ runSession.range(1).toDF()
+ }
+ }
+
+ val threads = (0 until numFlows).map { i =>
+ val t = new Thread(() => {
+ try {
+ val result = FlowAnalysis
+ .createFlowFunctionFromLogicalPlan(UnresolvedRelation(Seq(s"conf_observer_$i")))
+ .call(
+ allInputs = Set(TableIdentifier(s"conf_observer_$i")),
+ availableInputs = Seq(observingInput(i)),
+ configuration = Map(key -> s"flowValue_$i"),
+ queryContext = QueryContext(currentCatalog = None, currentDatabase = None),
+ queryOrigin = QueryOrigin())
+ result.dataFrame.failed.foreach(errors.add)
+ } catch {
+ case t: Throwable => errors.add(t)
+ }
+ })
+ t.setName(s"flow-conf-isolation-$i")
+ t.start()
+ t
+ }
+ threads.foreach(_.join(120000))
+
+ assert(errors.isEmpty, s"flow analysis threads failed: ${errors.toArray.mkString(", ")}")
+ assert(
+ observed.size() == numFlows,
+ s"only ${observed.size()} of $numFlows flows recorded a conf")
+ (0 until numFlows).foreach { i =>
+ assert(
+ observed.get(i) == s"flowValue_$i",
+ s"flow $i observed '${observed.get(i)}' instead of its own per-flow conf")
+ }
+ // Nothing leaks onto the run session.
+ assert(spark.conf.getOption(key).isEmpty)
+ }
Review Comment:
Added in a76fcf4: a test that resolves a graph through
`resolveToDataflowGraph()` with a per-flow conf, so it exercises
`DataflowGraphTransformer` rather than calling `FlowAnalysis` directly. It
pairs with the existing barrier test for the concurrency side.
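For anyone reading the barrier test for the first time, the rendezvous pattern it relies on can be sketched in isolation. The snippet below is a standalone model under stated assumptions, not the suite's code: `activeConf` is a hypothetical thread-scoped value built on `scala.util.DynamicVariable`, standing in for the conf the analyzer reads. Each thread installs its own value, waits at a `CyclicBarrier` until all threads have done the same, and only then records what it sees.

```scala
import java.util.concurrent.{ConcurrentHashMap, CyclicBarrier, TimeUnit}
import scala.util.DynamicVariable

object BarrierIsolationSketch {
  // Hypothetical stand-in for a thread-scoped conf; not a Spark class.
  private val activeConf = new DynamicVariable[Map[String, String]](Map.empty)

  // Run `numFlows` threads and return the value each one observed for "key".
  def run(numFlows: Int): Map[Int, String] = {
    val observed = new ConcurrentHashMap[Int, String]()
    val barrier = new CyclicBarrier(numFlows)

    val threads = (0 until numFlows).map { i =>
      val t = new Thread(() => {
        activeConf.withValue(Map("key" -> s"flowValue_$i")) {
          // Rendezvous: proceed only once every thread has installed its value,
          // so all reads below happen while all overrides are live at once.
          barrier.await(60, TimeUnit.SECONDS)
          observed.put(i, activeConf.value.getOrElse("key", "<unset>"))
        }
      })
      t.setName(s"barrier-isolation-sketch-$i")
      t.start()
      t
    }
    threads.foreach(_.join(120000))

    (0 until numFlows).map(i => i -> observed.get(i)).toMap
  }
}
```

If the value were instead stored in one shared mutable field, the last writer before the barrier would win and most threads would record another thread's value, which is the failure mode the barrier is there to expose.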
##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala:
##########
@@ -561,4 +563,102 @@ class ConnectValidPipelineSuite extends PipelineTest with SharedSparkSession {
s"Flow ${identifier.unquotedString} has the wrong schema"
)
}
+
+ test("per-flow confs are visible to the analyzer but do not leak onto the run session") {
+ val key = "pipelines.test.flowConfIsolation"
Review Comment:
Done in a76fcf4. The new `resolveToDataflowGraph()` test sets
`spark.sql.caseSensitive=true` on one flow: `SELECT Foo FROM src` resolves
under the default but fails for that flow, so it shows analysis actually
consumes the isolated conf rather than just storing it. (It's the same test I
added for your other comment about going through the real resolver.)
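As a rough illustration of why `spark.sql.caseSensitive` makes a good probe, here is a toy column resolver. It is not Spark's analyzer, and it assumes for the sake of the example that `src` has a single column named `foo` (the thread does not state the schema). `CaseSensitivitySketch` and `resolve` are hypothetical names.

```scala
object CaseSensitivitySketch {
  // Toy lookup: find `column` in `schema`, honoring the case-sensitivity flag.
  // Returns the matched schema name, or an error message if nothing matches.
  def resolve(
      column: String,
      schema: Seq[String],
      caseSensitive: Boolean): Either[String, String] = {
    val found =
      if (caseSensitive) schema.find(_ == column)
      else schema.find(_.equalsIgnoreCase(column))
    found.toRight(s"cannot resolve column '$column' among: ${schema.mkString(", ")}")
  }
}
```

With the assumed schema `Seq("foo")`, resolving `Foo` succeeds when the flag is false and fails when it is true, so a flow that only stored the conf without the analyzer reading it would not show the failure.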