Github user mallman commented on a diff in the pull request:
https://github.com/apache/spark/pull/16578#discussion_r148863673
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala ---
@@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
        StructType.fromString(schemaString)
    }
-    val parquetRequestedSchema =
+    val clippedParquetSchema =
      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+    val parquetRequestedSchema = if (parquetMrCompatibility) {
+      // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
+      // Therefore, we intersect our clipped schema with the underlying file's schema
--- End diff --
As for the problem of requesting a read of a superset of a file's fields,
if we disable the `parquetMrCompatibility` code, then the "partial schema
intersection - select missing subfield" test in
`ParquetSchemaPruningSuite.scala` fails with the following stack trace:
```
[info]   Cause: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/Volumes/VideoAmpCS/msa/workspace/spark-public/target/tmp/spark-a0bda193-9d3f-4cd1-885c-9e8b5b0fc1ed/contacts/p=2/part-00001-4a8671f1-afb2-482f-8c4d-4f6f4df896bc-c000.snappy.parquet
[info]   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:223)
[info]   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:215)
[info]   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
[info]   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
[info]   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[info]   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:432)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1820)
[info]   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160)
[info]   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160)
[info]   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
[info]   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
[info]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[info]   at org.apache.spark.scheduler.Task.run(Task.scala:108)
[info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
[info]   Cause: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
[info]   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
[info]   at java.util.ArrayList.get(ArrayList.java:429)
[info]   at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:103)
[info]   at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:103)
[info]   at org.apache.parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:102)
[info]   at org.apache.parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:97)
[info]   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:278)
[info]   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:141)
[info]   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:107)
[info]   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:155)
[info]   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:107)
[info]   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136)
[info]   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:194)
[info]   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:215)
[info]   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
[info]   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
[info]   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[info]   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:432)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
[info]   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1820)
[info]   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160)
[info]   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160)
[info]   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
[info]   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064)
[info]   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
[info]   at org.apache.spark.scheduler.Task.run(Task.scala:108)
[info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:748)
```
As you can see, the failure occurs when attempting the read. It originates deep within the Parquet record reader logic and is not something I want to dig into.
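
For what it's worth, the intersection the diff comment refers to amounts to something like the sketch below (a minimal sketch only; the method name `intersectGroups` and its structure are illustrative, not the code in this PR). It walks the clipped schema and drops any field, at any nesting level, that the file does not contain, which is exactly the case the "select missing subfield" test exercises:
```scala
import scala.collection.JavaConverters._

import org.apache.parquet.schema.{GroupType, Type}

// Sketch: keep only the fields of the clipped schema that also exist in the
// file's schema, recursing into nested groups.
def intersectGroups(clipped: GroupType, fileType: GroupType): Option[GroupType] = {
  val surviving = clipped.getFields.asScala.flatMap { clippedField =>
    if (fileType.containsField(clippedField.getName)) {
      val fileField = fileType.getType(clippedField.getName)
      if (clippedField.isPrimitive || fileField.isPrimitive) {
        // Primitive on either side: nothing further to intersect, keep the clipped field.
        Some(clippedField)
      } else {
        // Both sides are groups: recurse, dropping the group if nothing survives.
        intersectGroups(clippedField.asGroupType, fileField.asGroupType)
      }
    } else {
      // Requested field absent from the file: this is what trips up parquet-mr,
      // so it is dropped from the schema handed to the record reader.
      None
    }
  }
  if (surviving.nonEmpty) Some(clipped.withNewFields(surviving.asJava)) else None
}
```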
---