sririshindra commented on a change in pull request #31477:
URL: https://github.com/apache/spark/pull/31477#discussion_r570672502



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
##########
@@ -89,13 +90,20 @@ case class BroadcastNestedLoopJoinExec(
     }
   }
 
-  @transient private lazy val boundCondition = {
+  private val numMatchedPairs = longMetric("numMatchedPairs")
+
+  @transient private lazy val boundCondition: InternalRow => Boolean =
     if (condition.isDefined) {

Review comment:
       If the condition is empty, then `numOutputRows == numMatchedPairs` only 
in the case of an inner join. That does not necessarily hold for other join 
types, because `numMatchedPairs` only counts the records for which there is a 
common key on both the build side and the stream side. For instance, in the 
case of a left outer join, `numOutputRows = numMatchedPairs + (number of 
unmatched rows on the left side of the join)`.
   Moreover, if we define a metric and fail to populate it in some cases, then 
the existing unit tests in SQLMetricsSuite will fail because of the way those 
tests fetch the actual metrics. 
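
   To make the left outer join arithmetic concrete, here is a minimal sketch in 
plain Scala. It does not use Spark at all; `LeftOuterMetricsSketch`, 
`JoinCounts`, and `leftOuterCounts` are hypothetical names introduced only for 
this illustration, and the "rows" are bare integer keys.

   ```scala
   // Plain-Scala model of a keyed left outer join; no Spark involved.
   // Every name here is hypothetical and exists only for this illustration.
   object LeftOuterMetricsSketch {
     final case class JoinCounts(numMatchedPairs: Long, numOutputRows: Long)

     def leftOuterCounts(left: Seq[Int], right: Seq[Int]): JoinCounts = {
       // Group the right (build) side by key, like a tiny hash relation.
       val rightByKey: Map[Int, Seq[Int]] = right.groupBy(identity)
       var matched = 0L
       var output = 0L
       left.foreach { key =>
         val hits = rightByKey.getOrElse(key, Seq.empty).size
         // Each (left row, right row) pair sharing a key is one matched pair.
         matched += hits
         // A left row with no match still yields one null-padded output row.
         output += (if (hits == 0) 1 else hits)
       }
       JoinCounts(matched, output)
     }

     def main(args: Array[String]): Unit = {
       val counts = leftOuterCounts(left = Seq(1, 2, 3), right = Seq(1, 1, 4))
       println(counts)
     }
   }
   ```

   With `left = Seq(1, 2, 3)` and `right = Seq(1, 1, 4)`, key 1 produces two 
matched pairs and keys 2 and 3 are unmatched, so the output row count exceeds 
the matched pair count by the two unmatched left rows.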

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
   }
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of matched pairs"))

Review comment:
       Sure. Do you also want me to change the display name, `number of 
matched pairs` -> `number of joined rows`? I feel that `number of matched 
pairs` might be more precise and less confusing to the user. But if you think 
that `number of joined rows` is better, I will change the name of the metric 
accordingly.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -415,24 +431,26 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
       ctx: CodegenContext,
       input: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
     val matched = ctx.freshName("matched")
+    val numMatched = metricTerm(ctx, "numMatchedPairs")
     val buildVars = genBuildSideVars(ctx, matched)
-    val checkCondition = if (condition.isDefined) {
-      val expr = condition.get
-      // evaluate the variables from build side that used by condition
-      val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
-      // filter the output via condition
-      ctx.currentVars = input ++ buildVars
-      val ev =
-        BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
-      val skipRow = s"${ev.isNull} || !${ev.value}"
-      s"""
-         |$eval
-         |${ev.code}
-         |if (!($skipRow))
-       """.stripMargin
-    } else {
-      ""
-    }
+    val checkCondition = s"$numMatched.add(1);\n" +
+      (if (condition.isDefined) {
+        val expr = condition.get
+        // evaluate the variables from build side that used by condition
+        val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
+        // filter the output via condition
+        ctx.currentVars = input ++ buildVars
+        val ev =
+          BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
+        val skipRow = s"${ev.isNull} || !${ev.value}"
+        s"""
+           |$eval
+           |${ev.code}
+           |if (!($skipRow))
+         """.stripMargin
+      } else {
+        ""
+      })

Review comment:
       Fixed in latest commit.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
   }
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of matched pairs"))

Review comment:
       How about `number of matched rows` instead? I wanted to highlight the 
fact that this metric measures the number of keys matched between the stream 
side and the build side. I also felt that users might not recognize the 
difference between `number of joined rows` and `number of output rows`. But 
`number of joined rows` is fine as well. If you think that users will 
understand that better, I will change the name of the metric to `number of 
joined rows`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
##########
@@ -89,13 +90,20 @@ case class BroadcastNestedLoopJoinExec(
     }
   }
 
-  @transient private lazy val boundCondition = {
+  private val numMatchedPairs = longMetric("numMatchedPairs")
+
+  @transient private lazy val boundCondition: InternalRow => Boolean =
     if (condition.isDefined) {

Review comment:
       - Yes, the metric is very useful, especially when there is a condition. 
It helps users recognize skew. But even if there is no condition, it can help 
users see the number of matched rows. For instance, in the case of outer joins, 
users will be able to see how many rows matched between the stream side and 
the build side without explicitly running another inner join.
   
   - I also ran into an implementation problem when I originally tried to show 
the metric only when there is a condition. If a metric is defined but is not 
used in some cases (for example, if we don't increment the metric when there 
is no condition), then the tests in SQLMetricsSuite fail because the 
accumulator IDs corresponding to the metrics get mixed up. The number of metric 
values in `val metricValues = statusStore.executionMetrics(executionId)` and 
the number of metrics in `node.metrics` become inconsistent.
   
   - It might also confuse users to see the metric in some cases and not in 
others. It would probably be best to show the metric every time there is a join 
node in the UI, to avoid any confusion.
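
   As a rough illustration of the points above, the sketch below bumps the 
matched counter before the condition is evaluated, so the counter is populated 
whether or not a condition exists. It is plain Scala, not Spark code: 
`Counter`, `innerJoin`, and the tuple-based "rows" are hypothetical stand-ins, 
and `Counter` is only a toy substitute for a real SQL metric.

   ```scala
   // Toy model: count key matches first, then apply the optional condition.
   // Hypothetical names throughout; this is not Spark's implementation.
   object MatchedBeforeConditionSketch {
     final class Counter {
       private var n = 0L
       def add(v: Long): Unit = n += v
       def value: Long = n
     }

     type Row = (Int, Int) // (join key, payload)

     def innerJoin(
         stream: Seq[Row],
         build: Seq[Row],
         condition: Option[(Row, Row) => Boolean],
         numMatched: Counter,
         numOutput: Counter): Unit = {
       val buildByKey = build.groupBy(_._1)
       for {
         s <- stream
         b <- buildByKey.getOrElse(s._1, Seq.empty)
       } {
         // Always incremented, with or without a condition.
         numMatched.add(1)
         // An absent condition accepts every key-matched pair.
         if (condition.forall(c => c(s, b))) numOutput.add(1)
       }
     }
   }
   ```

   In this model, a large gap between the matched count and the output count 
means many key-matched pairs were rejected by the condition, which is the kind 
of signal the first bullet describes.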

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
   }
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of matched pairs"))

Review comment:
       Updated the name of the metric to `number of matched rows` in the latest 
commit.



