GitHub user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19781#discussion_r152168668
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
---
@@ -76,20 +76,23 @@ case class BroadcastHashJoinExec(
streamedPlan.asInstanceOf[CodegenSupport].inputRDDs()
}
- override def needCopyResult: Boolean = joinType match {
+ private def multipleOutputForOneInput: Boolean = joinType match {
case _: InnerLike | LeftOuter | RightOuter =>
// For inner and outer joins, one row from the streamed side may produce multiple result rows,
- // if the build side has duplicated keys. Then we need to copy the result rows before putting
- // them in a buffer, because these result rows share one UnsafeRow instance. Note that here
- // we wait for the broadcast to be finished, which is a no-op because it's already finished
- // when we wait it in `doProduce`.
+ // if the build side has duplicated keys. Note that here we wait for the broadcast to be
+ // finished, which is a no-op because it's already finished when we wait it in `doProduce`.
!buildPlan.executeBroadcast[HashedRelation]().value.keyIsUnique
// Other joins types(semi, anti, existence) can at most produce one result row for one input
- // row from the streamed side, so no need to copy the result rows.
+ // row from the streamed side.
case _ => false
}
+ // If the streaming side needs to copy result, this join plan needs to copy too. Otherwise,
+ // this join plan only needs to copy result if it may output multiple rows for one input.
+ override def needCopyResult: Boolean =
+ streamedPlan.asInstanceOf[CodegenSupport].needCopyResult || multipleOutputForOneInput
--- End diff --
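A minimal, dependency-free sketch of the rule in the diff, assuming a broadcast hash join can be modeled by just three inputs: its join type, whether the build side's keys are unique, and whether the streamed child needs to copy its result. `JoinType`, its case objects, and `CopyRule` are hypothetical names for illustration only; they are not Spark's classes and do not use Spark's `InnerLike` or `HashedRelation.keyIsUnique`.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType

object CopyRule {
  // Mirrors `multipleOutputForOneInput`: inner and outer joins can emit several
  // rows for one streamed row only when the build side has duplicate keys.
  def multipleOutputForOneInput(joinType: JoinType, buildKeyIsUnique: Boolean): Boolean =
    joinType match {
      case Inner | LeftOuter | RightOuter => !buildKeyIsUnique
      // Semi and anti joins emit at most one row per streamed row.
      case _ => false
    }

  // The rule before the change: only the join's own fan-out was considered.
  def oldNeedCopyResult(joinType: JoinType, buildKeyIsUnique: Boolean): Boolean =
    multipleOutputForOneInput(joinType, buildKeyIsUnique)

  // The rule after the change: also inherit the streamed child's requirement.
  def needCopyResult(
      streamedNeedsCopy: Boolean,
      joinType: JoinType,
      buildKeyIsUnique: Boolean): Boolean =
    streamedNeedsCopy || multipleOutputForOneInput(joinType, buildKeyIsUnique)
}
```

In this model, a streamed child that needs copying combined with an inner join on unique keys (or with a semi join) makes `oldNeedCopyResult` return false and `needCopyResult` return true, which is exactly the combination the suggested regression test targets.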
For the regression test, I think we can have a streamedPlan that needs to
copy its result (Sample with replacement, Expand, etc.) and feeds a
broadcast join that is either an inner join whose build side has unique keys,
or a semi join.
Before this change, `needCopyResult` is false for such a plan; after it, it is true.
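Why a missing copy matters can be shown with a toy model, assuming (as the comment removed in the diff says) that result rows share one mutable row instance. `MutableRow`, `CopyDemo.produce`, and `CopyDemo.collect` are hypothetical names, not Spark APIs: the producer emits several rows per input through one reused object, roughly as a Sample with replacement might, and the consumer buffers what it receives.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a reused UnsafeRow: one mutable object that an
// operator overwrites for every row it emits.
final class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

object CopyDemo {
  // Emits each input `repeat` times, always through the same MutableRow.
  def produce(inputs: Seq[Int], repeat: Int)(emit: MutableRow => Unit): Unit = {
    val shared = new MutableRow(0)
    for (v <- inputs; _ <- 0 until repeat) {
      shared.value = v
      emit(shared)
    }
  }

  // Buffers every emitted row; copies it first only when `needCopy` is true.
  def collect(inputs: Seq[Int], repeat: Int, needCopy: Boolean): Seq[Int] = {
    val buffer = ArrayBuffer.empty[MutableRow]
    produce(inputs, repeat) { row =>
      buffer += (if (needCopy) row.copy() else row)
    }
    buffer.map(_.value).toSeq
  }
}
```

Without copying, every buffered entry is the same object, so all of them report the last value written (`collect(Seq(1, 2, 3), 2, false)` yields six 3s). That is the kind of corruption the regression test should expose when `needCopyResult` is wrongly false.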