c21 commented on a change in pull request #34581:
URL: https://github.com/apache/spark/pull/34581#discussion_r749112440



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
##########
@@ -890,6 +898,250 @@ case class SortMergeJoinExec(
      """.stripMargin
   }
 
+  /**
+   * Generates the code for Full Outer join.
+   */
+  private def codegenFullOuter(ctx: CodegenContext): String = {
+    // Inline mutable state since not many join operations in a task.
+    // Create class member for input iterator from both sides.
+    val leftInput = ctx.addMutableState("scala.collection.Iterator", 
"leftInput",
+      v => s"$v = inputs[0];", forceInline = true)
+    val rightInput = ctx.addMutableState("scala.collection.Iterator", 
"rightInput",
+      v => s"$v = inputs[1];", forceInline = true)
+
+    // Create class member for next input row from both sides.
+    val leftInputRow = ctx.addMutableState("InternalRow", "leftInputRow", 
forceInline = true)
+    val rightInputRow = ctx.addMutableState("InternalRow", "rightInputRow", 
forceInline = true)
+
+    // Create variables for join keys from both sides.
+    val leftKeyVars = createJoinKey(ctx, leftInputRow, leftKeys, left.output)
+    val leftAnyNull = leftKeyVars.map(_.isNull).mkString(" || ")
+    val rightKeyVars = createJoinKey(ctx, rightInputRow, rightKeys, 
right.output)
+    val rightAnyNull = rightKeyVars.map(_.isNull).mkString(" || ")
+    val matchedKeyVars = copyKeys(ctx, leftKeyVars)
+    val leftMatchedKeyVars = createJoinKey(ctx, leftInputRow, leftKeys, 
left.output)
+    val rightMatchedKeyVars = createJoinKey(ctx, rightInputRow, rightKeys, 
right.output)
+
+    // Create class member for next output row from both sides.
+    val leftOutputRow = ctx.addMutableState("InternalRow", "leftOutputRow", 
forceInline = true)
+    val rightOutputRow = ctx.addMutableState("InternalRow", "rightOutputRow", 
forceInline = true)
+
+    // Create class member for buffers of rows with same join keys from both 
sides.
+    val bufferClsName = "java.util.ArrayList<InternalRow>"
+    val leftBuffer = ctx.addMutableState(bufferClsName, "leftBuffer",
+      v => s"$v = new $bufferClsName();", forceInline = true)
+    val rightBuffer = ctx.addMutableState(bufferClsName, "rightBuffer",
+      v => s"$v = new $bufferClsName();", forceInline = true)
+    val matchedClsName = classOf[BitSet].getName
+    val leftMatched = ctx.addMutableState(matchedClsName, "leftMatched",
+      v => s"$v = new $matchedClsName(1);", forceInline = true)
+    val rightMatched = ctx.addMutableState(matchedClsName, "rightMatched",
+      v => s"$v = new $matchedClsName(1);", forceInline = true)
+    val leftIndex = ctx.freshName("leftIndex")
+    val rightIndex = ctx.freshName("rightIndex")
+
+    // Generate code for join condition
+    // val leftVars = genOneSideJoinVars(ctx, leftOutputRow, left, 
setDefaultValue = false)

Review comment:
       @cloud-fan - ah sorry, this commented-out line was added during debugging; I have removed it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to