mcdull-zhang opened a new pull request, #41419:
URL: https://github.com/apache/spark/pull/41419
### What changes were proposed in this pull request?
When SubqueryBroadcastExec reuses the keys of a broadcast HashedRelation for
dynamic partition pruning, it collects all of the keys into an Array and then
calls `distinct` on that Array to remove duplicates.
In practice, a broadcast HashedRelation may contain many rows, and its keys are
often highly repetitive. Collecting every key into an Array therefore occupies
a large amount of heap memory, which is not managed by the MemoryManager and
can trigger an OOM.
This PR instead consumes the key iterator directly into a Set, removing
duplicates as it goes. When the key repetition rate is high, this avoids the
large memory footprint of the intermediate Array.
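A minimal sketch contrasting the two strategies in plain Scala, assuming the keys are immutable values with well-behaved `equals`/`hashCode`. `KeyDedup`, `dedupViaArray`, `dedupViaSet`, and `KeyDedupDemo` are hypothetical names for illustration only, not the code in this patch. With the array approach, peak memory grows with the total number of keys. With the set approach, it grows only with the number of distinct keys.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical helpers for illustration; not Spark's SubqueryBroadcastExec code.
object KeyDedup {
  // Old pattern: materialize every key (duplicates included), then deduplicate.
  // The intermediate array holds one slot per input key.
  def dedupViaArray[K: ClassTag](keys: Iterator[K]): Array[K] =
    keys.toArray.distinct

  // New pattern: deduplicate while consuming the iterator, so only distinct
  // keys are ever retained. The result order is unspecified.
  def dedupViaSet[K: ClassTag](keys: Iterator[K]): Array[K] = {
    val seen = mutable.HashSet.empty[K]
    keys.foreach(k => seen += k)
    seen.toArray
  }
}

object KeyDedupDemo {
  def main(args: Array[String]): Unit = {
    // One million copies of the same key plus one other key: the set never
    // holds more than two entries, whereas the array approach would first
    // buffer all 1,000,001 keys.
    val keys = Iterator.fill(1000000)(42) ++ Iterator(7)
    println(KeyDedup.dedupViaSet(keys).sorted.mkString(", ")) // prints "7, 42"
  }
}
```

Because a hash set does not preserve insertion order, the demo sorts the result before printing. In Spark itself, rows produced by an `UnsafeProjection` are typically reused buffers, so real code may need to copy each key row before adding it to a set.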
### Why are the changes needed?
To avoid OutOfMemoryErrors such as the following:
```text
Exception in thread "dynamicpruning-0" java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ResizableArray.ensureSize(ResizableArray.scala:106)
    at scala.collection.mutable.ResizableArray.ensureSize$(ResizableArray.scala:96)
    at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:49)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:85)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
    at scala.collection.generic.Growable$$Lambda$7/1514840818.apply(Unknown Source)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at scala.collection.AbstractIterator.to(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
    at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:92)
    at org.apache.spark.sql.execution.SubqueryBroadcastExec$$Lambda$4212/5099232.apply(Unknown Source)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:140)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually verified in a production environment; existing unit tests pass.