Hi Lian,

Thank you for the tip. Indeed, there were a lot of distinct values in my
result set (approximately 3000). As you suggested, I decided to partition
the data first on a column with much smaller cardinality.
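
For anyone else hitting this, here is a rough, Spark-free sketch in plain
Java of why the number of distinct values matters. It only counts how many
writers one task would hold open, assuming one open writer per distinct
partition value seen by that task (which is what the getOrElseUpdate call in
the trace below suggests). The class and method names are made up for
illustration, the 8 tasks are taken from the stage progress line in the
trace, and the random spread of values over records is an assumption.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical illustration only: it models open writers per task and
// does not call Spark or Parquet.
final class WriterCountSketch {

    // Largest number of distinct partition values seen by any single task.
    // Assumption: a task keeps one open writer per distinct value it meets.
    static int maxWritersPerTask(int[] keys, int[] taskOf, int numTasks) {
        List<Set<Integer>> seen = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            seen.add(new HashSet<>());
        }
        for (int i = 0; i < keys.length; i++) {
            seen.get(taskOf[i]).add(keys[i]);
        }
        int max = 0;
        for (Set<Integer> s : seen) {
            max = Math.max(max, s.size());
        }
        return max;
    }

    // Records dealt to tasks in arrival order, ignoring the partition value.
    static int[] assignInArrivalOrder(int numRecords, int numTasks) {
        int[] taskOf = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            taskOf[i] = i % numTasks;
        }
        return taskOf;
    }

    // Records routed by partition value, so equal values share one task.
    static int[] assignByKey(int[] keys, int numTasks) {
        int[] taskOf = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            taskOf[i] = Math.floorMod(keys[i], numTasks);
        }
        return taskOf;
    }

    // Assumed data: partition values drawn uniformly at random, fixed seed.
    static int[] randomKeys(int numRecords, int distinctValues, long seed) {
        Random rnd = new Random(seed);
        int[] keys = new int[numRecords];
        for (int i = 0; i < numRecords; i++) {
            keys[i] = rnd.nextInt(distinctValues);
        }
        return keys;
    }

    public static void main(String[] args) {
        int numTasks = 8;
        int[] keys = randomKeys(20_000, 3_000, 42L);
        int scattered = maxWritersPerTask(
                keys, assignInArrivalOrder(keys.length, numTasks), numTasks);
        int grouped = maxWritersPerTask(
                keys, assignByKey(keys, numTasks), numTasks);
        System.out.println("max writers per task, arrival order: " + scattered);
        System.out.println("max writers per task, grouped by value: " + grouped);
    }
}
```

With these assumed numbers the grouped case can never exceed 3000 / 8 = 375
writers per task, while the arrival-order case ends up well above that.
Choosing a partition column with fewer distinct values lowers both figures.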
Thanks

n

On Thu, Jul 16, 2015 at 2:09 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

>  Hi Nikos,
>
> How many columns and distinct values of "some_column" are there in the
> DataFrame? The Parquet writer is known to be very memory-intensive for wide
> tables, and lots of distinct partition column values result in many
> concurrent Parquet writers. One possible workaround is to first repartition
> the data by the partition columns.
>
> Cheng
>
>
> On 7/15/15 7:05 PM, Nikos Viorres wrote:
>
> Hi,
>
>  I am trying to test partitioning for DataFrames with Parquet, so I
> attempted to do df.write().partitionBy("some_column").parquet(path) on a
> small dataset of 20,000 records which, when saved as Parquet locally with
> gzip, takes 4 MB of disk space.
> However, on my dev machine with
> -Dspark.master=local[4] -Dspark.executor.memory=2g -Xmx10g this always
> fails with an OutOfMemoryError.
> Does anyone have any ideas?
>
>  stack trace:
>  [Stage 2:>                                                          (0 +
> 4) / 8]2015-07-15 13:57:21,021 ERROR Logging$class Exception in task 3.0 in
> stage 2.0 (TID 8)
> java.lang.OutOfMemoryError: Java heap space
>  at
> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>  at
> parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>  at
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>  at
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>  at
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>  at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>  at
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>  at
> parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>  at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>  at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>  at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>  at
> org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>  at
> org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
>  at
> scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
>  at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
>  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>  at org.apache.spark.scheduler.Task.run(Task.scala:70)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> 2015-07-15 13:57:21,051 ERROR Logging$class Uncaught exception in thread
> Thread[Executor task launch worker-2,5,main]
> java.lang.OutOfMemoryError: Java heap space
>  at
> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>  at
> parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>  at
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>  at
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>  at
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>  at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>  at
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>  at
> parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>  at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>  at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>  at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>  at
> org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>  at
> org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
>  at
> scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
>  at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
>  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>  at org.apache.spark.scheduler.Task.run(Task.scala:70)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> 2015-07-15 13:57:21,157 ERROR Logging$class Task 3 in stage 2.0 failed 1
> times; aborting job
> 2015-07-15 13:57:21,194 ERROR Logging$class Aborting job.
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 3
> in stage 2.0 failed 1 times, most recent failure: Lost task 3.0 in stage
> 2.0 (TID 8, localhost): java.lang.OutOfMemoryError: Java heap space
>  at
> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
>  at
> parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:57)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:68)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.<init>(ColumnChunkPageWriteStore.java:48)
>  at
> parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
>  at
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
>  at
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>  at
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>  at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>  at
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>  at
> parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>  at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>  at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>  at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>  at
> org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:111)
>  at
> org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:441)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer$$anonfun$outputWriterForRow$1.apply(commands.scala:436)
>  at
> scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
>  at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
>  at
> org.apache.spark.sql.sources.DynamicPartitionWriterContainer.outputWriterForRow(commands.scala:436)
>  at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:227)
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:196)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>  at org.apache.spark.scheduler.Task.run(Task.scala:70)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>
>  Driver stacktrace:
>  at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>  at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>  at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>  at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>  at scala.Option.foreach(Option.scala:236)
>  at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>  at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>  at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 2015-07-15 13:57:21,221 ERROR Logging$class Job job_201507151056_0000
> aborted.
> Exception in thread "main" org.apache.spark.SparkException: Job aborted.
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insertWithDynamicPartitions(commands.scala:202)
>  at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:118)
>  at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>  at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>  at
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
>  at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:939)
>  at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:939)
>  at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:332)
>  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)
>  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)
>  at
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:281)
>  at my.spark.test.PartitionTest.main(PartitionTest.java:147)
>
