AnishMahto commented on code in PR #56578:
URL: https://github.com/apache/spark/pull/56578#discussion_r3465474815
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -127,7 +130,56 @@ object FlowAnalysis {
resolved.mergeTagsFrom(u)
resolved
}
- Dataset.ofRows(spark, resolvedPlan)
+ val result = Dataset.ofRows(spark, resolvedPlan)
+
+ // SPARK-57352: Detect inputs that were resolved directly by the user (e.g., via
+ // spark.table()) rather than through the pipeline's UnresolvedRelation path above.
+ // Scan the resolved plan (including subquery expressions) for table relations whose
+ // fully-qualified identifier matches a known pipeline input. Record them in
+ // requestedInputs so the DAG scheduler orders execution correctly.
+ def scanForResolvedInputs(plan: LogicalPlan): Unit = {
+ plan.foreach { node =>
+ val tableIdOpt: Option[TableIdentifier] = node match {
+ case r: DataSourceV2Relation =>
+ r.identifier.map { id =>
+ val ns = Option(id.namespace()).getOrElse(Array.empty[String])
+ TableIdentifier(
+ table = id.name(),
+ database = if (ns.nonEmpty) Some(ns.last) else None,
+ catalog = if (ns.length > 1) Some(ns.head) else None)
+ }
+ case r: HiveTableRelation =>
+ Some(r.tableMeta.identifier)
+ case r: LogicalRelation =>
+ r.catalogTable.map(_.identifier)
+ case _ => None
Review Comment:
I'm a little worried this approach might be brittle. We're assuming that all
resolved data sources materialize as one of `DataSourceV2Relation`,
`HiveTableRelation`, or `LogicalRelation`.
This may or may not be true today (e.g., if we're loading from a cached
DataFrame, I think it can resolve as an `InMemoryRelation`), but I'm more
concerned that if new resolved relation types are introduced in the future,
this match list will become difficult to maintain.
Another high-level concern is that we're taking a somewhat roundabout approach
here: we let data sources fully resolve in logical plans and then map them
back to table reads/loads.
I think we can do something more direct: intercept identifiers from Spark
Connect (SC) `Read` protos and load them into the dataflow graph, rather than
trying to parse those same identifiers back out of the analyzed logical plan.
I've given my proposal in my other review comment.
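As a rough illustration of the general "intercept at the proto" idea (not the specific proposal from the other comment, which isn't reproduced here), the toy Scala sketch below records table identifiers while walking a relation-message tree, before any analysis happens. `NamedTableRead`, `Filter`, `Join`, and `RequestedInputs` are all hypothetical stand-ins, not Spark Connect or pipelines classes. Identifiers are recorded as written; a real implementation would presumably still need to qualify them against the pipeline's default catalog and database.

```scala
import scala.collection.mutable

// Hypothetical sketch: record identifiers from named-table reads in the
// request tree itself, so resolution details (V2, Hive, file, cache) don't matter.
object DirectInterception {
  // Stand-ins for a Spark Connect-style relation message tree.
  sealed trait Relation
  final case class NamedTableRead(unparsedIdentifier: String) extends Relation
  final case class Filter(child: Relation, condition: String) extends Relation
  final case class Join(left: Relation, right: Relation) extends Relation

  // Hypothetical collector for the inputs a flow asks for; insertion-ordered, deduplicated.
  final class RequestedInputs {
    private val ids = mutable.LinkedHashSet.empty[String]
    def record(id: String): Unit = ids += id
    def toSeq: Seq[String] = ids.toSeq
  }

  // Walk the tree and record every named-table read, left to right.
  def collectReads(rel: Relation, inputs: RequestedInputs): Unit = rel match {
    case NamedTableRead(id) => inputs.record(id)
    case Filter(child, _)   => collectReads(child, inputs)
    case Join(l, r)         => collectReads(l, inputs); collectReads(r, inputs)
  }

  def main(args: Array[String]): Unit = {
    val plan = Join(Filter(NamedTableRead("cat.db.a"), "x > 0"), NamedTableRead("cat.db.b"))
    val inputs = new RequestedInputs
    collectReads(plan, inputs)
    println(inputs.toSeq.mkString(", ")) // cat.db.a, cat.db.b
  }
}
```

The design point is that the match is over the request's own message types, which are fixed by the protocol, rather than over the open-ended set of relation nodes the analyzer may produce.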