asl3 commented on code in PR #52669:
URL: https://github.com/apache/spark/pull/52669#discussion_r2466246063


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -481,9 +482,45 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode with AdaptiveSpa
   }
 
   private def getOperationMetrics(query: SparkPlan): util.Map[String, 
lang.Long] = {
-    collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
-      n.metrics.map { case (name, metric) => s"merge.$name" -> 
lang.Long.valueOf(metric.value) }
-    }.getOrElse(Map.empty[String, lang.Long]).asJava
+    collectFirst(query) { case m: MergeRowsExec => m } match {
+      case Some(mergeRowsExec) =>
+        val mergeMetrics = mergeRowsExec.metrics.map {
+          case (name, metric) => s"merge.$name" -> 
lang.Long.valueOf(metric.value)
+        }
+        val numSourceRows = getNumSourceRows(mergeRowsExec)
+        (mergeMetrics + ("merge.numSourceRows" -> 
lang.Long.valueOf(numSourceRows))).asJava
+      case None =>
+        Map.empty[String, lang.Long].asJava
+    }
+  }
+
+  private def getNumSourceRows(mergeRowsExec: MergeRowsExec): Long = {
+    def isTargetTableScan(plan: SparkPlan): Boolean = {
+      collectFirst(plan) {
+        case scan: BatchScanExec if 
scan.table.isInstanceOf[RowLevelOperationTable] => true
+      }.getOrElse(false)
+    }
+
+    val joinOpt = collectFirst(mergeRowsExec.child) { case j: BaseJoinExec => 
j }
+
+    joinOpt.flatMap { join =>
+      val leftIsTarget = isTargetTableScan(join.left)
+      val rightIsTarget = isTargetTableScan(join.right)
+
+      val sourceChild = if (leftIsTarget) {
+        Some(join.right)
+      } else if (rightIsTarget) {
+        Some(join.left)
+      } else {
+        None
+      }
+
+      sourceChild.flatMap { child =>

Review Comment:
   I renamed it to `findSourceSide`, as we still need a step to find the source node 
that carries `numOutputRows`. 
   
   For example, with: 
   ```
   +- *(2) BroadcastHashJoin ...
                        :- *(2) Project ... 
                        :  +- BatchScan ... 
                        +- BroadcastQueryStage ...
                           +- BroadcastExchange ... 
                              +- *(1) Project ...
                                 +- *(1) LocalTableScan ...
   ```
   
   we find that `BroadcastQueryStage` is on the source side (after checking 
`isTargetTableScan`), but we still need a further traversal step to reach 
`LocalTableScan`. Since it uses `collectFirst`, I think we don't need to worry 
about traversing too far



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala:
##########
@@ -2154,6 +2161,41 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
     }
   }
 
+  test("Merge metrics with numSourceRows for empty source") {

Review Comment:
   SG! I left a TODO comment in the test, `Merge metrics with matched and not 
matched clause`, to tackle this scenario in a follow-up. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to