sririshindra commented on a change in pull request #31477:
URL: https://github.com/apache/spark/pull/31477#discussion_r570672502
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
##########
@@ -89,13 +90,20 @@ case class BroadcastNestedLoopJoinExec(
}
}
- @transient private lazy val boundCondition = {
+ private val numMatchedPairs = longMetric("numMatchedPairs")
+
+ @transient private lazy val boundCondition: InternalRow => Boolean =
if (condition.isDefined) {
Review comment:
If the condition is empty, then `numOutputRows == numMatchedPairs` holds
only for an inner join. It does not necessarily hold for other join types,
because numMatchedPairs only counts records that have a matching key on both
the build side and the stream side. For instance, for a left outer join,
`numOutputRows = numMatchedPairs + (number of non-matched rows on the left
side of the join)`.
Moreover, if we define a metric but fail to populate it in some cases, the
existing unit tests in SQLMetricsSuite will fail because of the way they
fetch the actual metrics.
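To make the left outer case concrete, here is a tiny spark-shell sketch (not
part of this PR; it assumes `spark.implicits._` is in scope):

```scala
// left has 3 rows; only id = 1 has a match on the right.
val left = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "l")
val right = Seq((1, "x")).toDF("id", "r")

// Inner join: 1 matched pair and 1 output row, so the two metrics agree.
left.join(right, Seq("id"), "inner").count()       // 1

// Left outer join: still 1 matched pair, but 3 output rows:
// numOutputRows = numMatchedPairs + 2 non-matched left rows.
left.join(right, Seq("id"), "left_outer").count()  // 3
```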
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
}
override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of
matched pairs"))
Review comment:
Sure, do you want me to change the display name as well, `number of
matched pairs` -> `number of joined rows`? I feel that `number of matched
pairs` might be more precise and less confusing to the user, but if you think
`number of joined rows` is better, I will change the name of the metric
accordingly.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -415,24 +431,26 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
ctx: CodegenContext,
input: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
val matched = ctx.freshName("matched")
+ val numMatched = metricTerm(ctx, "numMatchedPairs")
val buildVars = genBuildSideVars(ctx, matched)
- val checkCondition = if (condition.isDefined) {
- val expr = condition.get
- // evaluate the variables from build side that used by condition
- val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
- // filter the output via condition
- ctx.currentVars = input ++ buildVars
- val ev =
- BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
- val skipRow = s"${ev.isNull} || !${ev.value}"
- s"""
- |$eval
- |${ev.code}
- |if (!($skipRow))
- """.stripMargin
- } else {
- ""
- }
+ val checkCondition = s"$numMatched.add(1);\n" +
+ (if (condition.isDefined) {
+ val expr = condition.get
+ // evaluate the variables from build side that used by condition
+ val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references)
+ // filter the output via condition
+ ctx.currentVars = input ++ buildVars
+ val ev =
+ BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
+ val skipRow = s"${ev.isNull} || !${ev.value}"
+ s"""
+ |$eval
+ |${ev.code}
+ |if (!($skipRow))
+ """.stripMargin
+ } else {
+ ""
+ })
Review comment:
Fixed in latest commit.
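For reviewers skimming the hunk above: because `$numMatched.add(1);` is
emitted before the condition check, the metric counts every key-matched pair,
whether or not the extra condition later filters it out. A plain-Scala sketch
of that semantics (names here are illustrative, not Spark internals):

```scala
// For one stream-side row: count every build-side row matched by key,
// then apply the optional join condition, mirroring the generated code.
def probe[A, B](
    streamRow: A,
    keyMatches: Seq[B],
    condition: Option[(A, B) => Boolean]): (Long, Seq[(A, B)]) = {
  var numMatchedPairs = 0L
  val out = keyMatches.flatMap { buildRow =>
    numMatchedPairs += 1  // the numMatched.add(1) in the generated code
    if (condition.forall(_(streamRow, buildRow))) Some((streamRow, buildRow))
    else None
  }
  (numMatchedPairs, out)
}

// 3 key matches, the condition keeps 1: numMatchedPairs = 3, output rows = 1.
probe(10, Seq(1, 2, 3), Some((_: Int, b: Int) => b == 2))
```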
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
}
override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of
matched pairs"))
Review comment:
How about `number of matched rows` instead? I wanted to highlight the fact
that this metric measures the number of keys matched between the stream side
and the build side. I also felt that users might not recognize the difference
between `number of joined rows` and `number of output rows`. But `number of
joined rows` is fine as well; if you think users will understand that better,
I will change the name of the metric to `number of joined rows`.
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
##########
@@ -89,13 +90,20 @@ case class BroadcastNestedLoopJoinExec(
}
}
- @transient private lazy val boundCondition = {
+ private val numMatchedPairs = longMetric("numMatchedPairs")
+
+ @transient private lazy val boundCondition: InternalRow => Boolean =
if (condition.isDefined) {
Review comment:
- Yes, the metric is very useful, especially when there is a condition:
it helps users recognize skew. But even when there is no condition, it helps
users see the number of matched rows. For instance, in the case of outer
joins, users can see how many rows matched between the stream side and the
build side without explicitly running another inner join.
- I also ran into an implementation problem when I originally tried to show
the metric only when there is a condition. If a metric is defined but not
used in some cases (for example, if we don't increment it when there is no
condition), the tests in SQLMetricsSuite fail because the accumulatorIds
corresponding to the metrics get out of sync: the metric values in `val
metricValues = statusStore.executionMetrics(executionId)` and the metrics in
`node.metrics` become inconsistent (see the sketch after this list).
- It might also confuse users to see the metric in some cases and not in
others. It is probably best to show the metric every time there is a join
node in the UI, to avoid any confusion.
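As a sketch of what always populating the metric buys us, here is a minimal
spark-shell snippet that reads the new metric off the executed plan. It is
not test code from SQLMetricsSuite; it assumes a local `spark` session with
AQE disabled (so the join node is reachable via `executedPlan`) and that the
small side gets broadcast:

```scala
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

val df = spark.range(100).join(spark.range(10), "id")
df.collect()  // run the query so the metric accumulators are populated

// Pull the new metric off the physical plan. If the metric were declared
// but never incremented, the status store would have no value for its
// accumulatorId, which is exactly what trips up SQLMetricsSuite.
val matched = df.queryExecution.executedPlan.collect {
  case j: BroadcastHashJoinExec => j.metrics("numMatchedPairs").value
}
println(matched)  // one value, 10: inner join, no extra condition, 10 matching keys
```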
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
}
override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of
matched pairs"))
Review comment:
Updated the name of the metric to `number of matched rows` in the latest
commit.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]