Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11130#discussion_r52400364
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
 ---
    @@ -163,64 +192,142 @@ case class BroadcastHashJoin(
     
         // find the matches from HashedRelation
         val matched = ctx.freshName("matched")
    +    val valid = ctx.freshName("invalid")
     
         // create variables for output
         ctx.currentVars = null
         ctx.INPUT_ROW = matched
         val buildColumns = buildPlan.output.zipWithIndex.map { case (a, i) =>
    -      BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      val ev = BoundReference(i, a.dataType, a.nullable).gen(ctx)
    +      if (joinType == Inner) {
    +        ev
    +      } else {
    +        val isNull = ctx.freshName("isNull")
    +        val value = ctx.freshName("value")
    +        val code = s"""
    +          |boolean $isNull = true;
    +          |${ctx.javaType(a.dataType)} $value = 
${ctx.defaultValue(a.dataType)};
    +          |if ($matched != null) {
    +          |  ${ev.code}
    +          |  $isNull = ${ev.isNull};
    +          |  $value = ${ev.value};
    +          |}
    +         """.stripMargin
    +        ExprCode(code, isNull, value)
    +      }
         }
    +
    +    // output variables
         val resultVars = buildSide match {
           case BuildLeft => buildColumns ++ input
           case BuildRight => input ++ buildColumns
         }
     
    -    val outputCode = if (condition.isDefined) {
    -      // filter the output via condition
    -      ctx.currentVars = resultVars
    -      val ev = BindReferences.bindReference(condition.get, 
this.output).gen(ctx)
    -      s"""
    -         | ${ev.code}
    -         | if (!${ev.isNull} && ${ev.value}) {
    -         |   ${consume(ctx, resultVars)}
    -         | }
    -       """.stripMargin
    -    } else {
    -      consume(ctx, resultVars)
    -    }
    +    if (joinType == Inner) {
    +      val outputCode = if (condition.isDefined) {
    +        // filter the output via condition
    +        ctx.currentVars = resultVars
    +        val ev = BindReferences.bindReference(condition.get, 
this.output).gen(ctx)
    +        s"""
    +         |${ev.code}
    +         |if (!${ev.isNull} && ${ev.value}) {
    +         |  ${consume(ctx, resultVars)}
    +         |}
    +         """.stripMargin
    +      } else {
    +        consume(ctx, resultVars)
    +      }
     
    -    if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    -      s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashedRelation
    -         | UnsafeRow $matched = $anyNull ? null: 
(UnsafeRow)$relationTerm.getValue(${keyVal.value});
    -         | if ($matched != null) {
    -         |   ${buildColumns.map(_.code).mkString("\n")}
    -         |   $outputCode
    -         | }
    -     """.stripMargin
    +      if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: 
(UnsafeRow)$relationTerm.getValue(${keyVal.value});
    +         |if ($matched != null) {
    +         |  ${buildColumns.map(_.code).mkString("\n")}
    +         |  $outputCode
    +         |}
    +         """.stripMargin
    +
    +      } else {
    +        val matches = ctx.freshName("matches")
    +        val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    +        val i = ctx.freshName("i")
    +        val size = ctx.freshName("size")
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashRelation
    +         |$bufferType $matches = $anyNull ? null : ($bufferType) 
$relationTerm.get(${keyVal.value});
    +         |if ($matches != null) {
    +         |  int $size = $matches.size();
    +         |  for (int $i = 0; $i < $size; $i++) {
    +         |    UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    +         |    ${buildColumns.map(_.code).mkString("\n")}
    +         |    $outputCode
    +         |  }
    +         |}
    +         """.stripMargin
    +      }
     
         } else {
    -      val matches = ctx.freshName("matches")
    -      val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
    -      val i = ctx.freshName("i")
    -      val size = ctx.freshName("size")
    -      s"""
    -         | // generate join key
    -         | ${keyVal.code}
    -         | // find matches from HashRelation
    -         | $bufferType $matches = ${anyNull} ? null :
    -         |  ($bufferType) $relationTerm.get(${keyVal.value});
    -         | if ($matches != null) {
    -         |   int $size = $matches.size();
    -         |   for (int $i = 0; $i < $size; $i++) {
    -         |     UnsafeRow $matched = (UnsafeRow) $matches.apply($i);
    -         |     ${buildColumns.map(_.code).mkString("\n")}
    -         |     $outputCode
    -         |   }
    -         | }
    -     """.stripMargin
    +      // LeftOuter and RightOuter
    +
    +      // filter the output via condition
    +      val checkCondition = if (condition.isDefined) {
    +        ctx.currentVars = resultVars
    +        val ev = BindReferences.bindReference(condition.get, 
this.output).gen(ctx)
    +        s"""
    +         |boolean $valid = true;
    +         |if ($matched != null) {
    +         |  ${ev.code}
    +         |  $valid = !${ev.isNull} && ${ev.value};
    +         |}
    +         """.stripMargin
    +      } else {
    +        s"final boolean $valid = true;"
    +      }
    +
    +      if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
    +        s"""
    +         |// generate join key
    +         |${keyVal.code}
    +         |// find matches from HashedRelation
    +         |UnsafeRow $matched = $anyNull ? null: 
(UnsafeRow)$relationTerm.getValue(${keyVal.value});
    +         |${buildColumns.map(_.code).mkString("\n")}
    +         |${checkCondition.trim}
    +         |if (!$valid) {
    +         |  // reset to null
    +         |  ${buildColumns.map(v => s"${v.isNull} = true;").mkString("\n")}
    --- End diff --
    
    This is a left outer join, so the build part should be null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to