GitHub user aws-awinstan opened a pull request:
https://github.com/apache/spark/pull/20826
[SPARK-2489][SQL] Unsupported parquet datatype optional fixed_len_byte_array
## What changes were proposed in this pull request?
This PR adds support for reading Parquet FIXED_LEN_BYTE_ARRAY columns as a
Binary column when no OriginalType is specified. parquet-avro writes the Avro
fixed type as a Parquet FIXED_LEN_BYTE_ARRAY. Currently, loading Parquet files
with a column of this type in Spark SQL throws an exception similar to the
following:
```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: FIXED_LEN_BYTE_ARRAY;
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.illegalType$1(ParquetSchemaConverter.scala:108)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:177)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:90)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$2.apply(ParquetSchemaConverter.scala:72)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$2.apply(ParquetSchemaConverter.scala:66)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetToSparkSchemaConverter$$convert(ParquetSchemaConverter.scala:66)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:63)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:642)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:642)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:642)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:599)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:581)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```
After this change Spark SQL is able to correctly load the Parquet files.
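For illustration only (the path and column name below are made up), reading such a file after the fix should surface the fixed-width column as plain binary:
```scala
// Assumes a parquet-avro output whose schema contains something like
//   optional fixed_len_byte_array(8) fixed_col;
// with no OriginalType annotation. Path and column name are hypothetical.
val df = spark.read.parquet("/data/parquet-avro-output")
df.printSchema()
// Expected after this change:
//   |-- fixed_col: binary (nullable = true)
df.selectExpr("hex(fixed_col)").show(5, truncate = false)
```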
There was a PR to fix this about three years ago
(https://github.com/apache/spark/pull/1737), but it was ultimately rejected:
it went down the path of adding a new SQL type specifically for
FIXED_LEN_BYTE_ARRAY, which the maintainers considered too intrusive a change.
This PR simply defaults to Binary when no OriginalType is specified. A few
updates to the VectorizedColumnReader were also required to support Binary
FIXED_LEN_BYTE_ARRAY columns.
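As a rough sketch (not the actual diff), the fallback amounts to treating a FIXED_LEN_BYTE_ARRAY with no OriginalType as Spark's BinaryType; the helper name below is hypothetical, and the real logic lives in ParquetToSparkSchemaConverter.convertPrimitiveField:
```scala
import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.sql.types.{BinaryType, DataType}

// Sketch only: surface a FIXED_LEN_BYTE_ARRAY that carries no OriginalType
// annotation as a plain binary column, and defer everything else (e.g.
// DECIMAL-annotated fields) to the converter's existing handling.
def fixedLenByteArrayToCatalyst(field: PrimitiveType): Option[DataType] =
  if (field.getPrimitiveTypeName == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY &&
      field.getOriginalType == null) {
    Some(BinaryType)
  } else {
    None // fall through to the converter's existing cases
  }
```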
Note: all of the changes to the gen-java/* files were generated by
avro-tools-1.8.1; the changes there are mostly documentation updates and
appear to come from changes in the template that avro-tools uses.
## How was this patch tested?
I added a fixed attribute to the AvroPrimitives and AvroOptionalPrimitives
record types used by the ParquetAvroCompatibilitySuite. These values were
populated by taking the same value used for the other types ("val_$i"),
padding it to 8 bytes (the chosen fixed length), and storing it as the fixed
type. I verified that before my fix the "required primitives" and "optional
primitives" tests failed with the same exception we're seeing in our clusters,
and that after my change they pass with the expected results.
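A hedged sketch of how those 8-byte values can be derived (names here are illustrative, not the suite's actual code):
```scala
import java.nio.charset.StandardCharsets
import java.util.Arrays

// Reuse the same "val_$i" string used for the other columns and zero-pad it
// to the chosen fixed length of 8 bytes.
def fixedTestValue(i: Int, length: Int = 8): Array[Byte] =
  Arrays.copyOf(s"val_$i".getBytes(StandardCharsets.UTF_8), length)

// e.g. fixedTestValue(0) => bytes for "val_0" followed by three 0x00 padding bytes
```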
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aws-awinstan/spark master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20826.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20826
----
commit 0b6c9e84d68dad0b7198eba68796eccc3f70b6cb
Author: Adam Winstanley <awinstan@...>
Date: 2018-03-13T23:25:27Z
[SPARK-2489][SQL] Unsupported parquet datatype optional fixed_len_byte_array
commit e5e4adeb443ce366af1f1c6c2f7e830cd6d5f187
Author: Adam Winstanley <awinstan@...>
Date: 2018-03-14T16:43:52Z
Remove unnecessary semicolon.
----
---