yadavay-amzn commented on code in PR #56578:
URL: https://github.com/apache/spark/pull/56578#discussion_r3432297708


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
           resolved.mergeTagsFrom(u)
           resolved
       }
-    Dataset.ofRows(spark, resolvedPlan)
+    val result = Dataset.ofRows(spark, resolvedPlan)
+
+    // SPARK-57352: Detect inputs that were resolved directly by the user 
(e.g., via
+    // spark.table()) rather than through the pipeline's UnresolvedRelation 
path above.
+    // Scan the resolved plan for table relations that match known pipeline 
inputs and
+    // record them as external inputs so the DAG scheduler orders them 
correctly.
+    resolvedPlan.foreach {
+      case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+        // Already handled above
+      case node =>
+        val tableIdOpt = node match {
+          case r: 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation =>
+            r.identifier.map(id => TableIdentifier(id.name(),
+              Option(id.namespace()).filter(_.nonEmpty).map(_.last)))
+          case r: org.apache.spark.sql.catalyst.catalog.HiveTableRelation =>
+            Some(r.tableMeta.identifier)
+          case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
+            r.catalogTable.map(_.identifier)
+          case _ => None
+        }
+        tableIdOpt.foreach { tableId =>
+          // Match by table name, allowing for different catalog/database 
qualifications
+          val matchesInput = context.allInputs.exists(input =>
+            input.table == tableId.table
+          )
+          if (matchesInput &&
+              !context.batchInputs.exists(_.input.identifier.table == 
tableId.table) &&
+              !context.streamingInputs.exists(_.input.identifier.table == 
tableId.table)) {
+            context.externalInputs += tableId
+          }
+        }
+    }
+
+    result

Review Comment:
   I think this records the dependency in a field the scheduler doesn't read 
for ordering, so the matched table may not end up affecting execution order. 
Tracing it through: `context.externalInputs` becomes 
`FlowFunctionResult.usedExternalInputs`, but the DAG dependency set is 
`ResolvedFlow.inputs`, which (in Flow.scala) is `(batchInputs ++ 
streamingInputs).map(_.input.identifier)` — `usedExternalInputs` isn't 
included. The consumers that order/validate the graph all read `.inputs`: 
`GraphOperations.upstreamFlows`, `GraphValidations` (topo order), 
`DatasetManager` (materialization deps), and `CoreDataflowNodeProcessor`.
   
   `externalInputs` also semantically means "tables outside the pipeline" — 
it's only written today by `readExternalBatchInput`/`readExternalStreamInput`, 
which are precisely the non-pipeline paths. So when the matched relation *is* a 
pipeline dataset, I think we'd want it to flow through the internal-input path 
(landing in `batchInputs`/`streamingInputs`, which is what `.inputs` and the 
DAG consume) rather than `externalInputs`. Otherwise the ordering the PR is 
targeting may not change. Could you double-check this against how you saw the 
scheduler pick it up? Happy to pair on it.



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
           resolved.mergeTagsFrom(u)
           resolved
       }
