Hi All,

I have a job that processes a large dataset. All items in the dataset are unrelated. To save on cluster resources, I process these items in chunks. Since the chunks are independent of each other, I start and shut down the Spark context for each chunk. This keeps the DAG smaller and avoids retrying the entire DAG in case of failures. This mechanism used to work fine with Spark 1.6. Now that we have moved to 2.2, the job has started failing with an OutOfDirectMemoryError:

2018-03-03 22:00:59,687 WARN [rpc-server-48-1] server.TransportChannelHandler (TransportChannelHandler.java:exceptionCaught(78)) - Exception in connection from /10.66.73.27:60374
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 8388608 byte(s) of direct memory (used: 1023410176, max: 1029177344)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:506)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:460)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:701)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:690)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:213)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:177)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:168)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:129)
    at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:564)

I got some clues about the cause from https://github.com/netty/netty/issues/6343. However, I am not able to make the numbers add up to explain what is filling 1 GB of direct memory.
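
As a rough check on the figures in the error message, the arithmetic can be sketched as follows. This is only a sketch: it assumes the failed 8388608-byte request is the size of one pool chunk (the request comes from PoolArena.newChunk in the stack trace) and that all direct memory counted as used is held in chunks of that size. The variable names are illustrative.

```python
# Figures copied from the OutOfDirectMemoryError message.
requested = 8388608      # bytes Netty tried to allocate
used = 1023410176        # direct memory already counted as used
limit = 1029177344       # maximum direct memory Netty may use

MIB = 1024 * 1024

# Assumption: the failed request is one pool chunk, because it was
# issued from PoolArena.newChunk in the stack trace.
chunk_size = requested

# How many whole chunks the "used" figure corresponds to.
chunks_in_use, leftover = divmod(used, chunk_size)

# Space still available under the limit when the request failed.
headroom = limit - used

print(f"chunk size  : {chunk_size // MIB} MiB")
print(f"limit       : {limit / MIB:.1f} MiB")
print(f"chunks used : {chunks_in_use} (leftover {leftover} bytes)")
print(f"headroom    : {headroom / MIB:.2f} MiB")
print(f"request fits: {headroom >= requested}")
```

Under these assumptions the used figure is an exact multiple of the request size, which suggests the limit was reached one whole chunk at a time rather than through many small allocations.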

Output from jmap (rank, instance count, bytes, class name):

   7:  22230  1422720  io.netty.buffer.PoolSubpage
  12:   1370   804640  io.netty.buffer.PoolSubpage[]
  41:   3600   144000  io.netty.buffer.PoolChunkList
  98:   1440    46080  io.netty.buffer.PoolThreadCache$SubPageMemoryRegionCache
 113:    300    40800  io.netty.buffer.PoolArena$HeapArena
 114:    300    40800  io.netty.buffer.PoolArena$DirectArena
 192:    198    15840  io.netty.buffer.PoolChunk
 274:    120     8320  io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
 406:    120     3840  io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
 422:     72     3552  io.netty.buffer.PoolArena[]
 458:     30     2640  io.netty.buffer.PooledUnsafeDirectByteBuf
 500:     36     2016  io.netty.buffer.PooledByteBufAllocator
 529:     32     1792  io.netty.buffer.UnpooledUnsafeHeapByteBuf
 589:     20     1440  io.netty.buffer.PoolThreadCache
 630:     37     1184  io.netty.buffer.EmptyByteBuf
 703:     36      864  io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
 852:     22      528  io.netty.buffer.AdvancedLeakAwareByteBuf
 889:     10      480  io.netty.buffer.SlicedAbstractByteBuf
 917:      8      448  io.netty.buffer.UnpooledHeapByteBuf
1018:     20      320  io.netty.buffer.PoolThreadCache$1
1305:      4      128  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
1404:      1       80  io.netty.buffer.PooledUnsafeHeapByteBuf
1473:      3       72  io.netty.buffer.PoolArena$SizeClass
1529:      1       64  io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
1541:      2       64  io.netty.buffer.CompositeByteBuf$Component
1568:      1       56  io.netty.buffer.CompositeByteBuf
1896:      1       32  io.netty.buffer.PoolArena$SizeClass[]
2042:      1       24  io.netty.buffer.PooledUnsafeDirectByteBuf$1
2046:      1       24  io.netty.buffer.UnpooledByteBufAllocator
2051:      1       24  io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
2078:      1       24  io.netty.buffer.PooledHeapByteBuf$1
2135:      1       24  io.netty.buffer.PooledUnsafeHeapByteBuf$1
2302:      1       16  io.netty.buffer.ByteBufUtil$1
2769:      1       16  io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher

My driver machine has 32 CPUs, and I currently have 15 machines in my cluster. At the moment, the error occurs while processing the 5th or 6th chunk.
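
To relate the histogram counts to the direct memory limit, a back-of-the-envelope calculation along these lines may help. It is a sketch under stated assumptions, not a diagnosis: it assumes each direct pool chunk is 8388608 bytes (the size of the failed request in the error message), and it treats the jmap counts as live objects even though the histogram may include objects that are no longer reachable. The PoolChunk count covers heap and direct chunks together, so it is only an upper bound on direct chunks.

```python
MIB = 1024 * 1024

# Counts copied from the jmap histogram.
direct_arenas = 300   # io.netty.buffer.PoolArena$DirectArena
allocators = 36       # io.netty.buffer.PooledByteBufAllocator
pool_chunks = 198     # io.netty.buffer.PoolChunk (heap and direct together)

# Figures copied from the error message.
chunk_size = 8388608  # assumption: size of one direct pool chunk
limit = 1029177344    # maximum direct memory

# Average number of direct arenas per allocator instance.
arenas_per_allocator = direct_arenas / allocators

# Number of whole chunks that fit under the direct memory limit.
chunks_that_fit = limit // chunk_size

# Direct memory needed if every direct arena held just one chunk.
one_chunk_per_arena = direct_arenas * chunk_size

print(f"direct arenas per allocator : {arenas_per_allocator:.2f}")
print(f"chunks that fit under limit : {chunks_that_fit}")
print(f"PoolChunk objects in heap   : {pool_chunks}")
print(f"one chunk per direct arena  : {one_chunk_per_arena // MIB} MiB")
print(f"limit                       : {limit / MIB:.1f} MiB")
```

If these assumptions hold, the limit leaves room for far fewer chunks than there are direct arenas, so the number of arenas that ever allocate a chunk would matter more than the size of any single buffer.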

I suspect the error depends on the number of executors and would occur earlier if we added more executors. I am trying to come up with an explanation of what is filling up the direct memory and how to quantify it as a function of the number of executors. Our cluster is shared, and we need to understand how much driver memory to allocate for most of the jobs.

Regards,
Sumit Chawla