Thanks for your detailed reply, Imran. I'm writing this in Clojure (using Flambo, which uses the Java API) but I don't think that's relevant. So here's the pseudocode (sorry, I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles")                // NB multiple files
val parsedFiles = rawData.map(parseFunction)                  // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100)                   // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)
val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)
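As a rough sanity check on how evenly those 100 partitions actually come out, I suppose I could count records per partition (Scala again just to illustrate; in practice I'd do it through Flambo). It counts records rather than bytes, and runs on partitioned rather than persisted so nothing gets cached:

// Records per partition after repartition(100); a badly skewed split would show up here.
val recordsPerPartition = partitioned
  .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
  .collect()
recordsPerPartition.foreach { case (i, n) => println(s"partition $i: $n records") }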
So, I think I'm already doing what you suggested. I would have assumed that partition size would be («size of expanded file» / «number of partitions»), in this case 100, which I picked out of the air. I wonder whether the «size of expanded file» is actually the size of all the concatenated input files (probably about 800 GB)? If it is, that's roughly 8 GB per partition, well over the 2 GB limit, so should I multiply the number of partitions by the number of files? Or perhaps I'm barking up completely the wrong tree.
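In the meantime I'll take your setName / toDebugString suggestion, so I can at least be sure which RDD that "Stage 1" corresponds to and what is actually being cached. Roughly this, again in Scala for illustration:

// Name each RDD so stage descriptions in the logs and UI are recognisable,
// then print the lineage to see where the persist actually sits.
parsedFiles.setName("parsedFiles")
filtered.setName("filtered")
partitioned.setName("partitioned-100")
persisted.setName("persisted-disk-only")
println(persisted.toDebugString)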
Joe

On 19 February 2015 at 14:44, Imran Rashid <iras...@cloudera.com> wrote:

> Hi Joe,
>
> The issue is not that you have input partitions that are bigger than 2GB
> -- it's just that they are getting cached. You can see in the stack trace,
> the problem is when you try to read data out of the DiskStore:
>
> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>
> Also, just because you see this:
>
> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
> tasks from Stage 1 (MappedRDD[17] at mapToPair at
> NativeMethodAccessorImpl.java:-2)
>
> it doesn't *necessarily* mean that this is coming from your map. It can
> be pretty confusing how your operations on RDDs get turned into stages; it
> could be a lot more than just your map. And actually, it might not even be
> your map at all -- some of the other operations you invoke call map
> underneath the covers. So it's hard to say what is going on here without
> seeing more code. Anyway, maybe you've already considered all this (you
> did mention the lazy execution of the DAG), but I wanted to make sure. It
> might help to use rdd.setName() and also to look at rdd.toDebugString.
>
> As far as what you can do about this -- it could be as simple as moving
> your rdd.persist() to after you have compressed and repartitioned your
> data. E.g., I'm blindly guessing you have something like this:
>
> val rawData = sc.hadoopFile(...)
> rawData.persist(DISK)
> rawData.count()
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> ...
>
> Change it to something like:
>
> val rawData = sc.hadoopFile(...)
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> repartitionedData.persist(DISK)
> repartitionedData.count()
> ...
>
> The point is, you avoid caching any data until you have ensured that the
> partitions are small. You might have big partitions before that in
> rawData, but that is OK.
>
> Imran
>
>
> On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass <jw...@crossref.org> wrote:
>
>> Thanks for your reply Sean.
>>
>> Looks like it's happening in a map:
>>
>> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
>> tasks from Stage 1 (MappedRDD[17] at mapToPair at
>> NativeMethodAccessorImpl.java:-2)
>>
>> That's my initial 'parse' stage, done before repartitioning. It reduces
>> the data size significantly, so I thought it would be sensible to do it
>> before repartitioning, which involves moving lots of data around. That
>> might be a stupid idea in hindsight!
>>
>> So the obvious thing to try would be to repartition before the map, as
>> the first transformation. I would have done that if I could be sure that
>> it would succeed or fail quickly.
>>
>> I'm not entirely clear about the lazy execution of transformations in the
>> DAG. It could be that the error is manifesting during the mapToPair, but
>> caused by the earlier read-from-text-file stage.
>>
>> Thanks for the pointers to those compression formats. I'll give them a go
>> (although it's not trivial to re-encode 200 GB of data on S3, so if I can
>> get this working reasonably with gzip I'd like to).
>>
>> Any advice about whether this error can be worked round with an early
>> repartition?
>>
>> Cheers
>>
>> Joe
>>
>>
>> On 19 February 2015 at 09:51, Sean Owen <so...@cloudera.com> wrote:
>>
>>> gzip and zip are not splittable compression formats; bzip and lzo are.
>>> Ideally, use a splittable compression format.
>>>
>>> Repartitioning is not a great solution since it means a shuffle,
>>> typically.
>>>
>>> This is not necessarily related to how big your partitions are. The
>>> question is: when does this happen? What operation?
>>>
>>> On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass <jw...@crossref.org> wrote:
>>> > On the advice of some recent discussions on this list, I thought I
>>> > would try and consume gz files directly. I'm reading them, doing a
>>> > preliminary map, then repartitioning, then doing normal Spark things.
>>> >
>>> > As I understand it, gzipped files aren't readable in partitions because
>>> > of the format, so I thought that repartitioning would be the next best
>>> > thing for parallelism. I have about 200 files, some about 1 GB
>>> > compressed and some over 2 GB uncompressed.
>>> >
>>> > I'm hitting the 2GB maximum partition size. It's been discussed on this
>>> > list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
>>> > SPARK-1391). Stack trace at the end. This happened at 10 hours in
>>> > (probably when it saw its first file). I can't just re-run it quickly!
>>> >
>>> > Does anyone have any advice? Might I solve this by re-partitioning as
>>> > the first step after reading the file(s)? Or is it effectively
>>> > impossible to read a gz file that expands to over 2GB? Does anyone have
>>> > any experience with this?
>>> >
>>> > Thanks in advance
>>> >
>>> > Joe
>>> >
>>> > Stack trace:
>>> >
>>> > Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
>>> > Lost task 5.3 in stage 1.0 (TID 283) on executor:
>>> > java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
>>> > [duplicate 6]
>>> > org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
>>> > in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0:
>>> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>> >     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
>>> >     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>>> >     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>>> >     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>>> >     at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
>>> >     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
>>> >     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
>>> >     at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)