Hi Meghajit,

thanks for asking. If you take a look at the source code
https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
you will see that the Parquet MessageType is read from the footer and used.
There is a conversion from the Flink RowType to the Parquet MessageType,
and it might be possible to build the conversion the other way around. The
question, however, is performance: only the required columns should be
read, so the column names should be given by the user. The fieldTypes are
required in case the given fields cannot be found in the Parquet footer,
e.g. because of a typo.
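
To illustrate, a minimal sketch of such a reverse conversion could look
like the following. It only maps top-level primitive fields and ignores
logical annotations such as DECIMAL or TIMESTAMP; the class and method
names here are made up for the example:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;

public final class ParquetToRowType { // hypothetical helper

    public static RowType toRowType(MessageType messageType) {
        List<String> names = new ArrayList<>();
        List<LogicalType> types = new ArrayList<>();
        for (Type field : messageType.getFields()) {
            if (!field.isPrimitive()) {
                // nested groups would need recursion; skipped in this sketch
                throw new UnsupportedOperationException(field.getName());
            }
            names.add(field.getName());
            types.add(map(field.asPrimitiveType().getPrimitiveTypeName()));
        }
        return RowType.of(types.toArray(new LogicalType[0]),
                          names.toArray(new String[0]));
    }

    private static LogicalType map(PrimitiveTypeName name) {
        switch (name) {
            case INT32:   return new IntType();
            case INT64:   return new BigIntType();
            case FLOAT:   return new FloatType();
            case DOUBLE:  return new DoubleType();
            case BOOLEAN: return new BooleanType();
            case BINARY:  return new VarCharType(VarCharType.MAX_LENGTH);
            default:
                throw new UnsupportedOperationException(name.toString());
        }
    }
}

Note that even with such a conversion, you would typically still let the
user pass the column names, so that only the required columns are read.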

Best regards
Jing

On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> We want to read and process Parquet files using a FileSource and the
> DataStream API.
>
>
> Currently, as referenced from the documentation
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/>,
> this is how a FileSource for Parquet is created. As can be seen, it
> requires constructing a RowType like this:
>
>
> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>
>
> where fieldTypes is created like this:
>
>
> final LogicalType[] fieldTypes =
>     new LogicalType[] {
>         new DoubleType(), new IntType(), new VarCharType()
>     };
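>
> For reference, the example on that page then wires the RowType into a
> source roughly as follows (the path is a placeholder; 500 is the batch
> size used in the docs):
>
> final ParquetColumnarRowInputFormat<FileSourceSplit> format =
>     new ParquetColumnarRowInputFormat<>(
>         new Configuration(),                            // Hadoop config
>         RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
>         500,                                            // batch size
>         false,                                          // UTC timestamps
>         true);                                          // case sensitive
> final FileSource<RowData> source =
>     FileSource.forBulkFileFormat(format, new Path("/path/to/file")).build();
> final DataStream<RowData> stream =
>     env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");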
>
>
> Ideally, instead of specifying the column names (f7, f99, ...) and their
> data types (DoubleType, VarCharType, ...), we would like to use the schema
> of the Parquet file itself to create the RowType.
>
> The schema is present in the footer of the Parquet file, inside the metadata.
>
> We wanted to know: is there an easy way to convert a Parquet schema, i.e.
> a MessageType, directly into a Flink RowType?
>
> The Parquet schema of the file can easily be obtained by using
> org.apache.parquet.hadoop.ParquetFileReader as follows:
>
>
> ParquetFileReader reader =
>     ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>
> // this schema has the field names as well as the data types of the records
> MessageType schema = reader.getFileMetaData().getSchema();
>
>
> As of now, because we couldn’t find a way to convert the schema into a 
> RowType directly, we resorted to writing our own custom parser to parse a 
> Parquet SimpleGroup into a Flink Row like this:
>
>
> ParquetFileReader reader =
>     ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>
> PageReadStore nextPage = reader.readNextRowGroup();
>
> Row row = parseToRow(g); // custom parser; g is a SimpleGroup, see below
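>
> For completeness, the SimpleGroups are materialized from the row group
> roughly like this (GroupRecordConverter comes from parquet-mr's example
> module; parseToRow is our own function):
>
> MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
> RecordReader<Group> recordReader =
>     columnIO.getRecordReader(nextPage, new GroupRecordConverter(schema));
> for (long i = 0, n = nextPage.getRowCount(); i < n; i++) {
>     SimpleGroup g = (SimpleGroup) recordReader.read();
>     Row row = parseToRow(g); // custom parser function
> }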
>
>
> Looking forward to an answer from the community. Thanks!
>
>
> Regards,
>
> Meghajit
>