Thanks for your detailed reply Imran. I'm writing this in Clojure (using
Flambo which uses the Java API) but I don't think that's relevant. So
here's the pseudocode (sorry I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
val parsedFiles = rawData.map(parseFunction)   // can return null on failure
val filtered = parsedFiles.filter(_ != null)
val partitioned = filtered.repartition(100)    // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that
the partition size would be («size of expanded file» / «number of
partitions») -- with 100 partitions in this case, a number I picked out of
the air.

I wonder whether the «size of expanded file» is actually the size of all the
concatenated input files (probably about 800 GB in total)? In that case,
should I multiply the number of partitions by the number of files? Or perhaps
I'm barking up completely the wrong tree.
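
If that's the case, then a rough back-of-envelope calculation (the sizes here
are just my guesses) suggests 100 partitions is far too few:

// If ~800 GB uncompressed ends up spread over only 100 partitions, each one
// is ~8 GB, well over the 2 GB block limit. Sizing the partition count from
// a target partition size instead (illustrative numbers only):
val totalUncompressedBytes = 800L * 1024 * 1024 * 1024   // rough guess
val targetPartitionBytes   = 512L * 1024 * 1024          // stay well under 2 GB
val numPartitions = (totalUncompressedBytes / targetPartitionBytes).toInt  // = 1600
val partitioned = filtered.repartition(numPartitions)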

Joe




On 19 February 2015 at 14:44, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Joe,
>
> The issue is not that you have input partitions that are bigger than 2GB
> -- it's just that they are getting cached.  You can see in the stack trace
> that the problem is when you try to read data out of the DiskStore:
>
> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>
> Also, just because you see this:
>
> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
> tasks from Stage 1 (MappedRDD[17] at mapToPair at
> NativeMethodAccessorImpl.java:-2)
>
> it doesn't *necessarily* mean that this is coming from your map.  It can
> be pretty confusing how your operations on RDDs get turned into stages; it
> could be a lot more than just your map.  And actually, it might not even be
> your map at all -- some of the other operations you invoke call map
> underneath the covers.  So it's hard to say what is going on here without
> seeing more code.  Anyway, maybe you've already considered all this (you
> did mention the lazy execution of the DAG), but I wanted to make sure.  It
> might help to use rdd.setName() and also to look at rdd.toDebugString.
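>
> For example, something along these lines (just a sketch, with made-up names):
>
> // naming each RDD makes the stage descriptions in the logs and UI easier to read
> val parsed = rawData.map(parseFunction).setName("parsed")
> val repartitioned = parsed.repartition(100).setName("repartitioned")
> // prints the whole lineage, including which RDDs are marked for caching
> println(repartitioned.toDebugString)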
>
> As far as what you can do about this -- it could be as simple as moving
> your rdd.persist() to after you have compressed and repartitioned your
> data.  E.g., I'm blindly guessing you have something like this:
>
> val rawData = sc.hadoopFile(...)
> rawData.persist(StorageLevel.DISK_ONLY)
> rawData.count()
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> ...
>
> change it to something like:
>
> val rawData = sc.hadoopFile(...)
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> repartitionedData.persist(StorageLevel.DISK_ONLY)
> repartitionedData.count()
> ...
>
>
> The point is, you avoid caching any data until you have ensured that the
> partitions are small.  You might have big partitions before that in
> rawData, but that is OK.
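>
> A quick way to sanity-check the partition sizes after the repartition
> (again, just a sketch):
>
> // rough check: record count per partition, to confirm none of them is huge
> repartitionedData
>   .mapPartitionsWithIndex { (i, it) => Iterator((i, it.size)) }
>   .collect()
>   .foreach { case (i, n) => println(s"partition $i: $n records") }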
>
> Imran
>
>
> On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass <jw...@crossref.org> wrote:
>
>> Thanks for your reply Sean.
>>
>> Looks like it's happening in a map:
>>
>> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
>> tasks from Stage 1 (MappedRDD[17] at mapToPair at
>> NativeMethodAccessorImpl.java:-2)
>>
>> That's my initial 'parse' stage, done before repartitioning. It reduces
>> the data size significantly so I thought it would be sensible to do before
>> repartitioning, which involves moving lots of data around. That might be a
>> stupid idea in hindsight!
>>
>> So the obvious thing to try would be repartitioning before the map, as the
>> first transformation. I would have done that if I could be sure that it
>> would succeed or fail quickly.
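>>
>> I.e. something like this (untested sketch; textFile and the partition
>> count are just placeholders):
>>
>> val rawData = sc.textFile("/dir/to/gzfiles")   // one partition per gz file
>> val repartitioned = rawData.repartition(800)   // spread the data out before parsing
>> val parsed = repartitioned.map(parseFunction)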
>>
>> I'm not entirely clear about the lazy execution of transformations in the
>> DAG. It could be that the error is manifesting during the mapToPair, but
>> is actually caused by the earlier read-from-text-file stage.
>>
>> Thanks for the pointers to those compression formats. I'll give them a go
>> (although it's not trivial to re-encode 200 GB of data on S3, so if I can
>> get this working reasonably with gzip, I'd like to).
>>
>> Any advice on whether this error can be worked around with an early
>> repartition?
>>
>> Cheers
>>
>> Joe
>>
>>
>> On 19 February 2015 at 09:51, Sean Owen <so...@cloudera.com> wrote:
>>
>>> gzip and zip are not splittable compression formats; bzip2 and LZO are.
>>> Ideally, use a splittable compression format.
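>>>
>>> E.g. reading bzip2-compressed files splits naturally (the path here is
>>> just illustrative):
>>>
>>> // each .bz2 file can be read as multiple partitions, unlike .gz
>>> val rawData = sc.textFile("s3n://some-bucket/dir/*.bz2")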
>>>
>>> Repartitioning is not a great solution since it means a shuffle,
>>> typically.
>>>
>>> This is not necessarily related to how big your partitions are. The
>>> question is: when does this happen? What operation?
>>>
>>> On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass <jw...@crossref.org> wrote:
>>> > On the advice of some recent discussions on this list, I thought I would
>>> > try and consume gz files directly. I'm reading them, doing a preliminary
>>> > map, then repartitioning, then doing normal spark things.
>>> >
>>> > As I understand it, zip files aren't readable in partitions because of
>>> > the format, so I thought that repartitioning would be the next best thing
>>> > for parallelism. I have about 200 files, some about 1GB compressed and
>>> > some over 2GB uncompressed.
>>> >
>>> > I'm hitting the 2GB maximum partition size. It's been discussed on this
>>> > list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
>>> > SPARK-1391). Stack trace at the end. This happened at 10 hours in
>>> > (probably when it saw its first file). I can't just re-run it quickly!
>>> >
>>> > Does anyone have any advice? Might I solve this by re-partitioning as the
>>> > first step after reading the file(s)? Or is it effectively impossible to
>>> > read a gz file that expands to over 2GB? Does anyone have any experience
>>> > with this?
>>> >
>>> > Thanks in advance
>>> >
>>> > Joe
>>> >
>>> > Stack trace:
>>> >
>>> > Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
>>> > Lost task 5.3 in stage 1.0 (TID 283) on executor:
>>> > java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
>>> > [duplicate 6]
>>> > org.apache.spark.SparkException: Job aborted due to stage failure:
>>> > Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3
>>> > in stage 1.0:
>>> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>> >         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
>>> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>>> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>>> >         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>>> >         at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
>>> >         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
>>> >         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>>>
>>
>>
>
