mcdull-zhang opened a new pull request, #41419:
URL: https://github.com/apache/spark/pull/41419
### What changes were proposed in this pull request?

When `SubqueryBroadcastExec` reuses the keys of a broadcast `HashedRelation` for dynamic partition pruning, it collects all the keys into an `Array` and then calls `distinct` on that `Array` to remove duplicates. A broadcast `HashedRelation` may contain many rows, and its keys often have a high repetition rate. Collecting every key first can make this `Array` occupy a large amount of memory, and that memory is not managed by `MemoryManager`. The result can be an OOM.

This PR instead consumes the key iterator directly into a `Set` for deduplication. When the keys repeat heavily, only the distinct keys are kept in memory, which avoids building a large `Array` that holds every key.

### Why are the changes needed?

To avoid OOM errors such as the following:

```text
Exception in thread "dynamicpruning-0" java.lang.OutOfMemoryError: Java heap space
	at scala.collection.mutable.ResizableArray.ensureSize(ResizableArray.scala:106)
	at scala.collection.mutable.ResizableArray.ensureSize$(ResizableArray.scala:96)
	at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:49)
	at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85)
	at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
	at scala.collection.generic.Growable$$Lambda$7/1514840818.apply(Unknown Source)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at scala.collection.AbstractIterator.to(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
	at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:92)
	at org.apache.spark.sql.execution.SubqueryBroadcastExec$$Lambda$4212/5099232.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:140)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually verified in a production environment, and existing unit tests pass.
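The deduplication change proposed in this PR can be illustrated with a minimal plain-Scala sketch. It is not the actual Spark patch and uses no Spark APIs. The `keyIterator` helper is a hypothetical stand-in for the key iterator of a broadcast `HashedRelation`: many rows drawn from few distinct key values. `dedupViaArray` mirrors the old `toArray.distinct` pattern seen in the stack trace. `dedupViaSet` consumes the iterator straight into a `mutable.HashSet`.

```scala
import scala.collection.mutable

object DedupSketch {
  // Hypothetical stand-in for a broadcast HashedRelation's key iterator:
  // `rows` keys drawn from only `distinctKeys` distinct values.
  def keyIterator(rows: Int, distinctKeys: Int): Iterator[Long] =
    Iterator.range(0, rows).map(i => (i % distinctKeys).toLong)

  // Old pattern: materialize every key (duplicates included) in an Array,
  // then deduplicate. Peak memory grows with the number of rows.
  def dedupViaArray(keys: Iterator[Long]): Array[Long] =
    keys.toArray.distinct

  // Pattern described in this PR: consume the iterator directly into a Set.
  // Only distinct keys are retained, so peak memory grows with the number
  // of distinct keys rather than the number of rows.
  def dedupViaSet(keys: Iterator[Long]): Array[Long] = {
    val seen = mutable.HashSet.empty[Long]
    keys.foreach(k => seen += k)
    seen.toArray
  }

  def main(args: Array[String]): Unit = {
    val viaArray = dedupViaArray(keyIterator(rows = 1000000, distinctKeys = 100))
    val viaSet = dedupViaSet(keyIterator(rows = 1000000, distinctKeys = 100))
    // Both yield the same distinct keys; only the order may differ.
    println(viaArray.length)
    println(viaSet.sorted.toList == viaArray.sorted.toList)
  }
}
```

With 1,000,000 rows and 100 distinct keys, the `Array` version first holds all 1,000,000 keys before `distinct` runs. The `Set` version never holds more than 100. When keys are mostly unique, the two approaches use comparable memory.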