[ https://issues.apache.org/jira/browse/SPARK-27991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882532#comment-16882532 ]
Josh Rosen edited comment on SPARK-27991 at 7/11/19 12:28 AM:
--------------------------------------------------------------

I've tried to come up with a standalone reproduction of this issue, but so far I've been unable to find one that triggers this error. I've tried creating jobs which run 10000+ mappers shuffling tiny blocks to a single reducer, resulting in thousands of requests in flight, but this has failed to trigger the error posted above.

However, I _did_ manage to get a more complete backtrace from a different internal workload:

{code:java}
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7918845952, max: 7923040256)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
    at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:80)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:122)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more
{code}

Something that jumps out at me is the {{DefaultMaxMessagesRecvByteBufAllocator}} (and the {{AdaptiveRecvByteBufAllocator}} in SPARK-24989): maybe something about these failing workloads leads to significant wasted space in receive buffers, causing tiny blocks to require disproportionately large allocations?
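For reference, here's a minimal sketch of the kind of repro job I tried (illustrative only: names, counts, and sizes are made up, and it only exercises the network fetch path when run on a real multi-executor cluster rather than in local mode):

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative repro sketch: 10000 map partitions each shuffle one tiny
// record to a single reducer, so the reducer must fetch 10000+ tiny blocks.
object TinyBlockShuffleRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tiny-block-shuffle-repro"))
    val counts = sc.parallelize(1 to 10000, numSlices = 10000)
      .map(i => (0, i))              // single key => single reducer
      .groupByKey(numPartitions = 1) // no map-side combine: keeps blocks tiny and numerous
      .mapValues(_.size)
      .collect()
    println(counts.mkString(","))
    sc.stop()
  }
}
{code}

And one hypothetical way to test the receive-buffer theory (this is not Spark's actual transport bootstrap code, just an experiment sketch) would be to pin the client channel's receive-buffer allocator to a small fixed size and watch whether direct memory usage drops:

{code:scala}
import io.netty.bootstrap.Bootstrap
import io.netty.channel.{ChannelOption, FixedRecvByteBufAllocator}

// Hypothetical experiment: force every channel read to allocate a fixed
// 64 KB receive buffer instead of letting the adaptive/default allocator
// size the buffer (and possibly over-allocate for tiny blocks).
def pinReceiveBuffer(bootstrap: Bootstrap): Bootstrap =
  bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(64 * 1024))
{code}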
> ShuffleBlockFetcherIterator should take Netty constant-factor overheads into account when limiting number of simultaneous block fetches
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27991
>                 URL: https://issues.apache.org/jira/browse/SPARK-27991
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Major
>
> ShuffleBlockFetcherIterator has logic to limit the number of simultaneous block fetches. By default, this logic tries to keep the number of outstanding block fetches [beneath a data size limit|https://github.com/apache/spark/blob/v2.4.3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L274] ({{maxBytesInFlight}}). However, this limiting does not take fixed overheads into account: even though a remote block might be, say, 4KB, there are certain fixed-size internal overheads due to Netty buffer sizes which may cause the actual space requirements to be larger.
> As a result, if a map stage produces a huge number of extremely tiny blocks then we may see errors like:
> {code:java}
> org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
> [...]
> Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 39325794304, max: 39325794304)
>     at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
>     at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
>     at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
>     at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
>     at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
>     at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
>     at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
>     at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
> [...]{code}
> SPARK-24989 is another report of this problem (but with a different proposed fix).
> This problem can currently be mitigated by setting {{spark.reducer.maxReqsInFlight}} to some non-IntMax value (SPARK-6166), but this additional manual configuration step is cumbersome.
> Instead, I think that Spark should take these fixed overheads into account in the {{maxBytesInFlight}} calculation: instead of using blocks' actual sizes, use {{Math.max(blockSize, minimumNettyBufferSize)}}. There might be some tricky details involved to make this work on all configurations (e.g. using a different minimum when direct buffers are disabled), but I think the core idea behind the fix is pretty simple.
> This will improve Spark's stability and remove the configuration / tuning burden from end users.
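> A rough sketch of this idea (illustrative pseudocode, not the actual {{ShuffleBlockFetcherIterator}} internals; {{minimumNettyBufferSize}} is a placeholder for whatever minimum allocation Netty would make under the active configuration):
> {code:scala}
> // Estimate the real memory cost of fetching a block: a tiny block still
> // occupies at least one minimum-sized Netty receive buffer.
> def estimatedBytesForBlock(blockSize: Long, minimumNettyBufferSize: Long): Long =
>   math.max(blockSize, minimumNettyBufferSize)
>
> // The bytes-in-flight limiter would then sum estimated sizes instead of
> // raw block sizes when deciding whether another fetch request may be sent.
> def canIssueRequest(
>     bytesInFlight: Long,
>     blockSizesInRequest: Seq[Long],
>     minimumNettyBufferSize: Long,
>     maxBytesInFlight: Long): Boolean = {
>   val estimated = blockSizesInRequest.map(estimatedBytesForBlock(_, minimumNettyBufferSize)).sum
>   bytesInFlight + estimated <= maxBytesInFlight
> }
> {code}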