I am trying to load data from CSV format into Parquet using Spark SQL.
It consistently runs out of memory.
The environment is:
* standalone cluster using HDFS and the Hive metastore from HDP 2.0
* Spark 1.1.0
* Parquet jar files (v1.5) explicitly added when starting spark-sql
* 20 workers - EC2 r3.large - with SPARK_DAEMON_MEMORY set to 10g
* 1 master - EC2 r3.xlarge
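For reference, the conversion I'm running looks roughly like the following (the table and column names here are illustrative placeholders; the real TPC-DS catalog_returns table has more columns than shown):

```sql
-- External table over the pipe-delimited CSV input on HDFS
-- (schema abbreviated for illustration)
CREATE EXTERNAL TABLE catalog_returns_csv (
  cr_item_sk        INT,
  cr_return_amount  DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/tpcds/fcsv/catalog_returns';

-- Parquet-backed target table with the same schema
CREATE TABLE catalog_returns_parquet (
  cr_item_sk        INT,
  cr_return_amount  DOUBLE
)
STORED AS PARQUET;

-- The insert that fails with OutOfMemoryError during the Parquet write
INSERT OVERWRITE TABLE catalog_returns_parquet
SELECT * FROM catalog_returns_csv;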
The input is split across 12 files:
hdfs dfs -ls /tpcds/fcsv/catalog_returns
Found 12 items
-rw-r--r-- 3 spark hdfs 282305091 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000000_0
-rw-r--r-- 3 spark hdfs 282037998 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000001_0
-rw-r--r-- 3 spark hdfs 276284419 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000002_0
-rw-r--r-- 3 spark hdfs 269675703 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000003_0
-rw-r--r-- 3 spark hdfs 269673166 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000004_0
-rw-r--r-- 3 spark hdfs 269678197 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000005_0
-rw-r--r-- 3 spark hdfs 153478133 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000006_0
-rw-r--r-- 3 spark hdfs 147586385 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000007_0
-rw-r--r-- 3 spark hdfs 147542545 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000008_0
-rw-r--r-- 3 spark hdfs 141161085 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000009_0
-rw-r--r-- 3 spark hdfs 12110104 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000010_0
-rw-r--r-- 3 spark hdfs 6374442 2014-09-22 11:31
/tpcds/fcsv/catalog_returns/000011_0
The failure stack trace from spark-sql is:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in
stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID
1, localhost): java.lang.OutOfMemoryError: Java heap space
parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:97)
parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:124)
parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:146)
parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:308)
parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:233)
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:84)
parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
parquet.column.impl.ColumnWriterImpl.accountForValueWritten(ColumnWriterImpl.java:108)
parquet.column.impl.ColumnWriterImpl.write(ColumnWriterImpl.java:148)
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:306)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:133)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeData(DataWritableWriter.java:75)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:55)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:77)
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:90)
org.apache.spark.sql.hive.SparkHiveHadoopWriter.write(SparkHadoopWriter.scala:98)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:151)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Am I missing something? Is this a case of "wrong tool for the job"?
Regards,
dd