yadavay-amzn commented on code in PR #56578:
URL: https://github.com/apache/spark/pull/56578#discussion_r3432297708
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
resolved.mergeTagsFrom(u)
resolved
}
- Dataset.ofRows(spark, resolvedPlan)
+ val result = Dataset.ofRows(spark, resolvedPlan)
+
+ // SPARK-57352: Detect inputs that were resolved directly by the user (e.g., via
+ // spark.table()) rather than through the pipeline's UnresolvedRelation path above.
+ // Scan the resolved plan for table relations that match known pipeline inputs and
+ // record them as external inputs so the DAG scheduler orders them correctly.
+ resolvedPlan.foreach {
+ case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+ // Already handled above
+ case node =>
+ val tableIdOpt = node match {
+ case r: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation =>
+ r.identifier.map(id => TableIdentifier(id.name(),
+ Option(id.namespace()).filter(_.nonEmpty).map(_.last)))
+ case r: org.apache.spark.sql.catalyst.catalog.HiveTableRelation =>
+ Some(r.tableMeta.identifier)
+ case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
+ r.catalogTable.map(_.identifier)
+ case _ => None
+ }
+ tableIdOpt.foreach { tableId =>
+ // Match by table name, allowing for different catalog/database qualifications
+ val matchesInput = context.allInputs.exists(input =>
+ input.table == tableId.table
+ )
+ if (matchesInput &&
+ !context.batchInputs.exists(_.input.identifier.table == tableId.table) &&
+ !context.streamingInputs.exists(_.input.identifier.table == tableId.table)) {
+ context.externalInputs += tableId
+ }
+ }
+ }
+
+ result
Review Comment:
I think this records the dependency in a field the scheduler doesn't read
for ordering, so the matched table may not end up affecting execution order.

Tracing it through: `context.externalInputs` becomes
`FlowFunctionResult.usedExternalInputs`, but the DAG dependency set is
`ResolvedFlow.inputs`, which (in Flow.scala) is
`(batchInputs ++ streamingInputs).map(_.input.identifier)`; `usedExternalInputs`
isn't included. The consumers that order or validate the graph all read
`.inputs`: `GraphOperations.upstreamFlows`, `GraphValidations` (topological
order), `DatasetManager` (materialization dependencies), and
`CoreDataflowNodeProcessor`.

`externalInputs` also semantically means "tables outside the pipeline": today
it's only written by `readExternalBatchInput`/`readExternalStreamInput`, which
are precisely the non-pipeline paths. So when the matched relation *is* a
pipeline dataset, I think we'd want it to flow through the internal-input path
(landing in `batchInputs`/`streamingInputs`, which is what `.inputs` and the
DAG consume) rather than `externalInputs`. Otherwise the ordering the PR is
targeting may not change. Could you double-check this against how you saw the
scheduler pick it up? Happy to pair on it.
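To make the concern concrete, here is a toy model. All names are hypothetical: `ToyFlowResult` only imitates the `inputs` definition quoted above from Flow.scala, and `ToyScheduler` is a plain Kahn-style topological sort, not the real pipeline scheduler. A read recorded only in `usedExternalInputs` creates no edge, so nothing forces `z_source` before `a_report`; recording it in `batchInputs` restores the edge.

```scala
import scala.collection.mutable.ListBuffer

// Toy stand-in (hypothetical, not the Spark pipelines API) for a flow's
// analysis result. `inputs` mirrors ResolvedFlow.inputs as described above:
// only batch and streaming inputs count as DAG dependencies.
final case class ToyFlowResult(
    batchInputs: Set[String],
    streamingInputs: Set[String],
    usedExternalInputs: Set[String]) {
  def inputs: Set[String] = batchInputs ++ streamingInputs
}

object ToyScheduler {
  /** Kahn-style topological sort; flows are keyed by the dataset they write. */
  def order(flows: Map[String, ToyFlowResult]): List[String] = {
    // Keep only edges to datasets produced inside the pipeline.
    var remaining: Map[String, Set[String]] =
      flows.map { case (out, f) => out -> f.inputs.intersect(flows.keySet) }
    val result = ListBuffer.empty[String]
    while (remaining.nonEmpty) {
      // Sorting the ready set only makes the toy output deterministic.
      val ready = remaining.filter(_._2.isEmpty).keys.toList.sorted
      require(ready.nonEmpty, "cycle detected")
      result ++= ready
      remaining = (remaining -- ready).map { case (out, deps) => out -> (deps -- ready) }
    }
    result.toList
  }
}

object SchedulerDemo {
  def main(args: Array[String]): Unit = {
    val source = ToyFlowResult(Set.empty, Set.empty, Set.empty)
    // As in the PR: the spark.table() read is recorded only as an external input.
    val asInPr = Map(
      "z_source" -> source,
      "a_report" -> ToyFlowResult(Set.empty, Set.empty, Set("z_source")))
    // As suggested: the pipeline dataset is recorded in batchInputs instead.
    val viaBatchInputs = Map(
      "z_source" -> source,
      "a_report" -> ToyFlowResult(Set("z_source"), Set.empty, Set.empty))
    println(ToyScheduler.order(asInPr))         // List(a_report, z_source)
    println(ToyScheduler.order(viaBatchInputs)) // List(z_source, a_report)
  }
}
```

The alphabetical tie-break exists only to keep the toy deterministic; a real scheduler with no edge between the two flows is simply free to run them in either order or in parallel.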
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
resolved.mergeTagsFrom(u)
resolved
}
- Dataset.ofRows(spark, resolvedPlan)
+ val result = Dataset.ofRows(spark, resolvedPlan)
+
+ // SPARK-57352: Detect inputs that were resolved directly by the user (e.g., via
+ // spark.table()) rather than through the pipeline's UnresolvedRelation path above.
+ // Scan the resolved plan for table relations that match known pipeline inputs and
+ // record them as external inputs so the DAG scheduler orders them correctly.
+ resolvedPlan.foreach {
+ case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+ // Already handled above
+ case node =>
+ val tableIdOpt = node match {
+ case r: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation =>
+ r.identifier.map(id => TableIdentifier(id.name(),
+ Option(id.namespace()).filter(_.nonEmpty).map(_.last)))
+ case r: org.apache.spark.sql.catalyst.catalog.HiveTableRelation =>
+ Some(r.tableMeta.identifier)
+ case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
+ r.catalogTable.map(_.identifier)
+ case _ => None
+ }
+ tableIdOpt.foreach { tableId =>
+ // Match by table name, allowing for different catalog/database qualifications
+ val matchesInput = context.allInputs.exists(input =>
+ input.table == tableId.table
+ )
+ if (matchesInput &&
+ !context.batchInputs.exists(_.input.identifier.table == tableId.table) &&
+ !context.streamingInputs.exists(_.input.identifier.table == tableId.table)) {
+ context.externalInputs += tableId
+ }
+ }
Review Comment:
The match is on `tableId.table` only (the unqualified name), so a genuinely
external table that happens to share an unqualified name with a pipeline
dataset in a different catalog/database would also match here and get
recorded. The code comment just above says "allowing for different
catalog/database qualifications", but in a multi-catalog pipeline that
name-only matching could introduce a dependency on the wrong table. Would it
be worth qualifying the comparison by catalog + database (the
`TableIdentifier` carries them) so only the intended dataset matches?
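A minimal sketch of the difference, assuming both the pipeline's known inputs and the detected relation are normalized to fully qualified identifiers before comparing. `ToyTableId` is a hypothetical stand-in for Catalyst's `TableIdentifier`, and case-sensitivity handling is deliberately left out:

```scala
// Hypothetical stand-in for Catalyst's TableIdentifier(table, database, catalog).
final case class ToyTableId(
    table: String,
    database: Option[String] = None,
    catalog: Option[String] = None)

object InputMatching {
  // What the PR does: compare the unqualified table name only.
  def matchesByName(candidate: ToyTableId, inputs: Set[ToyTableId]): Boolean =
    inputs.exists(_.table == candidate.table)

  // Suggested: compare table, database and catalog together. Assumes both
  // sides were already fully qualified (e.g. with the current catalog and
  // database), so a missing qualifier is never treated as a wildcard.
  def matchesQualified(candidate: ToyTableId, inputs: Set[ToyTableId]): Boolean =
    inputs.contains(candidate)
}

object MatchingDemo {
  def main(args: Array[String]): Unit = {
    val pipelineInputs =
      Set(ToyTableId("events", Some("pipeline_db"), Some("spark_catalog")))
    // An unrelated table that happens to share the unqualified name.
    val external = ToyTableId("events", Some("other_db"), Some("other_catalog"))
    println(InputMatching.matchesByName(external, pipelineInputs))    // true
    println(InputMatching.matchesQualified(external, pipelineInputs)) // false
  }
}
```

The normalization step is the important design choice: if either side can carry `None` for catalog or database, exact equality would miss legitimate matches, so qualifying both sides first keeps the comparison strict without false negatives.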
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
resolved.mergeTagsFrom(u)
resolved
}
- Dataset.ofRows(spark, resolvedPlan)
+ val result = Dataset.ofRows(spark, resolvedPlan)
+
+ // SPARK-57352: Detect inputs that were resolved directly by the user (e.g., via
+ // spark.table()) rather than through the pipeline's UnresolvedRelation path above.
+ // Scan the resolved plan for table relations that match known pipeline inputs and
+ // record them as external inputs so the DAG scheduler orders them correctly.
+ resolvedPlan.foreach {
+ case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+ // Already handled above
+ case node =>
+ val tableIdOpt = node match {
+ case r: org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation =>
+ r.identifier.map(id => TableIdentifier(id.name(),
+ Option(id.namespace()).filter(_.nonEmpty).map(_.last)))
+ case r: org.apache.spark.sql.catalyst.catalog.HiveTableRelation =>
+ Some(r.tableMeta.identifier)
+ case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
+ r.catalogTable.map(_.identifier)
Review Comment:
`Option(id.namespace()).filter(_.nonEmpty).map(_.last)` takes only the last
namespace element as the database and drops the catalog (and any earlier
namespace levels). For a multi-level or 3-part identifier this loses the
catalog, which compounds the name-only matching concern on the comparison
further down. If we end up qualifying the match by catalog + database, we'd
want to preserve the full namespace here too.
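A small sketch of what gets lost, using plain strings rather than Spark types. The parameters are assumptions: `catalogName` stands in for the relation's catalog name (presumably `DataSourceV2Relation.catalog` via `CatalogPlugin.name()`, worth confirming), and `namespace`/`name` for `Identifier.namespace()`/`Identifier.name()`. `MultipartNames` itself is hypothetical:

```scala
object MultipartNames {
  // What the PR keeps: the table name plus only the last namespace level.
  def lastLevelOnly(namespace: Array[String], name: String): (String, Option[String]) =
    (name, Option(namespace).filter(_.nonEmpty).map(_.last))

  // Full multi-part name: optional catalog, every namespace level, then the table.
  def fullName(catalogName: Option[String], namespace: Array[String], name: String): List[String] = {
    val levels = Option(namespace).map(_.toList).getOrElse(Nil)
    (catalogName.toList ++ levels) :+ name
  }
}

object NamesDemo {
  def main(args: Array[String]): Unit = {
    // Two distinct tables whose namespaces end in the same level.
    val a = (Some("cat_a"), Array("team", "shared"), "events")
    val b = (Some("cat_b"), Array("other", "shared"), "events")
    // The truncated form cannot tell them apart.
    println(MultipartNames.lastLevelOnly(a._2, a._3) == MultipartNames.lastLevelOnly(b._2, b._3)) // true
    println(MultipartNames.fullName(a._1, a._2, a._3)) // List(cat_a, team, shared, events)
  }
}
```

Comparing full multi-part names like this would sidestep `TableIdentifier`'s single `database` slot entirely, which may be simpler than squeezing a multi-level namespace into it.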
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
resolved.mergeTagsFrom(u)
resolved
}
- Dataset.ofRows(spark, resolvedPlan)
+ val result = Dataset.ofRows(spark, resolvedPlan)
+
+ // SPARK-57352: Detect inputs that were resolved directly by the user (e.g., via
+ // spark.table()) rather than through the pipeline's UnresolvedRelation path above.
+ // Scan the resolved plan for table relations that match known pipeline inputs and
+ // record them as external inputs so the DAG scheduler orders them correctly.
+ resolvedPlan.foreach {
+ case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+ // Already handled above
+ case node =>
Review Comment:
A few smaller things on this detection block:
- `resolvedPlan.foreach` doesn't descend into subquery expressions, whereas
the resolver above uses `transformWithSubqueries`. A resolved table reference
inside a subquery (e.g. `WHERE x IN (SELECT ... FROM pipeline_tbl)`) would be
missed. If the goal is parity with the `UnresolvedRelation` path, a
subquery-aware traversal would catch it.
- The relation types are referenced inline by fully qualified name
(`org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation`, etc.);
adding imports would match the file's style.
- The `case r: UnresolvedRelation => // Already handled` arm: by this point
the plan is resolved and the pipeline's `UnresolvedRelation`s have already
been replaced, so this arm looks like it never fires.
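To illustrate the traversal gap, here is a toy tree (hypothetical classes, not Catalyst). Its `foreach` mimics `TreeNode.foreach`, and its `foreachWithSubqueries` mimics the method of the same name on Catalyst's `QueryPlan`, which also walks the plans held in each node's subquery expressions. If that method is available in the targeted Spark version, switching `resolvedPlan.foreach` to `resolvedPlan.foreachWithSubqueries` may be enough; worth verifying.

```scala
// Toy plan tree (hypothetical classes, not Catalyst). A node has child plans
// and may also hold subquery plans inside its expressions.
sealed trait ToyPlan {
  def children: Seq[ToyPlan]
  def subqueries: Seq[ToyPlan] = Nil

  // Mimics TreeNode.foreach: this node, then its children, pre-order.
  def foreach(f: ToyPlan => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }

  // Mimics QueryPlan.foreachWithSubqueries: at every node, also walk the
  // subquery plans held by that node's expressions.
  def foreachWithSubqueries(f: ToyPlan => Unit): Unit =
    foreach { p =>
      f(p)
      p.subqueries.foreach(_.foreachWithSubqueries(f))
    }
}

final case class ToyRelation(name: String) extends ToyPlan {
  def children: Seq[ToyPlan] = Nil
}

// A filter whose condition is `x IN (<inSubquery>)`.
final case class ToyFilter(inSubquery: ToyPlan, child: ToyPlan) extends ToyPlan {
  def children: Seq[ToyPlan] = Seq(child)
  override def subqueries: Seq[ToyPlan] = Seq(inSubquery)
}

object TraversalDemo {
  // Collects relation names in visit order using the given traversal.
  def relationNames(visit: (ToyPlan => Unit) => Unit): List[String] = {
    val seen = scala.collection.mutable.ListBuffer.empty[String]
    visit {
      case ToyRelation(n) =>
        seen += n
        ()
      case _ => ()
    }
    seen.toList
  }

  def main(args: Array[String]): Unit = {
    // SELECT * FROM orders WHERE x IN (SELECT ... FROM pipeline_tbl)
    val plan = ToyFilter(ToyRelation("pipeline_tbl"), ToyRelation("orders"))
    println(relationNames(plan.foreach))               // List(orders)
    println(relationNames(plan.foreachWithSubqueries)) // List(pipeline_tbl, orders)
  }
}
```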
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]