GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19781#discussion_r151966937
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala ---
    @@ -83,11 +83,12 @@ case class BroadcastHashJoinExec(
           // them in a buffer, because these result rows share one UnsafeRow instance. Note that here
           // we wait for the broadcast to be finished, which is a no-op because it's already finished
           // when we wait it in `doProduce`.
    -      !buildPlan.executeBroadcast[HashedRelation]().value.keyIsUnique
    +      streamedPlan.asInstanceOf[CodegenSupport].needCopyResult ||
    +        !buildPlan.executeBroadcast[HashedRelation]().value.keyIsUnique
     
         // Other joins types(semi, anti, existence) can at most produce one result row for one input
         // row from the streamed side, so no need to copy the result rows.
    --- End diff --
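To make the copy requirement in the diff comment concrete, here is a minimal Scala sketch. It assumes a hypothetical `MutableRow` class standing in for `UnsafeRow` and a hypothetical `produce` loop that reuses one row instance for every result; neither is Spark code. Buffering the shared instance without copying leaves every buffered entry pointing at the last value written.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a reused result row; not Spark's UnsafeRow.
final class MutableRow(var value: Int) {
  // Returns an independent snapshot of the current contents.
  def copy(): MutableRow = new MutableRow(value)
}

object SharedRowDemo {
  // Emits `n` results through a single shared row instance, overwriting it
  // before each call to `consume` (mimicking one reused output row).
  def produce(n: Int)(consume: MutableRow => Unit): Unit = {
    val shared = new MutableRow(0)
    for (i <- 0 until n) {
      shared.value = i
      consume(shared)
    }
  }

  // Buffers every emitted row, copying it first only if `copyRows` is set,
  // and returns the values the buffer holds once production has finished.
  def buffer(n: Int, copyRows: Boolean): Seq[Int] = {
    val buf = ArrayBuffer.empty[MutableRow]
    produce(n) { row => buf += (if (copyRows) row.copy() else row) }
    buf.map(_.value).toList
  }
}
```

`SharedRowDemo.buffer(3, copyRows = false)` returns `List(2, 2, 2)` because all three buffer slots reference the same object, while `copyRows = true` returns `List(0, 1, 2)`.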
    
    Sorry, I realized we then need to update the comment too. The final proposal:
    ```
    def multipleOutputForOneInput: Boolean = ...
    // If the streamed side needs to copy the result, this join plan needs to copy too.
    // Otherwise, this join plan only needs to copy the result if it may output
    // multiple rows for one input.
    override def needCopyResult: Boolean =
      streamedPlan.asInstanceOf[CodegenSupport].needCopyResult || multipleOutputForOneInput
    ```
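The proposed rule can be exercised with a toy Scala model. All names here (`CopyResultModel`, `Operator`, `Source`, `BroadcastJoin`, `JoinKind`) are hypothetical and are not Spark classes; `keyIsUnique` stands in for the broadcast `HashedRelation` flag, and only inner and left-semi joins are modeled, as a simplifying assumption.

```scala
// Toy model (hypothetical names, not Spark's classes) of how the proposed
// `needCopyResult` rule propagates through a chain of broadcast joins.
object CopyResultModel {
  sealed trait JoinKind
  case object Inner extends JoinKind    // may emit several rows per streamed row
  case object LeftSemi extends JoinKind // emits at most one row per streamed row

  trait Operator {
    def needCopyResult: Boolean
  }

  // A leaf whose copy requirement is given directly.
  final case class Source(needCopyResult: Boolean) extends Operator

  // `keyIsUnique` stands in for the broadcast HashedRelation's flag.
  final case class BroadcastJoin(streamed: Operator, kind: JoinKind, keyIsUnique: Boolean)
      extends Operator {

    // Only an inner join over non-unique build keys can emit more than one
    // row for a single streamed row in this model.
    def multipleOutputForOneInput: Boolean = kind match {
      case Inner    => !keyIsUnique
      case LeftSemi => false
    }

    // If the streamed side needs copies, this join does too; otherwise it only
    // needs copies when it may output multiple rows for one input row.
    override def needCopyResult: Boolean =
      streamed.needCopyResult || multipleOutputForOneInput
  }
}
```

For example, `BroadcastJoin(BroadcastJoin(Source(false), Inner, keyIsUnique = false), LeftSemi, keyIsUnique = true)` reports `true`: the semi join alone emits at most one row per input, but its streamed child may emit several, so the flag propagates upward.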

