Hi Lian,

Thank you for the tip. Indeed, there were a lot of distinct values in my result set (approximately 3,000). As you suggested, I decided to first partition the data on a column with much smaller cardinality.

Thanks
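P.S. In case it helps others who land on this thread, here is a minimal sketch of the workaround Cheng describes below. This is illustrative only, not my actual code: df and path are placeholders, and the column-based repartition overload shown here only exists in newer Spark releases (1.6+); on 1.4 an equivalent shuffle has to be arranged differently, e.g. at the RDD level.

    import org.apache.spark.sql.DataFrame;
    import static org.apache.spark.sql.functions.col;

    public class PartitionedWrite {

        // Writes df partitioned by "some_column" without holding one open
        // Parquet writer per distinct value in every task.
        static void save(DataFrame df, String path) {
            // Original (OOM-prone): rows for all ~3,000 partition values are
            // scattered across tasks, so each task may keep thousands of
            // Parquet writers open, each buffering an in-memory row group.
            //
            //   df.write().partitionBy("some_column").parquet(path);

            // Workaround: shuffle by the partition column first, so all rows
            // with the same value land in the same task and each task only
            // writes its own subset of the partition directories.
            df.repartition(col("some_column"))
              .write()
              .partitionBy("some_column")
              .parquet(path);
        }
    }

With the default spark.sql.shuffle.partitions of 200, the ~3,000 distinct values are hashed across 200 tasks, so each task opens on the order of 15 writers instead of potentially thousands.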
On Thu, Jul 16, 2015 at 2:09 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

> Hi Nikos,
>
> How many columns and distinct values of "some_column" are there in the
> DataFrame? The Parquet writer is known to be very memory-consuming for
> wide tables, and lots of distinct partition column values result in many
> concurrent Parquet writers. One possible workaround is to first
> repartition the data by the partition columns.
>
> Cheng
>
> On 7/15/15 7:05 PM, Nikos Viorres wrote:
>
> Hi,
>
> I am trying to test partitioning for DataFrames with Parquet, so I
> attempted df.write().partitionBy("some_column").parquet(path) on a small
> dataset of 20,000 records which, when saved locally as gzip-compressed
> Parquet, takes 4 MB of disk space. However, on my dev machine with
> -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g, this always
> fails with an OutOfMemoryError. Does anyone have any ideas?
>
> Stack trace:
>
> [Stage 2:>                                                  (0 + 4) / 8]
> 2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in stage 2.0 (TID 8)
> java.lang.OutOfMemoryError: Java heap space
>     at parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>     at parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>     at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>     at parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>     at parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>     at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>     at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>     at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>     at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>     at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>     at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>     at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>     at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>     at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>     at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
>     at org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
>     at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
>     at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
>     at org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>     at org.apache.spark.scheduler.Task.run(Task.scala:70)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread Thread[Executor task launch worker-2,5,main]
> java.lang.OutOfMemoryError: Java heap space
>     ... (same stack trace as above)
> 2015-07-15 13:57:21,157 ERROR Logging$class Task 3 in stage 2.0 failed 1 times; aborting job
> 2015-07-15 13:57:21,194 ERROR Logging$class Aborting job.
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 1 times, most recent failure: Lost task 3.0 in stage 2.0 (TID 8, localhost): java.lang.OutOfMemoryError: Java heap space
>     ... (same stack trace as above)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 2015-07-15 13:57:21,221 ERROR Logging$class Job job_201507151056_0000 aborted.
> Exception in thread "main" org.apache.spark.SparkException: Job aborted.
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insertWithDynamicPartitions(commands.scala:202)
>     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:118)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>     at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
>     at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
>     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
>     at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
>     at my.spark.test.PartitionTest.main(PartitionTest.java:147)
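For anyone wondering how 20,000 records (4 MB as gzipped Parquet on disk) can exhaust a 10 GB heap, a rough worst-case estimate based on Cheng's explanation above. The 128 MB figure is parquet-mr's default block size (parquet.block.size); the actual up-front allocation per writer is smaller and depends on page size and column count, so these numbers are purely illustrative:

    ~3,000 distinct partition values -> up to ~3,000 concurrent Parquet writers in a task
    each writer buffers up to one row group (128 MB by default) in memory
    3,000 writers x 128 MB ~= 375 GB of worst-case buffer space vs. a 10 GB heap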