Hi Nikos,

How many columns and how many distinct values of "some_column" are there in the DataFrame? The Parquet writer is known to be very memory-consuming for wide tables, and a large number of distinct partition column values means many concurrent Parquet writers per task. One possible workaround is to repartition the data by the partition columns before writing.
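
Something along these lines (an untested sketch; note that DataFrame.repartition(Column...) only appeared in later Spark releases, so on 1.4 a global sort by the partition column is the closest equivalent):

    import org.apache.spark.sql.DataFrame;
    import static org.apache.spark.sql.functions.col;

    public class RepartitionBeforeWrite {
        // Cluster rows by the partition column before writing, so each task only
        // keeps a handful of Parquet writers open instead of one per distinct value.
        public static void writePartitioned(DataFrame df, String path) {
            df.repartition(col("some_column"))   // on Spark 1.4: df.sort("some_column")
              .write()
              .partitionBy("some_column")
              .parquet(path);
        }
    }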

Cheng

On 7/15/15 7:05 PM, Nikos Viorres wrote:
Hi,

I am trying to test partitioning for DataFrames written as Parquet, so I attempted df.write().partitionBy("some_column").parquet(path) on a small dataset of 20,000 records, which takes about 4 MB of disk space when saved locally as gzip-compressed Parquet. However, on my dev machine with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always fails with an OutOfMemoryError.
Does anyone have any ideas?
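
For reference, a rough sketch of what the test does (hypothetical class layout and paths; the real PartitionTest isn't included here), run with -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    public class PartitionTest {
        public static void main(String[] args) {
            // Master URL and executor memory are picked up from the -Dspark.* system properties.
            JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("PartitionTest"));
            SQLContext sqlContext = new SQLContext(sc);

            // ~20,000 records; saved as a single gzip-compressed Parquet file this is ~4 MB.
            DataFrame df = sqlContext.read().parquet("/tmp/input");  // hypothetical input path

            // This is the call that fails with java.lang.OutOfMemoryError.
            df.write().partitionBy("some_column").parquet("/tmp/output");  // hypothetical output path
        }
    }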

stack trace:
[Stage 2:> (0 + 4) / 8]2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in stage 2.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
java.lang.OutOfMemoryError: Java heap space
at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-07-15 13:57:21,157 ERROR Logging$class Task 3 in stage 2.0 failed 1 times; aborting job
2015-07-15 13:57:21,194 ERROR Logging$class Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 1 times, most recent failure: Lost task 3.0 in stage 2.0 (TID 8, localhost): java.lang.OutOfMemoryError: Java heap space
at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
2015-07-15 13:57:21,221 ERROR Logging$class Job job_201507151056_0000 aborted.
Exception in thread "main" org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insertWithDynamicPartitions(commands.scala:202)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:118)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
at my.spark.test.PartitionTest.main(PartitionTest.java:147)



