Josh Rosen created SPARK-27991: ---------------------------------- Summary: ShuffleBlockFetcherIterator should take Netty constant-factor overheads into account when limiting number of simultaneous block fetches Key: SPARK-27991 URL: https://issues.apache.org/jira/browse/SPARK-27991 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 2.4.0 Reporter: Josh Rosen
ShuffleBlockFetcherIterator has logic to limit the number of simultaneous block fetches. By default, this logic tries to keep the number of outstanding block fetches [beneath a data size limit|https://github.com/apache/spark/blob/v2.4.3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L274] ({{maxBytesInFlight}}). However, this limiting does not take fixed overheads into account: even though a remote block might be, say, 4KB, there are certain fixed-size internal overheads due to Netty buffer sizes which may cause the actual space requirements to be larger. As a result, if a map stage produces a huge number of extremely tiny blocks then we may see errors like {code:java} org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304) at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485) [...] Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304) at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640) at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594) at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764) at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740) at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) [...]{code} SPARK-24989 is another report of this problem (but with a different proposed fix). This problem can currently be mitigated by setting {{spark.reducer.maxReqsInFlight}} to some some non-IntMax value (SPARK-6166), but this additional manual configuration step is cumbersome. Instead, I think that Spark should take these fixed overheads into account in the {{maxBytesInFlight}} calculation: instead of using blocks' actual sizes, use {{Math.min(blockSize, minimumNettyBufferSize)}}. There might be some tricky details involved to make this work on all configurations (e.g. to use a different minimum when direct buffers are disabled, etc.), but I think the core idea behind the fix is pretty simple. This will improve Spark's stability and removes configuration / tuning burden from end users. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org