Parquet and repartition
Hi all.

When I specify the number of partitions and save this RDD in Parquet format, my app fails. For example:

    selectTest.coalesce(28).saveAsParquetFile("hdfs://vm-clusterOutput")

However, it works well if I store the data as text:

    selectTest.coalesce(28).saveAsTextFile("hdfs://vm-clusterOutput")

My Spark version is 1.2.1.

Is this bug registered?

--
Regards.
Miguel Ángel
Re: Parquet and repartition
Thanks Sean, I forgot it.

The output error is the following:

java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/03/16 11:30:11 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 207)
java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/03/16 11:30:11 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 208, localhost, ANY, 2878 bytes)
15/03/16 11:30:11 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 206, localhost): java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.consumeType(ParquetTableSupport.scala:359)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:328)
    at org.apache.spark.sql.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:314)
    at parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
    at parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:308)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

On Mon, Mar 16, 2015 at 12:19 PM, Sean Owen so...@cloudera.com wrote:
> You forgot to give any information about what fail means here.
>
> On Mon, Mar 16, 2015 at 11:11 AM, Masf masfwo...@gmail.com wrote:
>> Hi all. When I specify the number of partitions and save this RDD in parquet format, my app fail. For example
Re: Parquet and repartition
Hey Masf,

I’ve created SPARK-6360 (https://issues.apache.org/jira/browse/SPARK-6360) to track this issue. Detailed analysis is provided there.

The TL;DR is: in Spark 1.1 and 1.2, if a SchemaRDD contains decimal or UDT column(s), calling saveAsParquetFile after applying any traditional RDD transformation (e.g. repartition, coalesce, distinct, …) may trigger this issue.

Fortunately, Spark 1.3 isn’t affected, as we replaced SchemaRDD with DataFrame, which properly handles this case.

Cheng

On 3/16/15 7:30 PM, Masf wrote:
> Thanks Sean, I forgot it. The output error is the following:
>
> java.lang.ClassCastException: scala.math.BigDecimal cannot be cast to org.apache.spark.sql.catalyst.types.decimal.Decimal
> [...]
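
For readers unfamiliar with this kind of failure, here is a minimal toy sketch in plain Scala of how a ClassCastException like the one in the trace can arise. It does not use Spark at all: `InternalDecimal`, `writeDecimal` and `toExternal` are hypothetical names invented for illustration, and the assumption that the real problem follows the same pattern (a writer expecting an internal decimal type but receiving scala.math.BigDecimal) is inferred from the exception message only. See SPARK-6360 for the actual analysis.

```scala
// Toy model only: every name here is a hypothetical stand-in, not Spark code.
object DecimalCastSketch {
  // Stand-in for an engine-internal decimal representation.
  final case class InternalDecimal(value: BigDecimal)

  // Stand-in for a writer that assumes every decimal cell is already
  // in the internal representation and casts without checking.
  def writeDecimal(cell: Any): String =
    cell.asInstanceOf[InternalDecimal].value.toString

  // Stand-in for a conversion that exposes plain Scala values to user code.
  def toExternal(cell: Any): Any = cell match {
    case InternalDecimal(v) => v
    case other              => other
  }

  def main(args: Array[String]): Unit = {
    val internalRow: Seq[Any] = Seq(InternalDecimal(BigDecimal("1.50")))

    // Cells still in the internal representation are written without error.
    internalRow.foreach(cell => println(writeDecimal(cell)))

    // Once the cells have been converted to plain scala.math.BigDecimal,
    // the writer's cast fails at runtime.
    val externalRow = internalRow.map(toExternal)
    try externalRow.foreach(cell => println(writeDecimal(cell)))
    catch {
      case e: ClassCastException =>
        println("ClassCastException: " + e.getMessage)
    }
  }
}
```

The point of the sketch is only that the cast is checked at runtime, not at compile time, so the mismatch surfaces when a task writes its first decimal value rather than when the job is defined.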