Re: OutOfDirectMemoryError for Spark 2.2
> jmap histogram excerpt (rank, #instances, #bytes, class name):
>
>  113:  300  40800  io.netty.buffer.PoolArena$HeapArena
>  114:  300  40800  io.netty.buffer.PoolArena$DirectArena
>  192:  198  15840  io.netty.buffer.PoolChunk
>  274:  120   8320  io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>  406:  120   3840  io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>  422:   72   3552  io.netty.buffer.PoolArena[]
>  458:   30   2640  io.netty.buffer.PooledUnsafeDirectByteBuf
>  500:   36   2016  io.netty.buffer.PooledByteBufAllocator
>  529:   32   1792  io.netty.buffer.UnpooledUnsafeHeapByteBuf
>  589:   20   1440  io.netty.buffer.PoolThreadCache
>  630:   37   1184  io.netty.buffer.EmptyByteBuf
>  703:   36    864  io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>  852:   22    528  io.netty.buffer.AdvancedLeakAwareByteBuf
>  889:   10    480  io.netty.buffer.SlicedAbstractByteBuf
>  917:    8    448  io.netty.buffer.UnpooledHeapByteBuf
> 1018:   20    320  io.netty.buffer.PoolThreadCache$1
> 1305:    4    128  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
> 1404:    1     80  io.netty.buffer.PooledUnsafeHeapByteBuf
> 1473:    3     72  io.netty.buffer.PoolArena$SizeClass
> 1529:    1     64  io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
> 1541:    2     64  io.netty.buffer.CompositeByteBuf$Component
> 1568:    1     56  io.netty.buffer.CompositeByteBuf
> 1896:    1     32  io.netty.buffer.PoolArena$SizeClass[]
> 2042:    1     24  io.netty.buffer.PooledUnsafeDirectByteBuf$1
> 2046:    1     24  io.netty.buffer.UnpooledByteBufAllocator
> 2051:    1     24  io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
> 2078:    1     24  io.netty.buffer.PooledHeapByteBuf$1
> 2135:    1     24  io.netty.buffer.PooledUnsafeHeapByteBuf$1
> 2302:    1     16  io.netty.buffer.ByteBufUtil$1
> 2769:    1     16  io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher

My driver machine has 32 CPUs, and as of now I have 15 machines in my
cluster. Currently the error happens while processing the 5th or 6th chunk. I suspect the error depends on the number of executors, and would happen earlier if we added more executors.

I am trying to come up with an explanation of what is filling up the direct memory, and how to quantify it as a factor of the number of executors. Ours is a shared cluster, and we need to understand how much driver memory to allocate for most of the jobs.

Regards
Sumit Chawla
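A starting point for quantifying this is to compare what the JVM's own direct-buffer accounting reports against the process RSS. A minimal sketch using only standard JDK MXBeans follows; one caveat (an assumption about this setup, not something the thread confirms) is that Netty can allocate direct memory through sun.misc.Unsafe and track it with its own counter, in which case it may not appear fully in the NIO "direct" pool shown here.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

public class DirectMemoryCheck {
    public static void main(String[] args) {
        // Allocate one direct buffer so the "direct" pool is visibly non-empty.
        ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024);

        // The JDK tracks NIO direct buffers (and mapped files) in platform
        // BufferPoolMXBeans named "direct" and "mapped".
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            System.out.printf("%-8s count=%d used=%d bytes capacity=%d bytes%n",
                    pool.getName(), pool.getCount(),
                    pool.getMemoryUsed(), pool.getTotalCapacity());
        }

        // Keep the buffer reachable until after the printout.
        buf.clear();
    }
}
```

If the numbers do scale with executor count, the usual knobs are the JVM flag -XX:MaxDirectMemorySize and Netty's io.netty.maxDirectMemory system property, passed via spark.driver.extraJavaOptions; treat the right values as something to measure for this workload rather than a known constant.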
[Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code
Hi,

I have a Spark structured streaming job that reads from Kafka and writes Parquet files to Hive/HDFS. The files are not very large, but the Kafka source is noisy, so each Spark job takes a long time to complete. There is a significant window during which the Parquet files are incomplete, and other tools, including PrestoDB, encounter errors while trying to read them. I wrote to this list and to Stack Overflow about the problem last summer:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Structured-Streaming-truncated-Parquet-after-driver-crash-or-kill-tt29043.html
https://stackoverflow.com/questions/47337030/not-a-parquet-file-too-small-from-presto-during-spark-structured-streaming-r/47339805#47339805

After hesitating for a while, I wrote a custom commit protocol to solve the problem. It combines HadoopMapReduceCommitProtocol's behavior of writing to a temp file first with ManifestFileCommitProtocol. From what I can tell, ManifestFileCommitProtocol is required for the normal structured streaming behavior of being able to resume streams from a known point.

I think this commit protocol could be generally useful. Writing to a temp file and moving it to the final location is low cost on HDFS, and is the standard behavior for non-streaming jobs, as implemented in HadoopMapReduceCommitProtocol. At the same time, ManifestFileCommitProtocol is needed for structured streaming jobs. We have been running this for a few months in production without problems.

Here is the code (at the moment not up to Spark standards, admittedly):

https://github.com/davcamer/spark/commit/361f1c69851f0f94cfd974ce720c694407f9340b

Did I miss a better approach? Does anyone else think this would be useful?

Thanks for reading,
Dave
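The actual implementation lives in the linked commit and builds on Spark's FileCommitProtocol; independent of those internals, the write-to-temp-then-rename property the protocol relies on can be sketched in plain Java NIO (file names and the class name here are invented for illustration). The point is that a rename within one filesystem is atomic, so a concurrent reader such as Presto sees either no file or a complete file, never a truncated one:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicCommitSketch {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commit-demo");
        // Task phase: write all output to a temp path that readers ignore.
        Path tmp = dir.resolve("_temporary.part-00000.parquet");
        Path dst = dir.resolve("part-00000.parquet");
        Files.write(tmp, "row data".getBytes());

        // Commit phase: atomic rename into the final location. Readers
        // observe either the absence of dst or the fully written file.
        Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE);

        System.out.println(Files.exists(dst) && !Files.exists(tmp)); // true
    }
}
```

On HDFS the analogous rename is a metadata-only operation on the NameNode, which is why this pattern is cheap there; the streaming-specific part (the manifest that makes batches resumable) still has to come from ManifestFileCommitProtocol, as the message above describes.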