GitHub user aws-awinstan opened a pull request:

    https://github.com/apache/spark/pull/20826

    [SPARK-2489][SQL] Unsupported parquet datatype optional fixed_len_byte_array

    ## What changes were proposed in this pull request?
    This PR adds support for reading Parquet FIXED_LEN_BYTE_ARRAY columns as a 
Binary column when no OriginalType is specified. parquet-avro writes the Avro 
fixed type as a Parquet FIXED_LEN_BYTE_ARRAY. Currently, loading Parquet files 
that contain a column of this type with Spark SQL throws an exception similar 
to the following:
    
    ```
    Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: FIXED_LEN_BYTE_ARRAY;
        at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.illegalType$1(ParquetSchemaConverter.scala:108)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:177)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:90)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$2.apply(ParquetSchemaConverter.scala:72)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$2.apply(ParquetSchemaConverter.scala:66)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetToSparkSchemaConverter$$convert(ParquetSchemaConverter.scala:66)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:63)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:642)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:642)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:642)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:599)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:581)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    ```
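    
    For context, a hypothetical reproduction of the failure (paths, record and 
field names are made up; assumes a spark-shell style `spark` session and 
parquet-avro on the classpath) looks roughly like this:
    
    ```scala
    // Hypothetical repro sketch: parquet-avro writes an Avro `fixed` field as a
    // Parquet FIXED_LEN_BYTE_ARRAY, which Spark SQL previously refused to read.
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.avro.AvroParquetWriter

    val schema = new Schema.Parser().parse(
      """{"type": "record", "name": "Rec", "fields": [
        |  {"name": "f", "type": {"type": "fixed", "name": "F8", "size": 8}}
        |]}""".stripMargin)

    val writer = AvroParquetWriter.builder[GenericRecord](new Path("/tmp/fixed.parquet"))
      .withSchema(schema)
      .build()
    val rec = new GenericData.Record(schema)
    rec.put("f", new GenericData.Fixed(schema.getField("f").schema(), "val_0   ".getBytes("UTF-8")))
    writer.write(rec)
    writer.close()

    // Before this change the read below fails with the AnalysisException above;
    // after it, the column comes back as a BinaryType column.
    spark.read.parquet("/tmp/fixed.parquet").show()
    ```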
    
    After this change Spark SQL is able to load these Parquet files correctly. 
A PR to fix this was opened about three years ago 
(https://github.com/apache/spark/pull/1737), but it was ultimately rejected 
because it added a new SQL type specifically for FIXED_LEN_BYTE_ARRAYs, which 
the maintainers considered too intrusive a change. This PR simply defaults to 
Binary when no OriginalType is specified. A few updates to the 
VectorizedColumnReader were also required to support Binary 
FIXED_LEN_BYTE_ARRAYs; a rough sketch of the schema mapping is shown below.
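    
    For illustration only (method name and shape are hypothetical, not the 
actual diff to ParquetToSparkSchemaConverter), the schema-side change amounts 
to something like:
    
    ```scala
    // Hypothetical sketch: a FIXED_LEN_BYTE_ARRAY primitive with no OriginalType
    // maps to Spark's BinaryType instead of raising "Illegal Parquet type".
    import org.apache.parquet.schema.OriginalType
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
    import org.apache.spark.sql.types.{BinaryType, DataType}

    def convertFixedLenByteArray(
        primitive: PrimitiveTypeName,
        originalType: Option[OriginalType]): Option[DataType] =
      (primitive, originalType) match {
        case (FIXED_LEN_BYTE_ARRAY, None) => Some(BinaryType) // new default in this PR
        case _                            => None             // existing conversion paths apply
      }
    ```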
    
    Note: all the changes to the gen-java/* files were generated by 
avro-tools-1.8.1; the diffs, which are mostly documentation updates, appear to 
come from changes in the template avro-tools uses.
    
    ## How was this patch tested?
    
    I added a fixed attribute to the AvroPrimitives and AvroOptionalPrimitives 
record types used by the ParquetAvroCompatibilitySuite. These values were 
populated by taking the same value used for the other types ("val_$i"), 
padding it to 8 bytes (the chosen fixed length), and storing it as the fixed 
type. I verified that before my fix the "required primitives" and "optional 
primitives" tests failed with the same exception we're seeing in our clusters; 
after my change the tests succeed with the expected results. A sketch of how 
the fixed values are built follows.
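    
    As an illustration of how the new fixed values are built (helper name is 
made up, not the actual test code), padding "val_$i" to the 8-byte fixed 
length looks roughly like:
    
    ```scala
    // Illustrative helper only: pad the "val_$i" string used by the other
    // columns out to the 8-byte fixed length expected by the Avro fixed field.
    def fixedValue(i: Int): Array[Byte] =
      s"val_$i".padTo(8, ' ').getBytes("UTF-8")
    ```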


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aws-awinstan/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20826.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20826
    
----
commit 0b6c9e84d68dad0b7198eba68796eccc3f70b6cb
Author: Adam Winstanley <awinstan@...>
Date:   2018-03-13T23:25:27Z

    [SPARK-2489][SQL] Unsupported parquet datatype optional fixed_len_byte_array

commit e5e4adeb443ce366af1f1c6c2f7e830cd6d5f187
Author: Adam Winstanley <awinstan@...>
Date:   2018-03-14T16:43:52Z

    Remove unnecessary semicolon.

----


---
