I am trying to read a Parquet file into a DataStream and then register that
stream as a temporary table. The file was created by Spark 2.4 in HDFS on AWS
EMR. I am using Flink 1.10.0 with EMR 5.30.
I am getting the following error:
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
    at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
    at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
    at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
    at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
    at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
    at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
    at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:327)
Below is a snippet of code that shows how I am trying to read the parquet file:
// Read the Parquet schema from the footer of a single file.
String filePath = "hdfs:///path/to/single/file.parquet";
ParquetFileReader reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(filePath), new Configuration()));
MessageType schema = reader.getFooter().getFileMetaData().getSchema();

// Read the files in the directory as a stream of Rows using that schema.
String parquetPath = "hdfs:///path/to/parquet/directory";
DataStream<Row> parquetStream = env.readFile(
    new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(parquetPath), schema),
    parquetPath);

// Register the stream as a temporary view.
Table parquetTable = tEnv.fromDataStream(parquetStream);
tEnv.createTemporaryView("isession", parquetTable);
Thanks,
Jesse