-    Dataset.ofRows(spark, resolvedPlan)
+    val result = Dataset.ofRows(spark, resolvedPlan)
+
+    // SPARK-57352: Detect inputs that were resolved directly by the user 
(e.g., via
+    // spark.table()) rather than through the pipeline's UnresolvedRelation 
path above.
+    // Scan the resolved plan for table relations that match known pipeline 
inputs and
+    // record them as external inputs so the DAG scheduler orders them 
correctly.
+    resolvedPlan.foreach {
+      case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+        // Already handled above
+      case node =>
+        val tableIdOpt = node match {
+          case r: 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation =>
+            r.identifier.map(id => TableIdentifier(id.name(),
+              Option(id.namespace()).filter(_.nonEmpty).map(_.last)))
+          case r: org.apache.spark.sql.catalyst.catalog.HiveTableRelation =>
+            Some(r.tableMeta.identifier)
+          case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
+            r.catalogTable.map(_.identifier)
+          case _ => None
+        }
+        tableIdOpt.foreach { tableId =>
+          // Match by table name, allowing for different catalog/database 
qualifications
+          val matchesInput = context.allInputs.exists(input =>
+            input.table == tableId.table
+          )
+          if (matchesInput &&
+              !context.batchInputs.exists(_.input.identifier.table == 
tableId.table) &&
+              !context.streamingInputs.exists(_.input.identifier.table == 
tableId.table)) {
+            context.externalInputs += tableId
+          }
+        }

Review Comment:
   The match is on `tableId.table` only (unqualified name), so a 
genuinely-external table that happens to share an unqualified name with a 
pipeline dataset in a different catalog/database would also match here and get 
recorded. The comment just above says "allowing for different catalog/database 
qualifications" — in a multi-catalog pipeline, name-only matching could 
introduce a dependency on the wrong table. Would it be worth qualifying the 
comparison by catalog + database (the `TableIdentifier` carries them) so only 
the intended dataset matches?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
           resolved.mergeTagsFrom(u)
           resolved
       }
-    Dataset.ofRows(spark, resolvedPlan)
+    val result = Dataset.ofRows(spark, resolvedPlan)
+
+    // SPARK-57352: Detect inputs that were resolved directly by the user 
(e.g., via
+    // spark.table()) rather than through the pipeline's UnresolvedRelation 
path above.
+    // Scan the resolved plan for table relations that match known pipeline 
inputs and
+    // record them as external inputs so the DAG scheduler orders them 
correctly.
+    resolvedPlan.foreach {
+      case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+        // Already handled above
+      case node =>
+        val tableIdOpt = node match {
+          case r: 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation =>
+            r.identifier.map(id => TableIdentifier(id.name(),
+              Option(id.namespace()).filter(_.nonEmpty).map(_.last)))
+          case r: org.apache.spark.sql.catalyst.catalog.HiveTableRelation =>
+            Some(r.tableMeta.identifier)
+          case r: org.apache.spark.sql.execution.datasources.LogicalRelation =>
+            r.catalogTable.map(_.identifier)

Review Comment:
   `Option(id.namespace()).filter(_.nonEmpty).map(_.last)` takes only the last 
namespace element as the database and drops the catalog (and any earlier 
namespace levels). For a multi-level / 3-part identifier this loses the 
catalog, which feeds into the name-only matching concern below. If we end up 
qualifying the match by catalog+db, we'd want to preserve the full namespace 
here too.



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +127,40 @@ object FlowAnalysis {
           resolved.mergeTagsFrom(u)
           resolved
       }
-    Dataset.ofRows(spark, resolvedPlan)
+    val result = Dataset.ofRows(spark, resolvedPlan)
+
+    // SPARK-57352: Detect inputs that were resolved directly by the user 
(e.g., via
+    // spark.table()) rather than through the pipeline's UnresolvedRelation 
path above.
+    // Scan the resolved plan for table relations that match known pipeline 
inputs and
+    // record them as external inputs so the DAG scheduler orders them 
correctly.
+    resolvedPlan.foreach {
+      case r: org.apache.spark.sql.catalyst.analysis.UnresolvedRelation =>
+        // Already handled above
+      case node =>

Review Comment:
   A few smaller things on this detection block:
   
   - `resolvedPlan.foreach` doesn't descend into subquery expressions, whereas 
the resolver above uses `transformWithSubqueries`. A resolved table reference 
inside a subquery (e.g. `WHERE x IN (SELECT ... FROM pipeline_tbl)`) would be 
missed. If the goal is parity with the `UnresolvedRelation` path, a 
subquery-aware traversal would match it.
   - The relation types are referenced by fully-qualified name inline 
(`org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation`, etc.) — 
could add imports to match the file's style.
   - The `case r: UnresolvedRelation => // Already handled` arm: by this point 
the plan is resolved, so the pipeline's `UnresolvedRelation`s are already gone 
— this arm looks like it never fires.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to