Hi Meghajit,

good catch! Thanks for correcting me. The point here is how a
column-oriented storage format like Parquet is used. What I tried to
explain was that the original MessageType from the footer is used to build
a projected MessageType, because only the required columns should be read.
Without input from the user, there is no way to build that projected
schema other than reading all columns. Even if we could convert the
MessageType to a RowType, we would still need the user's input. The
fieldTypes are therefore mandatory with the current implementation: when a
given field cannot be found by the ParquetVectorizedInputFormat in the
Parquet footer (e.g. because of a typo), its type info is still needed to
build the projected schema.
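
That said, if you are willing to open the footer yourself before building
the source, you could derive the RowType from the file's own schema
instead of hard-coding it. A rough, untested sketch (it only covers a flat
schema with a few primitive types; the type mapping is illustrative, not a
Flink utility):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

// read the schema from the footer, exactly as you already do
ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
MessageType schema = reader.getFileMetaData().getSchema();

// map every primitive parquet field to a Flink LogicalType
List<String> names = new ArrayList<>();
List<LogicalType> types = new ArrayList<>();
for (Type field : schema.getFields()) {
    names.add(field.getName());
    switch (field.asPrimitiveType().getPrimitiveTypeName()) {
        case INT32:   types.add(new IntType());     break;
        case INT64:   types.add(new BigIntType());  break;
        case FLOAT:   types.add(new FloatType());   break;
        case DOUBLE:  types.add(new DoubleType());  break;
        case BOOLEAN: types.add(new BooleanType()); break;
        // note: this blindly treats every BINARY column as a UTF-8 string
        case BINARY:  types.add(new VarCharType(VarCharType.MAX_LENGTH)); break;
        default:
            throw new UnsupportedOperationException("unsupported type: " + field);
    }
}

RowType rowType =
        RowType.of(types.toArray(new LogicalType[0]), names.toArray(new String[0]));

A rowType built like this can then be passed to the
ParquetColumnarRowInputFormat constructor, but be aware that it projects
all columns of the file, which is exactly the performance trade-off
described above.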

Best regards
Jing

On Thu, Jan 6, 2022 at 12:38 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hi Jing,
>
> Thanks for the reply.
> Had 2 doubts related to your answer :
>
> 1. There was a conversion from Flink GroupType to Parquet MessageType. It
> might be possible to build the conversion the other way around.
> -> Both GroupType and MessageType are Parquet data structures, I believe,
> present in the org.apache.parquet.schema package. What I am actually
> looking for is whether it is possible to convert them into a Flink data
> type, such as RowType.
>
> 2. The fieldTypes are required in case the given fields could not be found
> in the parquet footer, for example due to a typo.
> -> Does this mean that fieldTypes are not required to be given during the
> construction of the RowType? I tried leaving them empty as below, but it
> gave the exception Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.data.vector.ColumnVector
>
>             final ParquetColumnarRowInputFormat<FileSourceSplit> format =
>                     new ParquetColumnarRowInputFormat<>(
>                             new Configuration(),
>                             RowType.of(new LogicalType[]{},
>                                     new String[]{"field_name_1", "field_name_2"}),
>                             500,
>                             false,
>                             true);
>
> Regards,
> Meghajit
>
> On Thu, Jan 6, 2022 at 3:43 PM Jing Ge <j...@ververica.com> wrote:
>
>> Hi Meghajit,
>>
>> thanks for asking. If you take a look at the source code
>> https://github.com/apache/flink/blob/9bbadb9b105b233b7565af120020ebd8dce69a4f/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java#L174,
>> you will see that the Parquet MessageType is read from the footer and
>> used. There was a conversion from Flink GroupType to Parquet MessageType.
>> It might be possible to build the conversion the other way around. But the
>> question is about performance: since only the required columns should be
>> read, the column names should be given by the user. The fieldTypes are
>> required in case the given fields could not be found in the parquet
>> footer, for example due to a typo.
>>
>> Best regards
>> Jing
>>
>> On Thu, Jan 6, 2022 at 7:01 AM Meghajit Mazumdar <
>> meghajit.mazum...@gojek.com> wrote:
>>
>>> Hello,
>>>
>>> We want to read and process Parquet Files using a FileSource and the 
>>> DataStream API.
>>>
>>>
>>> Currently, as referenced from the documentation 
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/#:~:text=contain%20event%20timestamps.-,final%20LogicalType%5B%5D%20fieldTypes%20%3D%0A%20%20new%20LogicalType%5B%5D%20%7B%0A%20%20new%20DoubleType()%2C%20new%20IntType()%2C%20new,DataStream%3CRowData%3E%20stream%20%3D%0A%20%20env.fromSource(source%2C%20WatermarkStrategy.noWatermarks()%2C%20%22file%2Dsource%22)%3B,-Continuous%20read%20example>,
>>>  this is the way in which a FileSource for Parquet is created. As can be 
>>> seen, it requires the construction of a RowType like this
>>>
>>>
>>> RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"});
>>>
>>>
>>> where fieldTypes is created like this:
>>>
>>>
>>> final LogicalType[] fieldTypes =
>>>   new LogicalType[] {
>>>     new DoubleType(), new IntType(), new VarCharType()
>>>   };
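>>>
>>> The documentation then wires this format into a FileSource and a
>>> DataStream, roughly as sketched below (the parquet file path and env
>>> below are placeholders):
>>>
>>> final ParquetColumnarRowInputFormat<FileSourceSplit> format =
>>>         new ParquetColumnarRowInputFormat<>(
>>>                 new Configuration(),
>>>                 RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
>>>                 500,
>>>                 false,
>>>                 true);
>>>
>>> final FileSource<RowData> source =
>>>         FileSource.forBulkFileFormat(format, /* parquet file path */ filePath)
>>>                 .build();
>>>
>>> final DataStream<RowData> stream =
>>>         env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");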
>>>
>>>
>>> Ideally, instead of specifying the column names (f7, f99, ...) and their
>>> data types (DoubleType, VarCharType, ...), we would like to use the schema
>>> of the Parquet file itself to create a RowType.
>>>
>>> The schema is present in the footer of the Parquet file, inside the 
>>> metadata.
>>>
>>> We wanted to know if there is an easy way by which we can convert a
>>> parquet schema, i.e. MessageType, into a Flink RowType directly?
>>>
>>> The parquet schema of the file can be easily obtained by using
>>> org.apache.parquet.hadoop.ParquetFileReader as follows:
>>>
>>>
>>> ParquetFileReader reader =
>>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>>
>>> // this schema has the field names as well as the data types of the
>>> // parquet records
>>> MessageType schema = reader.getFileMetaData().getSchema();
>>>
>>>
>>> As of now, because we couldn’t find a way to convert the schema into a 
>>> RowType directly, we resorted to writing our own custom parser to parse a 
>>> Parquet SimpleGroup into a Flink Row like this:
>>>
>>>
>>> ParquetFileReader reader =
>>>         ParquetFileReader.open(HadoopInputFile.fromPath(path, conf));
>>>
>>> PageReadStore nextPage = reader.readNextRowGroup();
>>>
>>> Row row = parseToRow(simpleGroup); // custom parser, takes a SimpleGroup
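>>>
>>> Roughly, the row-group loop and such a parser can look like the sketch
>>> below (simplified; it uses org.apache.parquet.io.ColumnIOFactory,
>>> org.apache.parquet.example.data.simple.convert.GroupRecordConverter and
>>> org.apache.flink.types.Row, and the column names in parseToRow are only
>>> placeholders taken from the documentation example):
>>>
>>> MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
>>>
>>> PageReadStore pages;
>>> while ((pages = reader.readNextRowGroup()) != null) {
>>>     RecordReader<Group> recordReader =
>>>             columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
>>>     for (long i = 0; i < pages.getRowCount(); i++) {
>>>         SimpleGroup g = (SimpleGroup) recordReader.read();
>>>         Row row = parseToRow(g);
>>>         // ... hand the row over to downstream processing
>>>     }
>>> }
>>>
>>> // maps one parquet record to a Flink Row; field names are placeholders
>>> private static Row parseToRow(SimpleGroup g) {
>>>     return Row.of(
>>>             g.getDouble("f7", 0),
>>>             g.getInteger("f4", 0),
>>>             g.getString("f99", 0));
>>> }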
>>>
>>>
>>> Looking forward to an answer from the community. Thanks!
>>>
>>>
>>> Regards,
>>>
>>> Meghajit
>>>
>>>
>>>
