Re: OutOfDirectMemoryError for Spark 2.2

2018-03-12, Dave Cameron
>>>>  113:  300  40800  io.netty.buffer.PoolArena$HeapArena
>>>>  114:  300  40800  io.netty.buffer.PoolArena$DirectArena
>>>>  192:  198  15840  io.netty.buffer.PoolChunk
>>>>  274:  120   8320  io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>>>>  406:  120   3840  io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>>>>  422:   72   3552  io.netty.buffer.PoolArena[]
>>>>  458:   30   2640  io.netty.buffer.PooledUnsafeDirectByteBuf
>>>>  500:   36   2016  io.netty.buffer.PooledByteBufAllocator
>>>>  529:   32   1792  io.netty.buffer.UnpooledUnsafeHeapByteBuf
>>>>  589:   20   1440  io.netty.buffer.PoolThreadCache
>>>>  630:   37   1184  io.netty.buffer.EmptyByteBuf
>>>>  703:   36    864  io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>>>>  852:   22    528  io.netty.buffer.AdvancedLeakAwareByteBuf
>>>>  889:   10    480  io.netty.buffer.SlicedAbstractByteBuf
>>>>  917:    8    448  io.netty.buffer.UnpooledHeapByteBuf
>>>> 1018:   20    320  io.netty.buffer.PoolThreadCache$1
>>>> 1305:    4    128  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>>>> 1404:    1     80  io.netty.buffer.PooledUnsafeHeapByteBuf
>>>> 1473:    3     72  io.netty.buffer.PoolArena$SizeClass
>>>> 1529:    1     64  io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
>>>> 1541:    2     64  io.netty.buffer.CompositeByteBuf$Component
>>>> 1568:    1     56  io.netty.buffer.CompositeByteBuf
>>>> 1896:    1     32  io.netty.buffer.PoolArena$SizeClass[]
>>>> 2042:    1     24  io.netty.buffer.PooledUnsafeDirectByteBuf$1
>>>> 2046:    1     24  io.netty.buffer.UnpooledByteBufAllocator
>>>> 2051:    1     24  io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
>>>> 2078:    1     24  io.netty.buffer.PooledHeapByteBuf$1
>>>> 2135:    1     24  io.netty.buffer.PooledUnsafeHeapByteBuf$1
>>>> 2302:    1     16  io.netty.buffer.ByteBufUtil$1
>>>> 2769:    1     16  ...atcher
>>>> My driver machine has 32 CPUs, and I currently have 15 machines in my
>>>> cluster. The error happens while processing the 5th or 6th chunk. I
>>>> suspect the error depends on the number of executors and would happen
>>>> earlier if we added more executors.
>>>> I am trying to come up with an explanation of what is filling up the
>>>> direct memory and how to quantify it as a function of the number of
>>>> executors. Our cluster is shared, and we need to understand how much
>>>> driver memory to allocate for most of the jobs.
>>>> Regards
>>>> Sumit Chawla
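A minimal Python sketch for tallying a class histogram like the one quoted above, assuming the standard `jmap -histo` row layout (`num: #instances #bytes class-name`), with or without the `>` quote markers. Keep in mind that `jmap -histo` only counts on-heap objects: the byte column is the shallow heap size of Netty's bookkeeping objects (arenas, chunks, caches), not the off-heap memory those chunks manage. The function name `netty_totals` and its default package prefix are illustrative choices, not part of any tool.

```python
import re
from collections import Counter

# Matches a `jmap -histo` data row, tolerating leading "> " quote markers:
#   "  114:  300  40800  io.netty.buffer.PoolArena$DirectArena"
ROW = re.compile(r"^[>\s]*\d+:\s+(\d+)\s+(\d+)\s+(\S+)")


def netty_totals(histo_text, prefix="io.netty.buffer."):
    """Return (instances, shallow_bytes) Counters keyed by class name,
    keeping only classes whose name starts with `prefix`."""
    instances, shallow_bytes = Counter(), Counter()
    for line in histo_text.splitlines():
        m = ROW.match(line)
        if m is None or not m.group(3).startswith(prefix):
            continue  # header, prose, truncated row, or another package
        count, size, cls = int(m.group(1)), int(m.group(2)), m.group(3)
        instances[cls] += count
        shallow_bytes[cls] += size
    return instances, shallow_bytes


if __name__ == "__main__":
    sample = (
        ">>>> 114: 300 40800 io.netty.buffer.PoolArena$DirectArena\n"
        ">>>> 192: 198 15840 io.netty.buffer.PoolChunk\n"
    )
    inst, size = netty_totals(sample)
    print(inst["io.netty.buffer.PoolChunk"], sum(size.values()))  # 198 56640
```

Because the byte totals are shallow heap sizes, they can only show how much heap the Netty bookkeeping takes; the instance counts (for example of `PoolChunk`) are the more useful signal when comparing runs with different numbers of executors.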

Dave Cameron
Senior Platform Engineer
(415) 646-5657
We're Hiring! | @digitalocean | @davcamer | linkedin | github | blog

[Structured Streaming] Commit protocol to move temp files to dest path only when complete, with code

2018-02-09, Dave Cameron

I have a Spark Structured Streaming job that reads from Kafka and writes
Parquet files to Hive/HDFS. The files are not very large, but the Kafka
source is noisy, so each Spark job takes a long time to complete. There is a
significant window during which the Parquet files are incomplete, and other
tools, including PrestoDB, encounter errors while trying to read them.

I wrote to this list and to Stack Overflow about the problem last summer:

After hesitating for a while, I wrote a custom commit protocol to solve the
problem. It combines HadoopMapReduceCommitProtocol's behavior of writing to
a temp file first with ManifestFileCommitProtocol. From what I can tell,
ManifestFileCommitProtocol is required for the normal Structured Streaming
behavior of being able to resume streams from a known point.
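A minimal, local-filesystem sketch of the combination described above, written in Python rather than Scala so it runs without a Spark or Hadoop dependency. It is not the author's commit protocol and not Spark's API; it only illustrates the idea under stated assumptions: each file is written under a hidden staging directory and renamed into the destination once complete, and the batch's file names are then recorded in a per-batch manifest entry so a restarted writer can skip a batch it already committed. The directory names `_temporary` and `_manifest` and the functions `stage_and_publish` and `commit_batch` are hypothetical.

```python
import json
import os
import tempfile


def stage_and_publish(dest_dir, name, data):
    """Write `data` to a hidden staging file, then rename it to dest_dir/name.

    Readers that list dest_dir see either no file or the complete file,
    never a partially written one.
    """
    staging = os.path.join(dest_dir, "_temporary")  # hypothetical staging dir
    os.makedirs(staging, exist_ok=True)
    tmp_path = os.path.join(staging, name)
    with open(tmp_path, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # make the bytes durable before publishing
    final_path = os.path.join(dest_dir, name)
    os.replace(tmp_path, final_path)  # atomic within a single POSIX filesystem
    return final_path


def commit_batch(dest_dir, batch_id, files):
    """Publish every file of a micro-batch, then record it in a manifest.

    Returns False without publishing anything if `batch_id` already has a
    manifest entry, so a restarted stream can resume from a known point.
    """
    log_dir = os.path.join(dest_dir, "_manifest")  # hypothetical manifest dir
    os.makedirs(log_dir, exist_ok=True)
    entry = os.path.join(log_dir, str(batch_id))
    if os.path.exists(entry):
        return False
    published = [stage_and_publish(dest_dir, n, d) for n, d in files.items()]
    # The manifest entry itself is written the same way: temp file, then rename.
    fd, tmp = tempfile.mkstemp(dir=log_dir, prefix=".tmp-")
    with os.fdopen(fd, "w") as f:
        json.dump([os.path.basename(p) for p in published], f)
    os.replace(tmp, entry)
    return True
```

If the process dies after the renames but before the manifest entry is written, retrying the same batch overwrites the same file names; a production version would also need to clean up files that never make it into a manifest.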

I think this commit protocol could be generally useful. Writing to a temp
file and moving it to the final location is cheap on HDFS, since a rename
only updates NameNode metadata, and it is the standard behavior for
non-streaming jobs, as implemented in HadoopMapReduceCommitProtocol. At the
same time, ManifestFileCommitProtocol is needed for Structured Streaming
jobs. We have been running this in production for a few months without
problems.
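The benefit for readers such as PrestoDB comes from a listing convention as much as from the rename. Hadoop's FileInputFormat skips names that start with `_` or `.`, and many Hive-compatible readers follow a similar rule, so a file still being written under a staging directory such as `_temporary` is not listed until it is renamed into place. A small Python sketch of that reader-side view; `visible_files` and `demo` are hypothetical helpers, and the filter is a simplification of what any particular reader does.

```python
import os
import tempfile


def visible_files(directory):
    """Files a Hadoop-style reader would pick up: regular files whose names
    do not start with '_' or '.' (staging dirs, metadata, temp files)."""
    return sorted(
        name
        for name in os.listdir(directory)
        if not name.startswith(("_", "."))
        and os.path.isfile(os.path.join(directory, name))
    )


def demo():
    dest = tempfile.mkdtemp()
    staging = os.path.join(dest, "_temporary")
    os.makedirs(staging)
    tmp_path = os.path.join(staging, "part-00000.parquet")
    with open(tmp_path, "wb") as f:
        f.write(b"still being written")  # stands in for a task's in-progress file
    before = visible_files(dest)  # staged file is hidden from the reader
    os.replace(tmp_path, os.path.join(dest, "part-00000.parquet"))
    after = visible_files(dest)  # listed only once it is complete
    return before, after


if __name__ == "__main__":
    print(demo())  # ([], ['part-00000.parquet'])
```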

Here is the code (at the moment not up to Spark standards, admittedly):

Did I miss a better approach? Does anyone else think this would be useful?

Thanks for reading,