The Parquet output writer allocates one block for each table partition it
is processing and writes partitions in parallel. It will run out of memory
if (number of partitions) times (Parquet block size) exceeds the available
memory. You could try decreasing the number of partitions. Also, could you
share the value of "parquet.block.size" and how much memory you have available?
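
For reference, here is a rough way to check both values from the shell (a
minimal sketch, assuming the usual spark-shell sc; it only reports the
Parquet default if the block size has not been set explicitly somewhere else):

// Effective Parquet block size, falling back to the 128 MB Parquet default if unset
val blockSize = sc.hadoopConfiguration.getInt("parquet.block.size", 128 * 1024 * 1024)
println(s"parquet.block.size = $blockSize bytes")

// Executor heap size as configured for this application, if it was set at all
println(s"spark.executor.memory = ${sc.getConf.get("spark.executor.memory", "not set")}")

// If memory is tight, a smaller block size can be set before writing Parquet files, e.g.:
// sc.hadoopConfiguration.setInt("parquet.block.size", 64 * 1024 * 1024)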

2015-09-05 18:59 GMT+08:00 Yana Kadiyska <yana.kadiy...@gmail.com>:

> Hi folks, I have a strange issue. I'm trying to read a 7 GB file and do fairly
> simple stuff with it:
>
> I can read the file/do simple operations on it. However, I'd prefer to
> increase the number of partitions in preparation for more memory-intensive
> operations (I'm happy to wait, I just need the job to complete).
> Repartition seems to cause an OOM for me?
> Could someone shed light on (or speculate about) why this would happen -- I
> thought repartitioning to a higher count was supposed to relieve memory pressure?
>
> I'm using Spark 1.4.1 on CDH4, if that makes a difference.
>
> This works:
>
> val res2 = sqlContext.parquetFile(lst:_*).where($"customer_id"===lit(254))
> res2.count
> res1: Long = 77885925
>
> scala> res2.explain
> == Physical Plan ==
> Filter (customer_id#314 = 254)
>  PhysicalRDD [....4], MapPartitionsRDD[11] at
>
> scala> res2.rdd.partitions.size
> res3: Int = 59
>
>
>
> This doesn't:
>
> scala> res2.repartition(60).count
> [Stage 2:>                                                        (1 + 45) / 59]
> 15/09/05 10:17:21 WARN TaskSetManager: Lost task 2.0 in stage 2.0 (TID 62, fqdn): java.lang.OutOfMemoryError: Java heap space
>         at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:729)
>         at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
>         at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
>         at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
>         at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
>         at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:207)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
>
