AnishMahto commented on code in PR #56578:
URL: https://github.com/apache/spark/pull/56578#discussion_r3465474815


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +130,56 @@ object FlowAnalysis {
           resolved.mergeTagsFrom(u)
           resolved
       }
-    Dataset.ofRows(spark, resolvedPlan)
+    val result = Dataset.ofRows(spark, resolvedPlan)
+
+    // SPARK-57352: Detect inputs that were resolved directly by the user (e.g., via
+    // spark.table()) rather than through the pipeline's UnresolvedRelation path above.
+    // Scan the resolved plan (including subquery expressions) for table relations whose
+    // fully-qualified identifier matches a known pipeline input. Record them in
+    // requestedInputs so the DAG scheduler orders execution correctly.
+    def scanForResolvedInputs(plan: LogicalPlan): Unit = {
+      plan.foreach { node =>
+        val tableIdOpt: Option[TableIdentifier] = node match {
+          case r: DataSourceV2Relation =>
+            r.identifier.map { id =>
+              val ns = Option(id.namespace()).getOrElse(Array.empty[String])
+              TableIdentifier(
+                table = id.name(),
+                database = if (ns.nonEmpty) Some(ns.last) else None,
+                catalog = if (ns.length > 1) Some(ns.head) else None)
+            }
+          case r: HiveTableRelation =>
+            Some(r.tableMeta.identifier)
+          case r: LogicalRelation =>
+            r.catalogTable.map(_.identifier)
+          case _ => None

Review Comment:
   I'm a little worried this approach might be brittle. We're assuming that all 
resolved data sources materialize as one of `DataSourceV2Relation`, 
`HiveTableRelation`, or `LogicalRelation`. 
   
   This may or may not be true today (e.g., if we're loading from a cached
dataframe, I think it can resolve as an `InMemoryRelation`), but I'm more
concerned that if new resolved relation types are introduced in the future,
this match list will become difficult to maintain.
   
   Another high-level concern is that we're taking a roundabout approach
here: we let data sources fully resolve in logical plans before mapping them
back to table reads/loads.
   
   I think we can do something more direct: intercept identifiers from
SC Read protos and load them in the dataflow graph, rather than trying to
parse those same identifiers out of the analyzed logical plan. I describe my
proposal in my other review comment.
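
   To make the contrast concrete, here is a rough, self-contained sketch. Every
name in it (`Node`, `V2Relation`, `CachedRelation`, `InputRecorder`,
`readTable`) is a hypothetical stand-in for illustration, not Spark's real
classes or the actual SC Read proto handling. It only shows why a fixed match
list can miss a relation type, while recording the identifier at read time does
not depend on how the relation later resolves.

```scala
// Hypothetical stand-ins for resolved plan nodes; these are NOT Spark classes.
sealed trait Node
final case class V2Relation(name: String) extends Node
final case class HiveRelation(name: String) extends Node
final case class FileRelation(name: String) extends Node
// A relation type that the match list below does not know about.
final case class CachedRelation(name: String) extends Node

object PlanScanSketch {
  // Plan-scanning style: recover identifiers from resolved nodes after the fact.
  // Any node type missing from this list is silently skipped.
  def scanResolved(nodes: Seq[Node]): Set[String] =
    nodes.collect {
      case V2Relation(n)   => n
      case HiveRelation(n) => n
      case FileRelation(n) => n
    }.toSet

  // Interception style: record the identifier when the read is requested,
  // before resolution decides which relation type is produced.
  final class InputRecorder {
    private val seen = scala.collection.mutable.LinkedHashSet.empty[String]

    def readTable(name: String): Node = {
      seen += name
      CachedRelation(name) // assumed resolution result, for illustration only
    }

    def requestedInputs: Set[String] = seen.toSet
  }

  def main(args: Array[String]): Unit = {
    val recorder = new InputRecorder
    val plan = Seq(recorder.readTable("cat.db.events"))
    println(s"scan found:     ${scanResolved(plan)}")
    println(s"recorder found: ${recorder.requestedInputs}")
  }
}
```

   In this toy setup the scan returns an empty set because `CachedRelation` is
not in its match list, while the recorder still reports `cat.db.events`.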



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

