> More significant in shuffling data is the number of reducers

Makes sense.

> so the lower bound on the number of reducers is 1.1TB/8GB = 138

This seems slightly optimistic. My math would be: an m1.large has 7.5GB total,
leave 2GB for the OS/worker/etc., split the remaining 5.5GB between 2 executors
= 2.75GB each, then knock off 20% or so for Spark metadata/overhead, so ~2GB
actually available to each executor for our working data in memory.

But the 1.1TB of data is compressed, say with a 50% reduction. And we wrap a
case class around each line to abstract away the parsing logic, and, as you say,
Java instances will be a good deal bigger than the raw data they
encapsulate--maybe 3x bigger? So, 2GB / 2 / 3 ≈ 300MB of raw (on-disk) data
that, once decompressed and loaded as Java objects, would likely fit in RAM.

1.1TB / 0.3GB ≈ 3,666 reducers.
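
Spelled out as a back-of-the-envelope calculation (every constant below is
just the rough assumption from above, so treat it as a sketch, not a
measurement):

    // Per-executor memory, per the estimate above
    val perExecutorGb = (7.5 - 2.0) / 2        // ~2.75 GB after OS/worker
    val usableGb      = perExecutorGb * 0.8    // ~2.2 GB after ~20% Spark overhead

    // Compressed on-disk data that fits once decompressed (~2x) and wrapped
    // in case-class instances (~3x)
    val rawPerPartitionGb = usableGb / 2.0 / 3.0   // roughly 0.3-0.37 GB

    // Lower bound on reducers for 1.1 TB of input, rounding down to 0.3 GB
    val reducers = 1100.0 / 0.3                // ~3,666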

Perhaps I'm being pessimistic, but an 8GB partition size seems way high. Are
other Spark users really using partitions this large?

I'll admit our current value of 64MB is probably way low. We had seen a
lot of OOMEs when first using Spark, due to having too many partitions
(one per file loaded from S3). When writing our "auto coalesce" logic,
I didn't know a good partition size to shoot for, but had read that
HDFS used 64MB blocks.

I thought we'd get the most parity with regular Spark/HDFS users by
using the same value, so that's what we went with. Perhaps this was
a bad assumption?
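
For concreteness, the "auto coalesce" helper is roughly this shape (a
paraphrased sketch, not the real code; the names and the 64MB default are
exactly the assumptions in question):

    import org.apache.spark.rdd.RDD

    // Pick a partition count from the (compressed) input size and a goal
    // partition size, then coalesce down to it.
    def autoCoalesce[T](rdd: RDD[T], inputBytes: Long,
                        goalPartitionBytes: Long = 64L * 1024 * 1024): RDD[T] = {
      val target = math.max(1, math.ceil(inputBytes.toDouble / goalPartitionBytes).toInt)
      if (target < rdd.partitions.length) rdd.coalesce(target) else rdd
    }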

> So a key question for you is, how many reducers did you use in this
> task?

18,000. Yes, I know that seems naive.

As an explanation, we prefer that our reports not have/require any
manual partitioning hints from the programmer. Our theory is that, once
the data is loaded and we make a good guesstimate about partitioning
(which is handled by a utility library that knows our goal partition
size), the report logic itself just shouldn't care.

So, in this case, the report is just cogrouping the 18k-partition RDD with
another RDD, and since we don't have spark.default.parallelism set, the
resulting RDD also has 18k partitions.
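
As a toy illustration of that default (made-up data, with tiny partition
counts standing in for 18k):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on older Spark)

    val sc = new SparkContext(
      new SparkConf().setAppName("cogroup-partitions").setMaster("local[2]"))

    val big   = sc.parallelize(1 to 100, 18).map(i => (i, i))   // stand-in for the 18k-partition RDD
    val small = sc.parallelize(1 to 100, 2).map(i => (i, -i))

    // No numPartitions argument and no spark.default.parallelism set, so
    // cogroup falls back to the largest upstream partition count.
    val grouped = big.cogroup(small)
    println(grouped.partitions.length)   // 18 here, 18,000 in the real job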

To us, this seems like the only safe default behavior: if the map-side RDD was
correctly partitioned into 18k, and any fewer partitions would (in theory) risk
OOMEs, then the reduce-side RDD should have (at least) the same number of
partitions, because for a cogroup it will hold data from multiple RDDs, not
just the biggest upstream RDD.

We would like to avoid having the report hard-code partition size
overrides into a few/all of its cogroup calls--how would the report know
what value to hard-code? What date range is it currently being run for?
How much data is really there for this run?
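
Reusing the toy RDDs from the cogroup sketch above, the alternative we're
trying to avoid would look like:

    // A hard-coded reducer count baked into the report itself; this value
    // would have to be re-guessed for every date range / data volume.
    val groupedOverride = big.cogroup(small, 18)

We'd rather the partitioning utility own that guess.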

Also, I'm generally cautious about dropping the number of partitions
too low, because my impression is that Spark excels at/prefers lots of
small tasks, since its architecture allows it to schedule/move/recover
them quickly.

> I'll also be very interested to see any heap dumps

Sure! I followed up with Aaron offlist.

- Stephen
