Re: OutOfDirectMemoryError for Spark 2.2

2018-03-12 Thread Dave Cameron
>>>> 113: 300 40800 io.netty.buffer.PoolArena$HeapArena
>>>>
>>>> 114: 300 40800 io.netty.buffer.PoolArena$DirectArena
>>>>
>>>> 192: 198 15840 io.netty.buffer.PoolChunk
>>>>
>>>> 274: 120 8320 io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>>>>
>>>> 406: 120 3840 io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>>>>
>>>> 422: 72 3552 io.netty.buffer.PoolArena[]
>>>>
>>>> 458: 30 2640 io.netty.buffer.PooledUnsafeDirectByteBuf
>>>>
>>>> 500: 36 2016 io.netty.buffer.PooledByteBufAllocator
>>>>
>>>> 529: 32 1792 io.netty.buffer.UnpooledUnsafeHeapByteBuf
>>>>
>>>> 589: 20 1440 io.netty.buffer.PoolThreadCache
>>>>
>>>> 630: 37 1184 io.netty.buffer.EmptyByteBuf
>>>>
>>>> 703: 36 864 io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>>>>
>>>> 852: 22 528 io.netty.buffer.AdvancedLeakAwareByteBuf
>>>>
>>>> 889: 10 480 io.netty.buffer.SlicedAbstractByteBuf
>>>>
>>>> 917: 8 448 io.netty.buffer.UnpooledHeapByteBuf
>>>>
>>>> 1018: 20 320 io.netty.buffer.PoolThreadCache$1
>>>>
>>>> 1305: 4 128 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>>
>>>> 1404: 1 80 io.netty.buffer.PooledUnsafeHeapByteBuf
>>>>
>>>> 1473: 3 72 io.netty.buffer.PoolArena$SizeClass
>>>>
>>>> 1529: 1 64 io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
>>>>
>>>> 1541: 2 64 io.netty.buffer.CompositeByteBuf$Component
>>>>
>>>> 1568: 1 56 io.netty.buffer.CompositeByteBuf
>>>>
>>>> 1896: 1 32 io.netty.buffer.PoolArena$SizeClass[]
>>>>
>>>> 2042: 1 24 io.netty.buffer.PooledUnsafeDirectByteBuf$1
>>>>
>>>> 2046: 1 24 io.netty.buffer.UnpooledByteBufAllocator
>>>>
>>>> 2051: 1 24 io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
>>>>
>>>> 2078: 1 24 io.netty.buffer.PooledHeapByteBuf$1
>>>>
>>>> 2135: 1 24 io.netty.buffer.PooledUnsafeHeapByteBuf$1
>>>>
>>>> 2302: 1 16 io.netty.buffer.ByteBufUtil$1
>>>>
>>>> 2769: 1 16 io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher
>>>>
>>>>
>>>>
>>>> My driver machine has 32 CPUs, and as of now I have 15 machines in my
>>>> cluster. Currently the error happens while processing the 5th or 6th
>>>> chunk. I suspect the error depends on the number of executors and would
>>>> happen earlier if we added more executors.
>>>>
>>>>
>>>> I am trying to come up with an explanation of what is filling up the
>>>> direct memory, and how to quantify it as a factor of the number of
>>>> executors. Our cluster is a shared cluster, and we need to understand
>>>> how much driver memory to allocate for most of the jobs.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Regards
>>>> Sumit Chawla
>>>>
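
A rough way to quantify the driver's direct-memory use as executors are added
is to sample the JVM's "direct" buffer pool while the job runs. A minimal
sketch follows; the object and method names are illustrative, it only covers
NIO direct buffers (Netty may account for some of its allocations separately),
and the ceiling itself is normally governed by -XX:MaxDirectMemorySize and
Netty's io.netty.maxDirectMemory property:

import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import scala.collection.JavaConverters._

object DirectMemoryProbe {
  // Prints the current count, used bytes and total capacity of direct buffers
  // in this JVM; calling it periodically on the driver can show whether usage
  // tracks the number of registered executors.
  def logDirectPool(): Unit = {
    val pools =
      ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala
    pools.find(_.getName == "direct").foreach { pool =>
      println(s"direct buffers: count=${pool.getCount}, " +
        s"used=${pool.getMemoryUsed} bytes, capacity=${pool.getTotalCapacity} bytes")
    }
  }
}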


-- 
Dave Cameron
Senior Platform Engineer
(415) 646-5657
d...@digitalocean.com


[Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code

2018-02-09 Thread Dave Cameron
Hi


I have a Spark structured streaming job that reads from Kafka and writes
parquet files to Hive/HDFS. The files are not very large, but the Kafka
source is noisy, so each Spark job takes a long time to complete. There is a
significant window during which the parquet files are incomplete, and other
tools, including PrestoDB, encounter errors while trying to read them.
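
A minimal sketch of the kind of pipeline being described (topic names and
paths are made up; the actual job, sink options and schema are not shown in
the post):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-to-parquet").getOrCreate()

// Read a noisy Kafka topic as a stream.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")   // hypothetical brokers
  .option("subscribe", "events")                     // hypothetical topic
  .load()

// Continuously append small parquet files under a Hive-visible HDFS path.
val query = events
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///warehouse/events")           // hypothetical table dir
  .option("checkpointLocation", "hdfs:///checkpoints/events")
  .start()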

I wrote to this list and Stack Overflow about the problem last summer:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Structured-Streaming-truncated-Parquet-after-driver-crash-or-kill-tt29043.html
https://stackoverflow.com/questions/47337030/not-a-parquet-file-too-small-from-presto-during-spark-structured-streaming-r/47339805#47339805

After hesitating for a while, I wrote a custom commit protocol to solve the
problem. It combines HadoopMapReduceCommitProtocol's behavior of writing to
a temp file first with ManifestFileCommitProtocol. From what I can tell,
ManifestFileCommitProtocol is required for the normal Structured Streaming
behavior of being able to resume streams from a known point.
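
A simplified sketch of the temp-file-then-rename part of that idea, layered
on Spark's internal FileCommitProtocol API (this is not the linked commit
itself: the class name is made up, error handling is omitted, and a complete
version would also append committed files to the FileStreamSinkLog manifest
the way ManifestFileCommitProtocol does):

import java.util.UUID

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

class TempFileThenRenameProtocol(jobId: String, path: String)
  extends FileCommitProtocol with Serializable {

  // (temp path, final path) pairs written by the current task.
  @transient private var pendingFiles: ArrayBuffer[(Path, Path)] = _

  override def setupJob(jobContext: JobContext): Unit = {}

  override def setupTask(taskContext: TaskAttemptContext): Unit = {
    pendingFiles = new ArrayBuffer[(Path, Path)]
  }

  override def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
    val filename =
      s"part-${taskContext.getTaskAttemptID.getTaskID.getId}-${UUID.randomUUID}$ext"
    val finalPath = dir.map(d => new Path(new Path(path, d), filename))
      .getOrElse(new Path(path, filename))
    // Write to a dot-prefixed temp file in the same directory, so readers that
    // skip hidden files never see partial data and the later rename is cheap.
    val tempPath = new Path(finalPath.getParent, s".${finalPath.getName}.tmp")
    pendingFiles += tempPath -> finalPath
    tempPath.toString
  }

  override def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String =
    throw new UnsupportedOperationException("absolute paths not supported here")

  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
    val fs = new Path(path).getFileSystem(taskContext.getConfiguration)
    // Files only become visible under their final names once the task commits.
    pendingFiles.foreach { case (temp, dest) => fs.rename(temp, dest) }
    new TaskCommitMessage(pendingFiles.map(_._2.toString))
  }

  override def abortTask(taskContext: TaskAttemptContext): Unit = {
    val fs = new Path(path).getFileSystem(taskContext.getConfiguration)
    pendingFiles.foreach { case (temp, _) => fs.delete(temp, false) }
  }

  override def commitJob(
      jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
    // A complete version would record the committed files in the streaming
    // manifest here so the query can resume from a known point.
  }

  override def abortJob(jobContext: JobContext): Unit = {}
}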

I think this commit protocol could be generally useful. Writing to a temp
file and moving it to the final location is low-cost on HDFS and is the
standard behavior for non-streaming jobs, as implemented in
HadoopMapReduceCommitProtocol. At the same time, ManifestFileCommitProtocol
is needed for structured streaming jobs. We have been running this for a
few months in production without problems.
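
For what it's worth, one way such a protocol could be wired into a streaming
query is through the commit protocol class setting, assuming the internal
spark.sql.streaming.commitProtocolClass key (undocumented, so worth verifying
against the Spark version in use; the class name below refers to the
hypothetical sketch above, not the linked commit):

spark.conf.set(
  "spark.sql.streaming.commitProtocolClass",
  "com.example.TempFileThenRenameProtocol")  // hypothetical fully-qualified name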

Here is the code (at the moment not up to Spark standards, admittedly):
https://github.com/davcamer/spark/commit/361f1c69851f0f94cfd974ce720c694407f9340b

Did I miss a better approach? Does anyone else think this would be useful?

Thanks for reading,
Dave