Jerry, I need to correct what I said about the 100KB for each FastBufferedOutputStream -- this is actually a Spark buffer, not a compression buffer. The size can be configured with the "spark.shuffle.file.buffer.kb" system property, and it defaults to 100. I am still curious whether you're using compression, or whether you're seeing significantly more than 48k DiskBlockObjectWriters, either of which could account for the remaining memory used.
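For concreteness, the accounting works out roughly as below. This is just a sketch, not code from Spark itself, and it assumes one DiskBlockObjectWriter per reducer per core with the 100KB default buffer:

    // A rough sketch (not Spark code) of the buffer accounting described above:
    // one DiskBlockObjectWriter per (core, reducer) pair on a node, each holding a
    // FastBufferedOutputStream buffer of spark.shuffle.file.buffer.kb kilobytes.
    object ShuffleBufferEstimate {

      def bufferKbPerNode(reducers: Int, coresPerNode: Int, bufferKb: Int = 100): Long =
        reducers.toLong * coresPerNode * bufferKb

      def main(args: Array[String]): Unit = {
        // Jerry's numbers: 2000 reducers, 24 cores, default 100KB buffer
        val writers = 2000 * 24
        val totalKb = bufferKbPerNode(reducers = 2000, coresPerNode = 24)
        println(f"$writers%,d writers, ~${totalKb / 1e6}%.1f GB of buffer space per node")

        // Shrinking the buffer scales this down linearly; in this era of Spark the
        // setting is a system property, e.g.:
        // System.setProperty("spark.shuffle.file.buffer.kb", "25")
      }
    }

Note this covers only the FastBufferedOutputStream buffers; any compression codec buffers sit on top of that, which is why the codec choice matters here.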
On Fri, Nov 22, 2013 at 9:05 AM, Aaron Davidson <[email protected]> wrote:

> Great, thanks for the feedback. It sounds like you're using the LZF compression scheme -- switching to Snappy should see significantly less buffer space used up per DiskBlockObjectWriter, but this doesn't really solve the underlying problem. In general I've been thinking of "Spark nodes" as having high memory and a moderate number of cores, but with 24 cores and 40GB of memory, each core really doesn't get that much memory individually, despite every one needing its own set of DiskBlockObjectWriters.
>
> One thing that is a little odd is that with your numbers, you should have 2000 (reducers) * 24 (cores) = 48k DiskBlockObjectWriters. These should only require a total of 4.8GB for the entire node, though, rather than 80% of your JVM memory. Were you seeing significantly more than 48k DiskBlockObjectWriters?
>
> On Fri, Nov 22, 2013 at 1:38 AM, Shao, Saisai <[email protected]> wrote:
>
>> Hi Aaron,
>>
>> I've also met the same problem: shuffle takes a lot of overhead for a large number of partitions. I think it is an important issue when processing large data.
>>
>> In my case I have 2000 mappers and 2000 reducers. I dumped the memory of an executor and found that byte arrays take about 80% of total JVM memory; they are referenced by FastBufferedOutputStream and created by DiskBlockObjectWriter. It seems that there are a great many instances of DiskBlockObjectWriter, and each one has a 100KB buffer for FastBufferedOutputStream by default. These buffers persist through the task execution period and cannot be garbage collected until the task is finished.
>>
>> My cluster has 6 nodes, with 40GB memory and 24 cores per node. I tried with 5000 partitions, and this easily hit OOM.
>>
>> The dilemma is that my application needs a groupByKey transformation, which requires a small partition size, but a small partition size leads to a larger number of partitions, which also consumes lots of memory.
>>
>> Thanks
>> Jerry
>>
>> *From:* Aaron Davidson [mailto:[email protected]]
>> *Sent:* Friday, November 22, 2013 2:54 PM
>> *To:* [email protected]
>> *Subject:* Re: oome from blockmanager
>>
>> Thanks for your feedback; I think this is a very important issue on the usability front. One thing to consider is that at some data size, one simply needs larger or more nodes. m1.large is essentially the smallest ec2 instance size that can run a Spark job of any reasonable size. That's not an excuse for an OOM, really -- one should generally just see (heavily) degraded performance instead of actually failing the job. Additionally, the number of open files scales with the number of reducers in Spark, rather than, say, MapReduce, where each mapper only writes to one file, at the cost of later sorting the entire thing. This unfortunately means that adding nodes isn't really a full solution in your case, since each one would try to have 36k compressed output streams open.
>>
>> The short term solutions have already been discussed: decrease the number of reducers (and mappers, if you need them to be tied) or potentially turn off compression if Snappy is holding too much buffer space. A third option would actually be to decrease the number of executors per node to 1, since that would double the available memory and roughly halve the usage. Clearly either of the latter two solutions will produce a significant slowdown, while the first should keep the same or better performance. While Spark is good at handling a large number of partitions, there is still some cost to schedule every task, as well as to store and forward the metadata for every shuffle block (which grows with R * M), so the ideal partition size is one that fits exactly into memory without OOMing -- although this is of course an unrealistic situation to aim for.
>>
>> The longer term solutions include algorithms which degrade gracefully instead of OOMing (although this would be a solution for too-large partitions rather than too-small ones, where the metadata and buffering become the issue), and potentially adopting a more Map-Reducey style of shuffling where we would only need to write to 1 file per executor at a time, with some significant processing and disk bandwidth cost. I am currently investigating shuffle file performance, and thanks to your feedback here, I'll additionally investigate the memory overheads inherent in shuffling as well.
>>
>> On Thu, Nov 21, 2013 at 10:20 PM, Stephen Haberman <[email protected]> wrote:
>>
>> > More significant in shuffling data is the number of reducers
>>
>> Makes sense.
>>
>> > so the lower bound on the number of reducers is 1.1TB/8GB = 138
>>
>> This seems slightly optimistic. My math would be: m1.large = 7.5gb total, leave 2gb to OS/worker/etc., split 5.5gb between 2 executors = 2.75gb, plus say Spark will need 20% or so as metadata/overhead, so ~2gb actually available to each executor to put our working data in memory.
>>
>> But the 1.1tb of data is compressed, say with a 50% reduction. And we wrap a case class around each line to abstract away the parsing logic, and, as you say, Java instances will be a good deal bigger than the raw data they encapsulate. Maybe 3x bigger? So, 2gb / 2 / 3 = ~300mb of raw data that, once uncompressed and loaded as Java objects, would likely fit in RAM.
>>
>> 1.1tb/.3gb = 3666 reducers.
>>
>> Perhaps I'm being pessimistic, but an 8gb partition size seems way high. Are other Spark users really using partitions this large?
>>
>> I'll admit our current value of 64mb is probably way low. We had seen a lot of OOMEs when first using Spark, due to having too many partitions (one per file loaded from S3). When writing our "auto coalesce" logic, I didn't know a good partition size to shoot for, but had read that HDFS used 64mb blocks.
>>
>> I thought we'd get the most parity with regular Spark/HDFS users by using the same value, so that's what we went with. Perhaps this was a bad assumption?
>>
>> > So a key question for you is, how many reducers did you use in this task?
>>
>> 18,000. Yes, I know that seems naive.
>>
>> As an explanation, we prefer for our reports to not have/require any manual partitioning hints from the programmer. Our theory is that, once the data is loaded and we make a good guesstimate about partitioning (which is handled by a utility library that knows our goal partition size), the report logic itself just shouldn't care.
>>
>> So, in this case, the report is just cogrouping the 18k-partition RDD with another RDD, and since we don't have spark.default.parallelism set, the resulting RDD is also 18k partitions.
>>
>> To us, this seems like the only safe default behavior; if the map-side RDD was correctly partitioned into 18k, and any fewer partitions would (in theory) risk OOMEs, then the reduce-side RDD should have the same number of partitions, because it will have, for a cogroup, data from multiple RDDs, not just the biggest upstream RDD.
>>
>> We would like to avoid having the report hard-code partition size overrides into a few/all of its cogroup calls -- how would the report know what value to hard code? What date range is it currently being run for? How much data is really there for this run?
>>
>> Also, I'm generally cautious about dropping the number of partitions too low, because my impression is that Spark excels at/prefers lots of small tasks, since its architecture allows it to schedule/move/recover them quickly.
>>
>> > I'll also be very interested to see any heap dumps
>>
>> Sure! I followed up with Aaron offlist.
>>
>> - Stephen
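One footnote on Stephen's sizing math in the quoted thread: written out, it looks roughly like the sketch below. Every constant is one of his stated assumptions (m1.large memory, 2 executors per node, ~20% Spark overhead, 50% compression, ~3x object blowup), not a measured value, and it lands a bit below his 3666 only because he rounds to ~2gb and ~300mb along the way.

    // A sketch of Stephen's reducer-count estimate; all constants are assumptions
    // he states in his email, not measured values.
    object PartitionEstimate {

      def reducersNeeded(totalCompressedGb: Double): Int = {
        val nodeMemGb        = 7.5   // m1.large total memory
        val osReservedGb     = 2.0   // left for the OS, worker daemon, etc.
        val executorsPerNode = 2
        val sparkOverhead    = 0.20  // metadata/overhead inside each executor
        val compressedToRaw  = 0.5   // compressed size is ~half the raw size
        val objectBlowup     = 3.0   // Java objects vs. the raw bytes they wrap

        // Memory actually available per executor for one partition's objects
        val perExecutorGb = (nodeMemGb - osReservedGb) / executorsPerNode * (1 - sparkOverhead)
        // Compressed input that fits once decompressed and wrapped in case classes
        val compressedPerPartitionGb = perExecutorGb * compressedToRaw / objectBlowup
        math.ceil(totalCompressedGb / compressedPerPartitionGb).toInt
      }

      def main(args: Array[String]): Unit = {
        // 1.1TB of compressed input, as in the thread: lands in the low thousands
        println(reducersNeeded(totalCompressedGb = 1.1 * 1024))
      }
    }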
