[ https://issues.apache.org/jira/browse/FLINK-35324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849596#comment-17849596 ]
Weijie Guo commented on FLINK-35324:
------------------------------------

Does 1.20 have the same problem?

> Avro format can not perform projection pushdown for specific fields
> -------------------------------------------------------------------
>
>                 Key: FLINK-35324
>                 URL: https://issues.apache.org/jira/browse/FLINK-35324
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.17.0
>            Reporter: SuDewei
>            Priority: Blocker
>
> AvroFormatFactory.java#createDecodingFormat would return a
> ProjectableDecodingFormat, which means the Avro format deserializer could perform
> projection pushdown. However, it is found in practice that the Avro
> format seems unable to perform projection pushdown for specific fields.
> For example, there are such schema and sample data in Kafka:
> {code:java}
> -- schema
> CREATE TABLE kafka (
>     `user_id` BIGINT,
>     `name` STRING,
>     `timestamp` TIMESTAMP(3) METADATA,
>     `event_id` BIGINT,
>     `payload` STRING not null
> ) WITH (
>     'connector' = 'kafka',
>     ...
> )
>
> -- sample data like
> (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3') {code}
> The data can be successfully deserialized in this way:
> {code:java}
> Projection physicalProjections = Projection.of( new int[] {0,1,2} );
> DataType physicalFormatDataType =
>     physicalProjections.project(this.physicalDataType);
> (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
>     .createRuntimeDecoder(context, this.physicalDataType,
>         physicalProjections.toNestedIndexes()); {code}
> The data would be:
> {code:java}
> +I(3,name 3,102) {code}
> However, when the projection index is replaced with values that do not start
> from 0, the data cannot be successfully deserialized, for example:
> {code:java}
> Projection physicalProjections = Projection.of( new int[] {1,2} );
> DataType physicalFormatDataType =
>     physicalProjections.project(this.physicalDataType);
> (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
>     .createRuntimeDecoder(context, this.physicalDataType,
>         physicalProjections.toNestedIndexes()); {code}
> The exception would be like:
> {code:java}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
>     at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>     ... 19 more {code}
> It seems that Avro format does not support projection pushdown for arbitrary
> fields. Is my understanding correct?
> If this is the case, then I think Avro format should not implement the
> ProjectableDecodingFormat interface, since it can only provide very limited
> pushdown capabilities.
> This problem may block the connector implementing the projection pushdown
> capability since the connector would determine whether projection pushdown
> can be performed by judging whether the format has implemented the
> ProjectableDecodingFormat interface or not.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)