[ https://issues.apache.org/jira/browse/FLINK-35324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849596#comment-17849596 ]

Weijie Guo commented on FLINK-35324:
------------------------------------

Does 1.20 have the same problem?

> Avro format can not perform projection pushdown for specific fields
> -------------------------------------------------------------------
>
>                 Key: FLINK-35324
>                 URL: https://issues.apache.org/jira/browse/FLINK-35324
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.17.0
>            Reporter: SuDewei
>            Priority: Blocker
>
> AvroFormatFactory.java#createDecodingFormat returns a
> ProjectableDecodingFormat, which implies that the Avro format deserializer can
> perform projection pushdown. In practice, however, the Avro format seems
> unable to push down projections of specific fields.
> For example, consider the following schema and sample data in Kafka:
> {code:sql}
> -- schema
> CREATE TABLE kafka (
>     `user_id` BIGINT,
>     `name` STRING,
>     `timestamp` TIMESTAMP(3) METADATA,
>     `event_id` BIGINT,
>     `payload` STRING NOT NULL
> ) WITH (
>     'connector' = 'kafka',
>     ...
> )
>
> -- sample data, for example:
> (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3')
> {code}
> The data is deserialized successfully with the following projection:
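> {code:java}
> // Physical fields 0, 1, 2 = user_id, name, event_id (`timestamp` is metadata)
> Projection physicalProjections = Projection.of(new int[] {0, 1, 2});
> DataType physicalFormatDataType =
>     physicalProjections.project(this.physicalDataType);
> // The decoder receives the full physical type plus the projection indexes
> DeserializationSchema<RowData> deserializer =
>     (DeserializationSchema<RowData>) ((ProjectableDecodingFormat<?>) format)
>         .createRuntimeDecoder(context, this.physicalDataType,
>             physicalProjections.toNestedIndexes());
> {code}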
> The resulting row is:
> {code:java}
> +I(3,name 3,102) {code}
> However, when the projection indexes do not start from 0, the data cannot be
> deserialized. For example:
> {code:java}
> // Physical fields 1, 2 = name, event_id
> Projection physicalProjections = Projection.of(new int[] {1, 2});
> DataType physicalFormatDataType =
>     physicalProjections.project(this.physicalDataType);
> // Same call as above; only the projection indexes differ
> DeserializationSchema<RowData> deserializer =
>     (DeserializationSchema<RowData>) ((ProjectableDecodingFormat<?>) format)
>         .createRuntimeDecoder(context, this.physicalDataType,
>             physicalProjections.toNestedIndexes());
> {code}
> The resulting exception looks like this:
> {code:java}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
>         at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
>         at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
>         at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>         at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
>         at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
>         at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>         at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>         ... 19 more
> {code}
> It seems that the Avro format does not support projection pushdown for
> arbitrary fields. Is my understanding correct?
> If so, I think the Avro format should not implement the
> ProjectableDecodingFormat interface, since it can only provide very limited
> pushdown capabilities.
> This problem may prevent connectors from implementing projection pushdown,
> because a connector decides whether projection pushdown can be performed by
> checking whether the format implements the ProjectableDecodingFormat
> interface.
>  

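One plausible reading of the exception, offered as an assumption rather than a confirmed root cause: Avro's binary encoding writes record fields back to back in writer-schema order, with no field names or tags, so a reader can only reach a field by decoding every field before it. If the deserializer decodes with a schema derived from the projected row type as if it were the writer schema, the prefix projection {0,1,2} reads the right bytes and simply stops early, while {1,2} starts decoding `name` at the bytes of `user_id`. The sketch below is a dependency-free model of that layout, not Avro or Flink code: the class `AvroLayoutDemo` and its methods are hypothetical, and only non-null LONG and STRING fields are modelled (the real schema's nullable columns are Avro unions, which is where `ResolvingDecoder.readIndex` in the trace comes in). It contrasts a naive positional decode with decoding the full writer layout and projecting afterwards.

{code:java}
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Avro's binary record layout (not the Avro library):
// fields are written back to back in writer-schema order, with no field tags.
// Only non-null LONG (zig-zag varint) and STRING (varint length + UTF-8) fields.
public class AvroLayoutDemo {
    enum Kind { LONG, STRING }

    // Writer layout of the physical row: user_id, name, event_id, payload
    static final Kind[] WRITER = {Kind.LONG, Kind.STRING, Kind.LONG, Kind.STRING};

    static void writeLong(ByteArrayOutputStream out, long v) {
        long z = (v << 1) ^ (v >> 63); // zig-zag encode
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    static byte[] encode(Object[] row) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < WRITER.length; i++) {
            if (WRITER[i] == Kind.LONG) {
                writeLong(out, (Long) row[i]);
            } else {
                byte[] b = ((String) row[i]).getBytes(StandardCharsets.UTF_8);
                writeLong(out, b.length);
                out.write(b, 0, b.length);
            }
        }
        return out.toByteArray();
    }

    // Sequential reader: a field can only be reached by decoding all fields before it.
    static final class Reader {
        private final byte[] buf;
        private int pos = 0;
        Reader(byte[] buf) { this.buf = buf; }

        long readLong() {
            long z = 0;
            int shift = 0;
            int b;
            do {
                b = buf[pos++] & 0xFF;
                z |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return (z >>> 1) ^ -(z & 1); // zig-zag decode
        }

        Object read(Kind kind) {
            if (kind == Kind.LONG) {
                return readLong();
            }
            int len = (int) readLong();
            String s = new String(buf, pos, len, StandardCharsets.UTF_8);
            pos += len;
            return s;
        }
    }

    // Naive: decodes the projected field types from the start of the buffer,
    // as if the projected schema were the writer schema.
    static List<Object> decodeNaive(byte[] data, int[] projection) {
        Reader r = new Reader(data);
        List<Object> result = new ArrayList<>();
        for (int idx : projection) {
            result.add(r.read(WRITER[idx]));
        }
        return result;
    }

    // Works for any projection: decode the full writer layout, then keep projected fields.
    static List<Object> decodeWithWriterSchema(byte[] data, int[] projection) {
        Reader r = new Reader(data);
        Object[] full = new Object[WRITER.length];
        for (int i = 0; i < WRITER.length; i++) {
            full[i] = r.read(WRITER[i]);
        }
        List<Object> result = new ArrayList<>();
        for (int idx : projection) {
            result.add(full[idx]);
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] data = encode(new Object[] {3L, "name 3", 102L, "payload 3"});
        System.out.println(decodeNaive(data, new int[] {0, 1, 2}));         // prints [3, name 3, 102]
        System.out.println(decodeWithWriterSchema(data, new int[] {1, 2})); // prints [name 3, 102]
        // Misaligned: for this input it returns wrong values instead of [name 3, 102]
        System.out.println(decodeNaive(data, new int[] {1, 2}));
    }
}
{code}

Decoding the full writer layout and dropping unneeded fields works for any projection, at the cost of decoding every field. With the real Avro library, the corresponding approach would presumably be to construct the datum reader with both the full writer schema and the projected reader schema (for example `new GenericDatumReader<>(writerSchema, readerSchema)`), so that Avro's schema resolution skips the fields the reader schema omits.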

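The last point of the description can be illustrated with a minimal sketch of the capability check, assuming a connector that, as described, decides on pushdown purely with an `instanceof` test. The nested interfaces are hypothetical stand-ins named after Flink's `DecodingFormat` and `ProjectableDecodingFormat` (the real ones live in `org.apache.flink.table.connector.format` and carry the `createRuntimeDecoder` methods); `fieldsToDecode`, `FullRowFormat`, and `AdvertisesProjection` are invented for illustration. Because the interface is the only signal, a format that implements it but can only honor prefix projections leaves the connector no reason to fall back to decoding all fields.

{code:java}
import java.util.Arrays;

// Hypothetical sketch: minimal stand-ins for Flink's format interfaces, not the real API.
public class PushdownCheckDemo {
    interface DecodingFormat {}

    interface ProjectableDecodingFormat extends DecodingFormat {}

    // A format that only decodes full rows.
    static final class FullRowFormat implements DecodingFormat {}

    // A format that advertises projection support (as the Avro format does).
    static final class AdvertisesProjection implements ProjectableDecodingFormat {}

    // Returns the physical field indexes the format will be asked to decode.
    // The decision is based only on the interface the format implements.
    static int[] fieldsToDecode(DecodingFormat format, int[] projection, int physicalFieldCount) {
        if (format instanceof ProjectableDecodingFormat) {
            // Trust the interface: push the projection into the format.
            return projection.clone();
        }
        // Fallback: decode every physical field; the connector projects afterwards.
        int[] all = new int[physicalFieldCount];
        for (int i = 0; i < physicalFieldCount; i++) {
            all[i] = i;
        }
        return all;
    }

    public static void main(String[] args) {
        int[] projection = {1, 2}; // name, event_id
        System.out.println(Arrays.toString(fieldsToDecode(new FullRowFormat(), projection, 4)));        // prints [0, 1, 2, 3]
        System.out.println(Arrays.toString(fieldsToDecode(new AdvertisesProjection(), projection, 4))); // prints [1, 2]
    }
}
{code}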

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
