Hi Agnelo,

if you reprocess all data and delete all old records with the old schema,
then you have to add the schema to DDL and it will work.

If you have records with old and new schema in your topic, you need to
attach the schema information to the records. Avro records themselves do
not have any metadata and Flink (or any other consumer) cannot convert them
in any way.
The usual approach to attach the schema information is to use the schema
registry which Flink also supports.

On Wed, Apr 14, 2021 at 5:22 PM Agnelo Dcosta <agnelo.m.dco...@gmail.com>
wrote:

> Hi Arvid, thanks for the reply.
> We are following the 1.12 documentation here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro.html#data-type-mapping
> *Currently, the Avro schema is always derived from table schema.
> Explicitly defining an Avro schema is not supported yet.*
>
> And the issue is we need to have our producers use a newer topic
> schema(one additional column). Flink will also have this new column
> eventually. However for the time after deploying producer changes and
> before flink changes, flink will crash with failure to deserialize.
> Trying to see if there is any setting that could enable flink to continue
> reading new schema, without having that field specified in the table
> definition.
>
>
> On Wed, Apr 14, 2021 at 12:46 AM Arvid Heise <ar...@apache.org> wrote:
>
>> For any schema change to be gracefully supported. You need to know both
>> schemas (old + new) on reader side (=Flink). I'm curious how Flink should
>> know the old schema as you only provide the new schema, right?
>>
>> Usually, you use the schema registry, such that each record has it's own
>> schema attached to it (=old). You opted to not go for it, so I'm not sure
>> how Flink is supposed to know the old schema.
>>
>> On Wed, Apr 14, 2021 at 9:09 AM Agnelo Dcosta <agnelo.m.dco...@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>> > writer schema encoded if you are using no schema registry?
>>> on the producer side we are using node with
>>> https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz
>>> and https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz libraries to
>>> publish messages. We specify the avro schema file to encode messages in
>>> avro format.
>>>
>>> >Flink know with which schema the data has been written so that it can
>>> map it to the new schema?
>>> With 1.11 we used to specify the schema file as part of the flink sql
>>> table definition.
>>> However with 1.12 the schema is derived from the message/table
>>> definition. We do not specify any schema as such.
>>>
>>>
>>> On Tue, Apr 13, 2021 at 11:58 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Agnelo,
>>>>
>>>> How is the writer schema encoded if you are using no schema registry?
>>>> Or phrased differently: how does Flink know with which schema the data has
>>>> been written so that it can map it to the new schema?
>>>>
>>>> On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta <
>>>> agnelo.m.dco...@gmail.com> wrote:
>>>>
>>>>> Hi, we are using Flink SQL 1.12 and have a couple of tables created
>>>>> from kafka topics. Format is avro (not confluent avro) and no schema
>>>>> registry as such.
>>>>>
>>>>> In flink 1.11 we used to specify the schema, however in 1.12 the
>>>>> schema is derived from the message itself.
>>>>>
>>>>> Is it possible for the producers to start sending new fields without
>>>>> changes in the flink app?
>>>>>
>>>>>
>>>>>
>>>>> For example :
>>>>>
>>>>> {
>>>>>
>>>>>   "name": "topic1",
>>>>>
>>>>>   "type": "record",
>>>>>
>>>>>   "fields": [
>>>>>
>>>>>       {
>>>>>
>>>>>       "name": "field1",
>>>>>
>>>>>       "type": "string"
>>>>>
>>>>>     },
>>>>>
>>>>> {
>>>>>
>>>>>       "name": "field2",
>>>>>
>>>>>       "type": "string"
>>>>>
>>>>>     },
>>>>>
>>>>> {
>>>>>
>>>>>       *"name": "field3",*
>>>>>
>>>>> *      "type": "string"*
>>>>>
>>>>>     },
>>>>>
>>>>> ]
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> Flink table has:
>>>>>
>>>>> CREATE TABLE topic1(\n"
>>>>>
>>>>>                 + " field1 string not null \n"
>>>>>
>>>>>                 + " ,field2 string not null \n"
>>>>>
>>>>> "'connector' = 'kafka' \n"
>>>>>
>>>>>              + ",'topic' = 'topic1' \n"
>>>>>
>>>>>              + ",'scan.startup.mode' = 'latest-offset' \n"
>>>>>
>>>>>              + ",'properties.group.id' = 'topic1' \n"
>>>>>
>>>>>              + ",'properties.bootstrap.servers' = 'localhost:8082' \n"
>>>>>
>>>>>               + ",'properties.enable.auto.commit' = 'true' \n"
>>>>>
>>>>>              + ",'format' = 'avro' \n";
>>>>>
>>>>>
>>>>>
>>>>> With above settings I get a deserialization error:
>>>>>
>>>>>
>>>>>
>>>>> *java.io.IOException: Failed to deserialize Avro record.*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104)
>>>>> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
>>>>> ~[flink-sql-avro-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>>>>> ~[flink-core-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177)
>>>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
>>>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
>>>>> ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>>>>>
>>>>> *        at
>>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>> ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]*
>>>>>
>>>>

Reply via email to