Ah, I think I finally got this. Spark v0.8.0-incubating uses Hadoop 1.0.4 by default. I needed to compile it with "SPARK_HADOOP_VERSION=1.1.0 sbt/sbt assembly", as splittable bzip2 support is only available from Hadoop 1.1.0 onwards.
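For the archives, the full sequence looks roughly like this (with a placeholder bucket instead of our real one):

# rebuild the Spark assembly against Hadoop 1.1.0, whose BZip2Codec supports splitting
SPARK_HADOOP_VERSION=1.1.0 sbt/sbt assembly

scala> // minSplits is now honoured for .bz2 input, so toDebugString
scala> // should report more than 1 partition
scala> val logs = sc.textFile("s3n://some-bucket/input.bz2", 10)
scala> logs.toDebugString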
http://hadoop.apache.org/releases.html#13+October%2C+2012%3A+Release+1.1.0+available
lists "Splittable bzip2 files" under bug fixes.

Grega

On Mon, Oct 14, 2013 at 12:03 PM, Grega Kešpret <[email protected]> wrote:

> I've tried using bzip2, but even with this method, when I do
> sc.textFile("s3n://.../input.bz2", minSplits) for whatever value of
> minSplits greater than 1, it doesn't seem to be able to process it in
> more partitions:
>
> scala> val logs = sc.textFile("s3n://.../input.bz2", 10)
> scala> logs.toDebugString
> 13/10/14 09:55:42 INFO mapred.FileInputFormat: Total input paths to process : 1
> res12: String =
> MappedRDD[287] at textFile at <console>:21 (1 partitions)
>   HadoopRDD[286] at textFile at <console>:21 (1 partitions)
>
> I'm using Spark v0.8.0-incubating.
>
> Grega
>
> On Sun, Oct 13, 2013 at 2:10 AM, Grega Kešpret <[email protected]> wrote:
>
>> Thanks a lot, I didn't know this. If I use some other compression format
>> that supports splitting (like bzip2), do I get decompression for free
>> when I do sc.textFile (like with gzipped files)?
>>
>> Grega
>>
>> On Sun, Oct 13, 2013 at 2:07 AM, Mark Hamstra <[email protected]> wrote:
>>
>>> The basic problem that you are running into is that a gzipped file is
>>> not splittable
>>> <https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-4/compression#8ca1fda1252b67145680b3a5e9d45b2a>.
>>>
>>> On Sat, Oct 12, 2013 at 4:51 PM, Grega Kešpret <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm getting Java OOM errors (heap space, GC overhead limit exceeded),
>>>> "Futures timed out after [10000] milliseconds", "removing BlockManager
>>>> with no recent heartbeat", etc. I have narrowed the cause down to a big
>>>> input file from S3. I'm trying to make Spark split this file into
>>>> several smaller chunks, so that each chunk fits in memory, but I'm out
>>>> of luck.
>>>>
>>>> I have tried:
>>>> - passing a minSplits parameter greater than 1 to sc.textFile
>>>> - increasing the numPartitions parameter to groupByKey
>>>> - using coalesce with numPartitions greater than 1 and shuffle = true
>>>>
>>>> Basically my flow is like this:
>>>> val input = sc.textFile("s3n://.../input.gz", minSplits)
>>>> input
>>>>   .mapPartitions(l => (key, l))
>>>>   .groupByKey(numPartitions)
>>>>   .map(...)
>>>>   .saveAsTextFile
>>>>
>>>> If I do input.toDebugString, I always get 1 partition (even if
>>>> minSplits is greater than 1). It seems like Spark is trying to ingest
>>>> the whole input at once. When I manually split the file into several
>>>> smaller ones, the job progressed successfully, and input.toDebugString
>>>> showed 10 partitions in the case of 10 files.
>>>>
>>>> Thanks,
>>>>
>>>> Grega
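P.S. For anyone who hits this with gzip and can't move off Hadoop 1.0.x: since a .gz file can't be split, the only way I found to parallelise the read itself was the manual workaround mentioned earlier in the thread, i.e. pre-splitting the input into several gzipped files. Sketched with hypothetical paths:

scala> // each (unsplittable) .gz file becomes its own partition,
scala> // so 10 files give 10 partitions
scala> val input = sc.textFile("s3n://some-bucket/input-part-*.gz")
scala> input.toDebugString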
