Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19781#discussion_r152168668
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
 ---
    @@ -76,20 +76,23 @@ case class BroadcastHashJoinExec(
         streamedPlan.asInstanceOf[CodegenSupport].inputRDDs()
       }
     
    -  override def needCopyResult: Boolean = joinType match {
    +  private def multipleOutputForOneInput: Boolean = joinType match {
         case _: InnerLike | LeftOuter | RightOuter =>
           // For inner and outer joins, one row from the streamed side may 
produce multiple result rows,
    -      // if the build side has duplicated keys. Then we need to copy the 
result rows before putting
    -      // them in a buffer, because these result rows share one UnsafeRow 
instance. Note that here
    -      // we wait for the broadcast to be finished, which is a no-op 
because it's already finished
    -      // when we wait it in `doProduce`.
    +      // if the build side has duplicated keys. Note that here we wait for 
the broadcast to be
    +      // finished, which is a no-op because it's already finished when we 
wait it in `doProduce`.
           !buildPlan.executeBroadcast[HashedRelation]().value.keyIsUnique
     
         // Other joins types(semi, anti, existence) can at most produce one 
result row for one input
    -    // row from the streamed side, so no need to copy the result rows.
    +    // row from the streamed side.
         case _ => false
       }
     
    +  // If the streaming side needs to copy result, this join plan needs to 
copy too. Otherwise,
    +  // this join plan only needs to copy result if it may output multiple 
rows for one input.
    +  override def needCopyResult: Boolean =
    +    streamedPlan.asInstanceOf[CodegenSupport].needCopyResult || 
multipleOutputForOneInput
    --- End diff --
    
    For the regression test, I think we can have a streamedPlan which needs 
copying result (Sample with replacement or Expand, etc.), and performs a 
broadcast join which either an inner join where the build side has unique keys, 
or a semi join.
    
    So before this, `needCopyResult` is false, but after this, it is true.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to