Josh Rosen created SPARK-27991:
----------------------------------

             Summary: ShuffleBlockFetcherIterator should take Netty 
constant-factor overheads into account when limiting number of simultaneous 
block fetches
                 Key: SPARK-27991
                 URL: https://issues.apache.org/jira/browse/SPARK-27991
             Project: Spark
          Issue Type: Bug
          Components: Shuffle
    Affects Versions: 2.4.0
            Reporter: Josh Rosen


ShuffleBlockFetcherIterator has logic to limit the number of simultaneous block 
fetches. By default, this logic tries to keep the number of outstanding block 
fetches [beneath a data size 
limit|https://github.com/apache/spark/blob/v2.4.3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L274]
 ({{maxBytesInFlight}}). However, this limit does not take fixed overheads into 
account: even though a remote block might be, say, 4 KB, fixed-size internal 
overheads from Netty's buffer allocation can make the actual memory 
requirement considerably larger.
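To make the mismatch concrete, here is a back-of-the-envelope sketch (not Spark 
code; the 16 KiB per-fetch overhead is an assumed figure for illustration only, 
since the real cost depends on Netty's allocator configuration):
{code:scala}
// Illustration of how byte-based limiting can underestimate direct memory use
// when shuffle blocks are tiny. The per-fetch overhead below is an assumption.
object TinyBlockOverheadSketch extends App {
  val maxBytesInFlight = 48L * 1024 * 1024 // Spark's default limit: 48 MiB
  val blockSize        = 4L * 1024         // 4 KiB shuffle blocks
  val nettyOverhead    = 16L * 1024        // assumed fixed per-fetch buffer cost

  val fetchesAllowed  = maxBytesInFlight / blockSize   // 12288 simultaneous fetches
  val actualDirectMem = fetchesAllowed * nettyOverhead // ~192 MiB, ~4x the intended cap

  println(s"fetches allowed: $fetchesAllowed, " +
    s"actual direct memory: ~${actualDirectMem / (1024 * 1024)} MiB")
}
{code}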

As a result, if a map stage produces a huge number of extremely tiny blocks, 
we may see errors like:
{code:java}
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 
byte(s) of direct memory (used: 39325794304, max: 39325794304)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
[...]
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 
16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
at 
io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
at 
io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
at 
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
[...]{code}
SPARK-24989 is another report of this problem (but with a different proposed 
fix).

This problem can currently be mitigated by setting 
{{spark.reducer.maxReqsInFlight}} to a value lower than its Int.MaxValue 
default (SPARK-6166), but this extra manual configuration step is cumbersome.
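For example, the workaround looks roughly like the following (the value 256 is 
only an example; an appropriate cap depends on the executor's direct memory 
budget and typical block sizes):
{code:scala}
// Current workaround: bound the number of in-flight fetch requests directly
// instead of relying on the byte-based limit alone.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.reducer.maxReqsInFlight", "256") // default is Int.MaxValue, i.e. effectively unlimited
// or on the command line: spark-submit --conf spark.reducer.maxReqsInFlight=256 ...
{code}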

Instead, I think that Spark should take these fixed overheads into account in 
the {{maxBytesInFlight}} calculation: instead of using blocks' actual sizes, 
use {{Math.max(blockSize, minimumNettyBufferSize)}}. There may be some tricky 
details involved in making this work across all configurations (e.g. using a 
different minimum when direct buffers are disabled), but I think the core 
idea behind the fix is pretty simple.
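A rough sketch of the accounting change (the names {{effectiveFetchSize}} and 
{{minimumNettyBufferSize}} are hypothetical, not existing Spark or Netty APIs):
{code:scala}
// Charge each in-flight block at least the minimum Netty buffer size so that
// bytesInFlight reflects real direct memory consumption. Names are hypothetical.
val minimumNettyBufferSize: Long = 64L * 1024 // placeholder; would be derived from the allocator's config

def effectiveFetchSize(blockSize: Long): Long =
  math.max(blockSize, minimumNettyBufferSize)

// ShuffleBlockFetcherIterator would then increment bytesInFlight by
// effectiveFetchSize(size) rather than the raw block size when batching requests.
{code}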

This will improve Spark's stability and remove a configuration / tuning burden 
from end users.


