[ https://issues.apache.org/jira/browse/SPARK-27991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882532#comment-16882532 ]

Josh Rosen edited comment on SPARK-27991 at 7/11/19 12:28 AM:
--------------------------------------------------------------

I've tried to come up with a standalone reproduction of this issue, but so far 
I've been unable to find one that triggers this error. For example, I've created 
jobs which run 10000+ mappers shuffling tiny blocks to a single reducer, 
resulting in thousands of requests in flight, yet this still failed to trigger 
the error posted above.
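
This is roughly the shape of the repro job I've been running (a minimal sketch; it assumes a spark-shell {{sc}}, and the exact mapper counts and record sizes varied across attempts):
{code:scala}
// Minimal sketch of the attempted repro: ~10000 tiny map outputs all
// shuffled down to a single reducer (run in spark-shell, so `sc` is in scope).
val numMappers = 10000
sc.parallelize(1 to numMappers, numMappers)
  .map(i => (i, Array.fill(16)(i.toByte)))  // tiny (~16 byte) shuffle blocks
  .repartition(1)                           // one reducer fetches all 10000 blocks
  .count()
{code}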

However, I _did_ manage to get a more complete backtrace from a different 
internal workload:
{code:java}
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7918845952, max: 7923040256)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
        at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
        at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:80)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:122)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        ... 1 more{code}
Something that jumps out at me is the {{DefaultMaxMessagesRecvByteBufAllocator}} 
(and the {{AdaptiveRecvByteBufAllocator}} in SPARK-24989): maybe something about 
these failing workloads leads to significant wasted space in receive buffers, so 
that tiny blocks end up with hugely inflated actual space requirements?
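
As a back-of-the-envelope sanity check on that theory (the Netty defaults below are assumptions for the 4.1.x versions Spark 2.4 ships with, not values read from this workload's config):
{code:scala}
// Netty's PooledByteBufAllocator reserves direct memory a whole chunk at a
// time, where chunkSize = pageSize << maxOrder. With the usual defaults this
// is 8192 << 11 = 16777216 bytes -- exactly the allocation size failing above.
val pageSize = 8192L
val maxOrder = 11
val chunkSize = pageSize << maxOrder  // 16 MiB per chunk

// So direct-memory usage grows in 16 MiB steps regardless of how small the
// individual receive buffers are. Combined with a RecvByteBufAllocator size
// guess that can be much larger than the tiny blocks actually arriving, the
// pinned direct memory can greatly exceed the sum of the block sizes.
println(s"chunkSize = $chunkSize bytes")
{code}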


> ShuffleBlockFetcherIterator should take Netty constant-factor overheads into 
> account when limiting number of simultaneous block fetches
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27991
>                 URL: https://issues.apache.org/jira/browse/SPARK-27991
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Major
>
> ShuffleBlockFetcherIterator has logic to limit the number of simultaneous 
> block fetches. By default, this logic tries to keep the number of outstanding 
> block fetches [beneath a data size limit|https://github.com/apache/spark/blob/v2.4.3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L274] ({{maxBytesInFlight}}). 
> However, this limiting does not take fixed overheads into account: even though 
> a remote block might be, say, 4KB, there are certain fixed-size internal 
> overheads due to Netty buffer sizes which may cause the actual space 
> requirements to be larger.
> As a result, if a map stage produces a huge number of extremely tiny blocks 
> then we may see errors like
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
> [...]
> Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
> at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
> at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
> at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
> at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
> [...]{code}
> SPARK-24989 is another report of this problem (but with a different proposed 
> fix).
> This problem can currently be mitigated by setting 
> {{spark.reducer.maxReqsInFlight}} to some non-IntMax value (SPARK-6166), 
> but this additional manual configuration step is cumbersome.
> Instead, I think that Spark should take these fixed overheads into account in 
> the {{maxBytesInFlight}} calculation: instead of using blocks' actual sizes, 
> use {{Math.max(blockSize, minimumNettyBufferSize)}}. There might be some 
> tricky details involved to make this work on all configurations (e.g. using 
> a different minimum when direct buffers are disabled, etc.), but I think the 
> core idea behind the fix is pretty simple.
> This will improve Spark's stability and remove the configuration / tuning 
> burden from end users.
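
The clamping proposed in the description could look roughly like the following sketch; this is not the actual ShuffleBlockFetcherIterator code, and {{minimumNettyBufferSize}} is a placeholder for whatever per-block overhead estimate gets chosen:
{code:scala}
// Hypothetical sketch only: count each block as at least the fixed Netty
// overhead when checking the maxBytesInFlight budget.
val minimumNettyBufferSize: Long = 64 * 1024  // placeholder overhead estimate

def effectiveFetchSize(blockSize: Long): Long =
  math.max(blockSize, minimumNettyBufferSize)

def fitsInFlight(bytesInFlight: Long, blockSizes: Seq[Long], maxBytesInFlight: Long): Boolean =
  bytesInFlight + blockSizes.map(effectiveFetchSize).sum <= maxBytesInFlight
{code}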


