This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new db51ec6b9c4 [SPARK-40487][SQL] Make defaultJoin in BroadcastNestedLoopJoinExec running in parallel db51ec6b9c4 is described below commit db51ec6b9c4669a94e55d52add01c9e568d2bbf3 Author: Xingchao, Zhang <xingczh...@ebay.com> AuthorDate: Thu Sep 22 11:14:11 2022 +0800 [SPARK-40487][SQL] Make defaultJoin in BroadcastNestedLoopJoinExec running in parallel ### What changes were proposed in this pull request? Currently, the defaultJoin method in BroadcastNestedLoopJoinExec first collects notMatchedBroadcastRows and then collects matchedStreamRows. The two steps could run in parallel instead of serially. ### Why are the changes needed? Make defaultJoin in BroadcastNestedLoopJoinExec run in parallel. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT. Closes #37930 from xingchaozh/SPARK-40487. Authored-by: Xingchao, Zhang <xingczh...@ebay.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../joins/BroadcastNestedLoopJoinExec.scala | 43 ++++++++++++++-------- .../scala/org/apache/spark/sql/JoinSuite.scala | 15 ++++++++ 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala index 23b5b614369..84c0cd127f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala @@ -286,21 +286,25 @@ case class BroadcastNestedLoopJoinExec( */ private def defaultJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = { val streamRdd = streamed.execute() - val matchedBroadcastRows = getMatchedBroadcastRowsBitSet(streamRdd, relation) - val notMatchedBroadcastRows: Seq[InternalRow] = { - val nulls = new 
GenericInternalRow(streamed.output.size) - val buf: CompactBuffer[InternalRow] = new CompactBuffer() - val joinedRow = new JoinedRow - joinedRow.withLeft(nulls) - var i = 0 - val buildRows = relation.value - while (i < buildRows.length) { - if (!matchedBroadcastRows.get(i)) { - buf += joinedRow.withRight(buildRows(i)).copy() + def notMatchedBroadcastRows: RDD[InternalRow] = { + getMatchedBroadcastRowsBitSetRDD(streamRdd, relation) + .repartition(1) + .mapPartitions(iter => Seq(iter.fold(new BitSet(relation.value.length))(_ | _)).toIterator) + .flatMap { matchedBroadcastRows => + val nulls = new GenericInternalRow(streamed.output.size) + val buf: CompactBuffer[InternalRow] = new CompactBuffer() + val joinedRow = new JoinedRow + joinedRow.withLeft(nulls) + var i = 0 + val buildRows = relation.value + while (i < buildRows.length) { + if (!matchedBroadcastRows.get(i)) { + buf += joinedRow.withRight(buildRows(i)).copy() + } + i += 1 + } + buf.iterator } - i += 1 - } - buf } val matchedStreamRows = streamRdd.mapPartitionsInternal { streamedIter => @@ -330,7 +334,7 @@ case class BroadcastNestedLoopJoinExec( sparkContext.union( matchedStreamRows, - sparkContext.makeRDD(notMatchedBroadcastRows) + notMatchedBroadcastRows ) } @@ -342,6 +346,13 @@ case class BroadcastNestedLoopJoinExec( private def getMatchedBroadcastRowsBitSet( streamRdd: RDD[InternalRow], relation: Broadcast[Array[InternalRow]]): BitSet = { + getMatchedBroadcastRowsBitSetRDD(streamRdd, relation) + .fold(new BitSet(relation.value.length))(_ | _) + } + + private def getMatchedBroadcastRowsBitSetRDD( + streamRdd: RDD[InternalRow], + relation: Broadcast[Array[InternalRow]]): RDD[BitSet] = { val matchedBuildRows = streamRdd.mapPartitionsInternal { streamedIter => val buildRows = relation.value val matched = new BitSet(buildRows.length) @@ -359,7 +370,7 @@ case class BroadcastNestedLoopJoinExec( Seq(matched).iterator } - matchedBuildRows.fold(new BitSet(relation.value.length))(_ | _) + matchedBuildRows } protected 
override def doExecute(): RDD[InternalRow] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index f41944d2ed5..6dd34d41cf6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1440,4 +1440,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } } + + test("SPARK-40487: Make defaultJoin in BroadcastNestedLoopJoinExec running in parallel") { + withTable("t1", "t2") { + spark.range(5, 15).toDF("k").write.saveAsTable("t1") + spark.range(4, 8).toDF("k").write.saveAsTable("t2") + + val queryBuildLeft = "SELECT /*+ BROADCAST(t1) */ * FROM t1 LEFT JOIN t2 ON t1.k < t2.k" + val result1 = sql(queryBuildLeft) + + val queryBuildRight = "SELECT /*+ BROADCAST(t2) */ * FROM t1 LEFT JOIN t2 ON t1.k < t2.k" + val result2 = sql(queryBuildRight) + + checkAnswer(result1, result2) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org