AngersZhuuuu opened a new pull request #34308:
URL: https://github.com/apache/spark/pull/34308
### What changes were proposed in this pull request?
When reading Parquet files, we found that the vectorized reader does not include the path of the failing Parquet file in its error message, which makes it difficult to find out which file has the problem.

Non-vectorized Parquet reader:
```
Exception: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details:
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:193)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file hdfs://R2/projects/data_notificationmart/dwd_traceid_sent_civ_first_di/tz_type=local/grass_region=TW/grass_date=2021-10-13/noti_type=AR/part-00013-22bdd509-4469-47f7-a37e-50fddd4266a7-c000.zstd.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    ... 15 more
```
Vectorized Parquet reader:
```
21/10/15 18:01:54 WARN TaskSetManager: Lost task 1881.0 in stage 16.0 (TID 10380, ip-10-130-169-140.idata-server.shopee.io, executor 168): TaskKilled (Stage cancelled)
: An error occurred while calling o362.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 963 in stage 17.0 failed 4 times, most recent failure: Lost task 963.3 in stage 17.0 (TID 10351, ip-10-130-75-201.idata-server.shopee.io, executor 99): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
    at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:36)
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:364)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getLong(MutableColumnarRow.java:120)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$doExecute$2$$anonfun$apply$2.apply(DataSourceScanExec.scala:351)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anonfun$doExecute$2$$anonfun$apply$2.apply(DataSourceScanExec.scala:349)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
This happens because the vectorized reader relies on Parquet's `Dictionary`, which carries no file information:
```
public abstract class Dictionary {
  private final Encoding encoding;

  public Dictionary(Encoding encoding) {
    this.encoding = encoding;
  }

  public Encoding getEncoding() {
    return this.encoding;
  }

  public abstract int getMaxId();

  public Binary decodeToBinary(int id) {
    throw new UnsupportedOperationException(this.getClass().getName());
  }

  public int decodeToInt(int id) {
    throw new UnsupportedOperationException(this.getClass().getName());
  }

  public long decodeToLong(int id) {
    throw new UnsupportedOperationException(this.getClass().getName());
  }

  public float decodeToFloat(int id) {
    throw new UnsupportedOperationException(this.getClass().getName());
  }

  public double decodeToDouble(int id) {
    throw new UnsupportedOperationException(this.getClass().getName());
  }

  public boolean decodeToBoolean(int id) {
    throw new UnsupportedOperationException(this.getClass().getName());
  }
}
```
Spark wraps it in a `ParquetDictionary`, so we can catch the failure there and add the file information to the error message.
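Since the patch is still WIP, here is a minimal sketch of the idea: delegate to the underlying dictionary and rethrow with the file path attached. The class name `FileAwareDictionary` and the `filePath` parameter are illustrative assumptions, not the actual patch; the real change would go into Spark's `ParquetDictionary`.
```
import org.apache.parquet.column.Dictionary;

// Sketch only: wraps a Parquet Dictionary and attaches the path of the file
// being read to decode failures. FileAwareDictionary and filePath are
// hypothetical names for illustration.
public final class FileAwareDictionary {
  private final Dictionary dictionary;
  private final String filePath;

  public FileAwareDictionary(Dictionary dictionary, String filePath) {
    this.dictionary = dictionary;
    this.filePath = filePath;
  }

  public long decodeToLong(int id) {
    try {
      // Delegates to Parquet, which throws UnsupportedOperationException on a
      // type mismatch (see the vectorized stack trace above).
      return dictionary.decodeToLong(id);
    } catch (UnsupportedOperationException e) {
      // Attach the failing file path, which Parquet's Dictionary cannot know.
      throw new UnsupportedOperationException(
          "Failed to decode dictionary value in file: " + filePath, e);
    }
  }
}
```
With something like this, the `UnsupportedOperationException` in the vectorized path would carry the same file path that the non-vectorized reader already reports.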
### Why are the changes needed?
Improve the error message so users can tell which file has the problem.
### Does this PR introduce _any_ user-facing change?
Yes. Users can see which file failed to read when using the Parquet vectorized reader.
### How was this patch tested?
WIP