Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19568#discussion_r146757237
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -585,21 +585,26 @@ case class SortMergeJoinExec(
val iterator = ctx.freshName("iterator")
val numOutput = metricTerm(ctx, "numOutputRows")
+ val joinedRow = ctx.freshName("joined")
val (beforeLoop, condCheck) = if (condition.isDefined) {
  // Split the code of creating variables based on whether it's used by condition or not.
  val loaded = ctx.freshName("loaded")
  val (leftBefore, leftAfter) = splitVarsByCondition(left.output, leftVars)
  val (rightBefore, rightAfter) = splitVarsByCondition(right.output, rightVars)
+
  // Generate code for condition
+ ctx.INPUT_ROW = joinedRow
--- End diff --
We should also leave a comment explaining why we add a JoinedRow as INPUT_ROW here.
---
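For context, a minimal sketch of what evaluating the condition against a JoinedRow buys. This uses only public Catalyst classes and is not the generated code from this diff: an expression bound against left.output ++ right.output can read any column from either side of the join directly from the combined row, even when splitVarsByCondition deliberately did not pre-load that column into a local variable.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

    // Illustration only; not the codegen path in SortMergeJoinExec.
    // A JoinedRow wraps a left row and a right row and exposes them as one
    // InternalRow, so a condition bound against (left.output ++ right.output)
    // can fall back to row-based access for columns that have no local variable.
    val leftRow  = new GenericInternalRow(Array[Any](1, 2L))
    val rightRow = new GenericInternalRow(Array[Any](3, 4L))
    val joined: InternalRow = new JoinedRow(leftRow, rightRow)

    println(joined.getInt(0))   // 1, read from the left row
    println(joined.getLong(3))  // 4, read from the right row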