Hi,

I managed to backport the change to the 1.11 branch. It should be part of the 1.11.3 release.
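As a quick standalone illustration of the union-ordering issue (plain Python, not Flink code; `null_first` is a hypothetical helper, not anything in the Flink codebase), this sketch rewrites a derived Avro schema so that "null" comes first in every union, matching the Avro-recommended convention mentioned below:

```python
import json

def null_first(schema):
    """Recursively reorder Avro unions so that "null" comes first,
    matching the Avro-recommended convention.
    (Hypothetical standalone helper, not part of Flink.)"""
    if isinstance(schema, list):              # a union type
        members = [null_first(s) for s in schema]
        if "null" in members:
            members.remove("null")
            members.insert(0, "null")
        return members
    if isinstance(schema, dict):              # record / array / map, etc.
        out = dict(schema)
        if "fields" in out:                   # record: recurse into fields
            out["fields"] = [dict(f, type=null_first(f["type"]))
                             for f in out["fields"]]
        if "items" in out:                    # array element type
            out["items"] = null_first(out["items"])
        if "values" in out:                   # map value type
            out["values"] = null_first(out["values"])
        return out
    return schema                             # primitive type name

derived = json.loads(
    '{"type": "record", "name": "event", "fields": '
    '[{"name": "si", "type": ["string", "null"]}]}')
print(null_first(derived)["fields"][0]["type"])  # ['null', 'string']
```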
Best,

Dawid

On 25/11/2020 16:23, Hongjian Peng wrote:
> Thanks for Danny's and Dawid's quick replies.
>
> Dawid, I found your fix at
> https://github.com/apache/flink/pull/14085/commits/bc9caab71f51024d1b48c6ee1a3f79777624b6bb#diff-6cc72acf0893bbaadc5b610afbbbae23227971cf5c7d0743dd4b997236baf771R450.
> Appreciate the fix. But we may not move to Flink 1.12 in the near
> future. Does the community have a plan to backport the fix to 1.11?
>
> --
> Thanks,
> Hongjian Peng
>
> At 2020-11-25 22:26:32, "Dawid Wysakowicz" <dwysakow...@apache.org> wrote:
>
> Hi,
>
> Just wanted to comment on:
>
>> How can we map nullable types to union(null, something)? In our
>> schema definition, we follow the Avro-recommended convention and
>> list 'null' as the first type.
>
> I've also spotted that problem, and it will be fixed in 1.12 in
> https://issues.apache.org/jira/browse/FLINK-20175
>
> Best,
>
> Dawid
>
> On 25/11/2020 09:44, Hongjian Peng wrote:
>> In Flink 1.10, we could pass this schema to the SQL DDL with the
>> 'format.avro-schema' property, but in Flink 1.11 the Avro schema is
>> always derived from the table schema.
>> We have two questions about the Flink 1.11 Avro format:
>>
>> 1. Flink 1.11 maps nullable types to Avro union(something, null).
>>    How can we map nullable types to union(null, something)? In our
>>    schema definition, we follow the Avro-recommended convention and
>>    list 'null' as the first type. With Flink's default mapping, the
>>    AvroRowDataDeserializationSchema produces wrong results or errors
>>    when 'null' is listed as the first type of a union.
>>
>> 2. If we want to make a field inside a Row non-nullable, we first
>>    have to make the Row itself non-nullable. Is this the expected
>>    behavior of the Avro format?
>> For example,
>> event ROW<createTimestamp BIGINT NOT NULL, sentTimestamp BIGINT NOT NULL, eventId VARCHAR>
>> will be mapped to the Avro schema:
>>
>> {
>>   "type": "record",
>>   "name": "event",
>>   "fields": [
>>     {
>>       "name": "header",
>>       "type": {
>>         "type": "record",
>>         "name": "header",
>>         "fields": [
>>           {
>>             "name": "createTimestamp",
>>             "type": ["long", "null"]
>>           },
>>           {
>>             "name": "sentTimestamp",
>>             "type": ["long", "null"]
>>           },
>>           {
>>             "name": "eventId",
>>             "type": [
>>               "null",
>>               {
>>                 "type": "string",
>>                 "avro.java.string": "String"
>>               }
>>             ]
>>           }
>>         ]
>>       }
>>     }
>>   ]
>> }
>>
>> We have to use "event ROW<createTimestamp BIGINT NOT NULL,
>> sentTimestamp BIGINT NOT NULL, eventId VARCHAR> NOT NULL" to make the
>> NOT NULL on the createTimestamp and sentTimestamp fields work.
>>
>> At 2020-11-25 16:12:58, "Hongjian Peng" <super...@163.com> wrote:
>>
>> Hi Flink Community,
>>
>> We are trying to upgrade our Flink SQL job from 1.10 to 1.11.
>> We use a Kafka source table, and the data is stored in Kafka in
>> Avro format. The schema is like this:
>>
>> {
>>   "type": "record",
>>   "name": "event",
>>   "namespace": "busseniss.event",
>>   "fields": [
>>     {
>>       "name": "header",
>>       "type": {
>>         "type": "record",
>>         "name": "header",
>>         "fields": [
>>           {
>>             "name": "createTimestamp",
>>             "type": "long"
>>           },
>>           {
>>             "name": "sentTimestamp",
>>             "type": "long"
>>           },
>>           {
>>             "name": "eventId",
>>             "type": [
>>               "null",
>>               {
>>                 "type": "string",
>>                 "avro.java.string": "String"
>>               }
>>             ]
>>           }
>>         ]
>>       },
>>       "doc": "Rheos header"
>>     },
>>     {
>>       "name": "si",
>>       "type": [
>>         "null",
>>         "string"
>>       ]
>>     }
>>   ]
>> }
>>
>> --
>> Hongjian Peng
>> Department of Computer Science and Engineering
>> Shanghai Jiao Tong University
>> Email: super...@163.com
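To make the second point in the quoted thread concrete, here is a minimal standalone check (plain Python; `is_nullable` is a hypothetical helper for inspecting a derived schema, not a Flink API) for whether a field type in the derived Avro schema ended up nullable:

```python
def is_nullable(avro_type):
    """True if an Avro field type is a union containing "null".
    (Hypothetical standalone helper for inspecting a derived
    schema, not a Flink API.)"""
    return isinstance(avro_type, list) and "null" in avro_type

# With the DDL quoted above, unless the ROW itself is declared NOT NULL,
# the NOT NULL on its fields is not honored and they stay nullable:
print(is_nullable(["long", "null"]))  # True  -> still nullable
print(is_nullable("long"))            # False -> non-nullable, as intended
```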