mcdull-zhang opened a new pull request, #41419:
URL: https://github.com/apache/spark/pull/41419

   ### What changes were proposed in this pull request?
   When SubqueryBroadcastExec reuses the keys of the broadcast HashedRelation for dynamic partition pruning, it currently collects all of the keys into an Array and then calls distinct on that Array to remove duplicates.
   
   In general, the broadcast HashedRelation can contain many rows, and the pruning key is often highly duplicated. Materializing every key means the Array can occupy a large amount of memory (memory that is not tracked by the MemoryManager), which may trigger an OOM.
   
   The approach in this PR is to have a Set consume the key iterator directly, deduplicating keys as they are read. When the key duplication rate is high, this avoids materializing a large Array of keys; a minimal sketch of the idea is shown below.
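
   A minimal sketch of the idea, assuming the keys arrive as an iterator. The object and helper name below are hypothetical; the real change operates on the InternalRow keys inside SubqueryBroadcastExec.relationFuture:

   ```scala
   import scala.collection.mutable
   import scala.reflect.ClassTag

   object DistinctKeysSketch {
     // Consume the key iterator with a Set so duplicates are dropped as they
     // are read, instead of first materializing every key into an Array and
     // calling .distinct on it afterwards.
     def distinctKeys[T: ClassTag](keys: Iterator[T]): Array[T] = {
       val seen = mutable.HashSet.empty[T]
       keys.foreach(seen += _)
       seen.toArray
     }

     def main(args: Array[String]): Unit = {
       // With a high duplication rate, only the distinct values are ever
       // retained, so peak memory is proportional to the number of distinct keys.
       val manyDuplicatedKeys = Iterator.fill(1000000)(scala.util.Random.nextInt(10))
       println(distinctKeys(manyDuplicatedKeys).length) // at most 10
     }
   }
   ```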
   
   
   ### Why are the changes needed?
   Avoid OOM errors such as the following:
   ```text
   Exception in thread "dynamicpruning-0" java.lang.OutOfMemoryError: Java heap space
        at scala.collection.mutable.ResizableArray.ensureSize(ResizableArray.scala:106)
        at scala.collection.mutable.ResizableArray.ensureSize$(ResizableArray.scala:96)
        at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:49)
        at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85)
        at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
        at scala.collection.generic.Growable$$Lambda$7/1514840818.apply(Unknown Source)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at scala.collection.AbstractIterator.to(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
        at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:92)
        at org.apache.spark.sql.execution.SubqueryBroadcastExec$$Lambda$4212/5099232.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:140)
   ```
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Manually verified in a production environment, and passes the existing unit tests.
   

