Great, thanks for the feedback. It sounds like you're using the LZF compression scheme -- switching to Snappy should significantly reduce the buffer space used per DiskBlockObjectWriter, but this doesn't really solve the underlying problem. In general I've been thinking of "Spark nodes" as having high memory and a moderate number of cores, but with 24 cores and 40GB of memory, each core really doesn't get that much memory individually, even though each one needs its own set of DiskBlockObjectWriters.
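For readers following along, switching codecs is a one-property change. Below is a minimal sketch in the Spark 0.8-era style of setting options via Java system properties before the SparkContext is created; the property name and codec class are as I recall them for that release line, and the master URL and app name are placeholders, not values from this thread.

```scala
import org.apache.spark.SparkContext

object SnappyCodecSketch {
  def main(args: Array[String]): Unit = {
    // Select Snappy instead of the default LZF for block/shuffle compression.
    // Snappy's per-stream buffers are considerably smaller than LZF's, which
    // matters when thousands of compressed output streams are open at once.
    System.setProperty("spark.io.compression.codec",
      "org.apache.spark.io.SnappyCompressionCodec") // assumed 0.8-era value; newer releases accept short names

    // Placeholder master URL and app name.
    val sc = new SparkContext("spark://master:7077", "snappy-codec-sketch")
    // ... run the job as before ...
    sc.stop()
  }
}
```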
One thing that is a little odd is that with your numbers, you should have 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should only require a total of 4.8GB for the entire node, though, rather than 80% of your JVM memory. Were you seeing significantly more than 48k DiskBlockObjectWriters?

On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai <[email protected]> wrote:

> Hi Aaron,
>
> I've also met the same problem that shuffle takes so much overhead for a large number of partitions. I think it is an important issue when processing large data.
>
> In my case I have 2000 mappers and 2000 reducers. I dumped the memory of an executor and found that byte arrays take about 80% of total JVM memory; they are referenced by FastBufferedOutputStream and created by DiskBlockObjectWriter. It seems that there are so many instances of DiskBlockObjectWriter, and each DiskBlockObjectWriter has a 100KB buffer for FastBufferedOutputStream by default. These buffers persist through the task execution period and cannot be garbage collected until the task is finished.
>
> My cluster has 6 nodes, with 40G memory and 24 cores per node. I tried with 5000 partitions, and this easily hit OOM.
>
> The dilemma is that my application needs a groupByKey transformation, which requires a small partition size, but a small partition size leads to a larger number of partitions, which also consumes lots of memory.
>
> Thanks
> Jerry
>
> *From:* Aaron Davidson [mailto:[email protected]]
> *Sent:* Friday, November 22, 2013 2:54 PM
> *To:* [email protected]
> *Subject:* Re: oome from blockmanager
>
> Thanks for your feedback; I think this is a very important issue on the usability front. One thing to consider is that at some data size, one simply needs larger or more nodes. m1.large is essentially the smallest ec2 instance size that can run a Spark job of any reasonable size. That's not an excuse for an OOM, really -- one should generally just see (heavily) degraded performance instead of actually failing the job. Additionally, the number of open files scales with the number of reducers in Spark, rather than, say, MapReduce, where each mapper only writes to one file, at the cost of later sorting the entire thing. This unfortunately means that adding nodes isn't really a full solution in your case, since each one would try to have 36k compressed output streams open.
>
> The short term solutions have already been discussed: decrease the number of reducers (and mappers, if you need them to be tied) or potentially turn off compression if Snappy is holding too much buffer space. A third option would actually be to decrease the number of executors per node to 1, since that would double the available memory and roughly halve the usage. Clearly either of the latter two solutions will produce a significant slowdown, while the first should keep the same or better performance. While Spark is good at handling a large number of partitions, there is still some cost to schedule every task, as well as to store and forward the metadata for every shuffle block (which grows with R * M), so the ideal partition size is one that fits exactly into memory without OOMing -- although this is of course an unrealistic situation to aim for.
>
> The longer term solutions include algorithms which degrade gracefully instead of OOMing (although this would address too-large partitions rather than too many small ones, where the metadata and buffering become the issue) and potentially adopting a more Map-Reducey style of shuffling where we would only need to write to 1 file per executor at a time, with some significant processing and disk bandwidth cost. I am currently investigating shuffle file performance, and thanks to your feedback here, I'll additionally investigate the memory overheads inherent in shuffling as well.
>
> On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <[email protected]> wrote:
>
> > More significant in shuffling data is the number of reducers
>
> Makes sense.
>
> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>
> This seems slightly optimistic. My math would be: m1.large = 7.5gb total, leave 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say Spark will need 20% or so as metadata/overhead, so ~2gb actually available to each executor to put our working data in memory.
>
> But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap a case class around each line to abstract away the parsing logic, and, as you say, Java instances will be a good deal bigger than the raw data they encapsulate. Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once uncompressed and loaded as Java objects, would likely fit in RAM.
>
> 1.1tb / .3gb = 3666 reducers.
>
> Perhaps I'm being pessimistic, but an 8gb partition size seems way high. Are other Spark users really using partitions this large?
>
> I'll admit our current value of 64mb is probably way low. We had seen a lot of OOMEs when first using Spark, due to having too many partitions (one per file loaded from S3). When writing our "auto coalesce" logic, I didn't know a good partition size to shoot for, but had read that HDFS used 64mb blocks.
>
> I thought we'd get the most parity with regular Spark/HDFS users by using the same value, so that's what we went with. Perhaps this was a bad assumption?
>
> > So a key question for you is, how many reducers did you use in this task?
>
> 18,000. Yes, I know that seems naive.
>
> As an explanation, we prefer for our reports to not have/require any manual partitioning hints from the programmer. Our theory is that, once the data is loaded and we make a good guesstimate about partitioning (which is handled by a utility library that knows our goal partition size), the report logic itself just shouldn't care.
>
> So, in this case, the report is just cogrouping the 18k-partition RDD with another RDD, and since we don't have spark.default.parallelism set, the resulting RDD is also 18k partitions.
>
> To us, this seems like the only safe default behavior; if the map-side RDD was correctly partitioned into 18k, and any fewer partitions would (in theory) risk OOMEs, then the reduce-side RDD should have the same number of partitions, because it will have, for a cogroup, data from multiple RDDs, not just the biggest upstream RDD.
>
> We would like to avoid having the report hard-code partition size overrides into a few/all of its cogroup calls -- how would the report know what value to hard-code? What date range is it currently being run for? How much data is really there for this run?
>
> Also, I'm generally cautious about dropping the number of partitions too low, because my impression is that Spark excels at/prefers lots of small tasks, since its architecture allows it to schedule/move/recover them quickly.
>
> > I'll also be very interested to see any heap dumps
>
> Sure! I followed up with Aaron offlist.
>
> - Stephen
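Aaron's back-of-the-envelope buffer math earlier in the thread (one ~100KB FastBufferedOutputStream per reducer per concurrently running core) is easy to restate as a sanity check, alongside the short-term knobs he mentions. The following is a minimal sketch; the property names (spark.shuffle.file.buffer.kb, spark.shuffle.compress) and their defaults are my understanding of Spark from this era and should be verified against your version's configuration docs, and the values set are illustrative, not recommendations.

```scala
object ShuffleBufferEstimate {
  /** GB of serializer buffer space held open by shuffle writers on one node. */
  def bufferGB(reducers: Int, concurrentTasks: Int, bufferKB: Int): Double =
    reducers.toDouble * concurrentTasks * bufferKB / 1e6 // KB -> GB, decimal units

  def main(args: Array[String]): Unit = {
    // Jerry's setup: 2000 reducers, 24 cores per node, 100KB default buffer.
    println(bufferGB(reducers = 2000, concurrentTasks = 24, bufferKB = 100)) // 4.8
    // ~4.8GB per node -- far less than 80% of a 40GB heap, hence Aaron's
    // question about whether more than 48k writers were actually alive.

    // Short-term mitigations discussed in the thread, set before the
    // SparkContext is created (assumed 0.8-era property names):
    System.setProperty("spark.shuffle.file.buffer.kb", "25") // shrink each writer's buffer
    System.setProperty("spark.shuffle.compress", "false")    // drop codec-side buffers entirely
  }
}
```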
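Stephen's partition-size arithmetic can likewise be written down explicitly. All the constants below are his stated assumptions (m1.large memory, 2 executors per node, ~20% Spark overhead, ~50% compression, ~3x Java object bloat), not measured values.

```scala
object ReducerCountEstimate {
  def main(args: Array[String]): Unit = {
    val totalCompressedGB   = 1100.0 // ~1.1TB of compressed input
    val instanceMemGB       = 7.5    // m1.large
    val osAndWorkerGB       = 2.0    // left for the OS, worker daemon, etc.
    val executorsPerNode    = 2
    val sparkOverheadFactor = 0.8    // ~20% of the heap for Spark metadata/overhead
    val compressionFactor   = 2.0    // input roughly halves when compressed
    val objectBloatFactor   = 3.0    // case-class wrappers vs. the raw bytes

    // ~2.2GB of heap genuinely available per executor (Stephen rounds to ~2GB).
    val usableGB = (instanceMemGB - osAndWorkerGB) / executorsPerNode * sparkOverheadFactor
    // ~0.3-0.4GB of compressed input per partition once decompressed and objectified.
    val compressedGBPerPartition = usableGB / compressionFactor / objectBloatFactor
    val reducers = math.ceil(totalCompressedGB / compressedGBPerPartition).toInt

    println(s"~$reducers reducers") // ~3000 here; Stephen's rounding to 0.3GB gives 3666
  }
}
```

On the cogroup point: PairRDDFunctions does accept an explicit partition count (e.g. bigRdd.cogroup(otherRdd, 3666)), and spark.default.parallelism changes the default used when nothing is specified -- which is exactly the per-report hard-coding Stephen is trying to avoid by sizing partitions once at load time.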
