Hi,

Just wanted to comment on:
> How to map the nullable types to union(null, something)? In our schema
> definition, we follow the Avro recommended definition and list 'null'
> as the first type.

I've also spotted that problem, and it will be fixed in 1.12 in
https://issues.apache.org/jira/browse/FLINK-20175

Best,
Dawid

On 25/11/2020 09:44, Hongjian Peng wrote:
> In Flink 1.10, we could pass this schema to the SQL DDL with the
> 'format.avro-schema' property, but in Flink 1.11 the Avro schema is
> always derived from the table schema.
> We have two questions about the Flink 1.11 Avro format:
>
> 1. Flink 1.11 maps nullable types to Avro union(something, null). How
> can we map nullable types to union(null, something)? In our schema
> definition, we follow the Avro recommended definition and list 'null'
> as the first type. With Flink's default mapping,
> AvroRowDataDeserializationSchema returns wrong/error results when
> 'null' is listed as the first type of the union.
>
> 2. Currently, if we want to make a field in a ROW non-nullable, we
> first have to declare the ROW itself NOT NULL. Is this the expected
> behavior of the Avro format?
> For example,
> event ROW<createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR>
> will be mapped to this Avro schema:
>
> {
>   "type": "record",
>   "name": "event",
>   "fields": [
>     {
>       "name": "header",
>       "type": {
>         "type": "record",
>         "name": "header",
>         "fields": [
>           {
>             "name": "createTimestamp",
>             "type": ["long", "null"]
>           },
>           {
>             "name": "sentTimestamp",
>             "type": ["long", "null"]
>           },
>           {
>             "name": "eventId",
>             "type": [
>               "null",
>               {
>                 "type": "string",
>                 "avro.java.string": "String"
>               }
>             ]
>           }
>         ]
>       }
>     }
>   ]
> }
>
> We have to write "event ROW<createTimestamp BIGINT NOT NULL,
> sentTimestamp BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the
> NOT NULL on the createTimestamp and sentTimestamp fields take effect.
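Until the FLINK-20175 fix is available, one workaround on the schema side is to normalize unions so that 'null' is listed first before handing the schema to the deserializer. A minimal sketch in Python; the helper `null_first` and the sample union are illustrative, not a Flink API:

```python
import json

def null_first(schema):
    """Recursively reorder Avro unions so the "null" branch comes first.

    Illustrative helper, not a Flink API: it rewrites a schema that was
    parsed from JSON with json.loads().
    """
    if isinstance(schema, list):               # a union
        branches = [null_first(b) for b in schema]
        if "null" in branches:
            branches.remove("null")
            branches.insert(0, "null")
        return branches
    if isinstance(schema, dict):               # record / array / map / annotated type
        t = schema.get("type")
        if t == "record":
            for field in schema["fields"]:
                field["type"] = null_first(field["type"])
        elif t == "array":
            schema["items"] = null_first(schema["items"])
        elif t == "map":
            schema["values"] = null_first(schema["values"])
        return schema
    return schema                              # primitive type name, e.g. "long"

# A Flink-1.11-style derived union, reordered to the registry convention:
derived = ["long", "null"]
print(json.dumps(null_first(derived)))         # prints ["null", "long"]
```

The same normalization applied to the record schema above turns every `["long", "null"]` into `["null", "long"]` without touching anything else.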
>
> At 2020-11-25 16:12:58, "Hongjian Peng" <super...@163.com> wrote:
> > Hi Flink Community,
> >
> > We are trying to upgrade our Flink SQL job from 1.10 to 1.11. We
> > used a Kafka source table, and the data is stored in Kafka in Avro
> > format.
> > The schema is like this:
> >
> > {
> >   "type": "record",
> >   "name": "event",
> >   "namespace": "busseniss.event",
> >   "fields": [
> >     {
> >       "name": "header",
> >       "type": {
> >         "type": "record",
> >         "name": "header",
> >         "fields": [
> >           {
> >             "name": "createTimestamp",
> >             "type": "long"
> >           },
> >           {
> >             "name": "sentTimestamp",
> >             "type": "long"
> >           },
> >           {
> >             "name": "eventId",
> >             "type": [
> >               "null",
> >               {
> >                 "type": "string",
> >                 "avro.java.string": "String"
> >               }
> >             ]
> >           }
> >         ]
> >       },
> >       "doc": "Rheos header"
> >     },
> >     {
> >       "name": "si",
> >       "type": [
> >         "null",
> >         "string"
> >       ]
> >     }
> >   ]
> > }
> >
> > --
> > Hongjian Peng
> > Department of Computer Science and Engineering
> > Shanghai Jiao Tong University
> > Email: super...@163.com
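For reference, in Flink 1.10 a schema like the one above could be passed straight through the legacy connector properties mentioned in the thread. A sketch of such a DDL; the table name, topic, and broker address are placeholders, and the schema string is abbreviated:

```sql
CREATE TABLE events (
  header ROW<createTimestamp BIGINT, sentTimestamp BIGINT, eventId VARCHAR>,
  si VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'events-topic',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'avro',
  -- the full Avro schema JSON goes here, exactly as registered:
  'format.avro-schema' = '{"type":"record","name":"event","namespace":"busseniss.event","fields":[...]}'
);
```

In Flink 1.11 this property is gone for the new connector options, which is why the Avro schema is now always derived from the table schema.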