szehon-ho commented on code in PR #53571:
URL: https://github.com/apache/spark/pull/53571#discussion_r2683529968
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -504,6 +508,54 @@ trait V2TableWriteExec extends V2CommandExec with
UnaryExecNode with AdaptiveSpa
)
}
}
+
+ private object SourceAncestor {
+ def unapply(p: SparkPlan): Option[SparkPlan] = p match {
+ case w: WholeStageCodegenExec => Some(w.child)
+ case i: InputAdapter => Some(i.child)
+ case p: ProjectExec => Some(p.child)
+ case f: FilterExec => Some(f.child)
+ case s: SortExec => Some(s.child)
+ case e: ShuffleExchangeExec => Some(e.child)
+ case a: AQEShuffleReadExec => Some(a.child)
+ case q: QueryStageExec => Some(q.plan)
+ case _ => None
+ }
+ }
+
+ private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
+ def hasTargetTable(plan: SparkPlan): Boolean = {
+ collectFirst(plan) {
+ case scan @ BatchScanExec(_, _, _, _, _: RowLevelOperationTable, _) =>
scan
+ }.isDefined
+ }
+
+ def findSource(plan: SparkPlan): Long = plan match {
+ case SourceAncestor(child) =>
+ findSource(child)
+ case n if n.metrics.contains("numOutputRows") =>
+ n.metrics("numOutputRows").value
+ case _ =>
+ -1L
+ }
+
+ val joinOpt = collectFirst(mergeRowsExec.child) { case j: BaseJoinExec =>
j }
+
+ joinOpt match {
+ case Some(join) =>
+ // Merge with join, find source side of the join
+ val sourceSide = if (hasTargetTable(join.left)) join.right else
join.left
+ findSource(sourceSide)
+ case None =>
+ // Group-based merge, no join.
Review Comment:
The comment is a bit off: group-based merges still have a join. Group-based
refers to a mode that is copy-on-write (i.e., no delete vectors/merge-on-read).
The join disappears due to some kind of optimizer rule (not sure of its name).
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -504,6 +508,54 @@ trait V2TableWriteExec extends V2CommandExec with
UnaryExecNode with AdaptiveSpa
)
}
}
+
+ private object SourceAncestor {
+ def unapply(p: SparkPlan): Option[SparkPlan] = p match {
+ case w: WholeStageCodegenExec => Some(w.child)
+ case i: InputAdapter => Some(i.child)
+ case p: ProjectExec => Some(p.child)
Review Comment:
I am wondering, are Project/FilterExec needed, as those have numOutputRows?
What does the SparkPlan p look like here that makes us need those?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]