Hi Arvid,
> How is the writer schema encoded if you are using no schema registry?
On the producer side we are using Node.js with the
avsc (https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz) and
sinek (https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz) libraries to publish
messages. We specify the Avro schema file and use it to encode messages in Avro
binary format.
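
For reference, here is a minimal sketch of the encoding side with avsc
(illustrative only, not our actual producer code; the schema path and field
values are placeholders, and the publish call through sinek is omitted):

// Producer-side encoding sketch with avsc (placeholders, not production code).
import * as fs from 'fs';
import * as avro from 'avsc';

// Load the writer schema from the .avsc file and build an avsc Type for it.
const schemaJson = JSON.parse(fs.readFileSync('topic1.avsc', 'utf8'));
const writerType = avro.Type.forSchema(schemaJson);

// Encode one record into raw Avro binary: no registry header, no schema id,
// just the field values back to back.
const payload = writerType.toBuffer({
  field1: 'a',
  field2: 'b',
});

// `payload` is the Buffer that gets published to Kafka (via sinek).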

> How does Flink know with which schema the data has been written so that it
can map it to the new schema?
With 1.11 we used to specify the schema file as part of the Flink SQL table
definition. However, with 1.12 the schema is derived from the message/table
definition; we do not specify any schema as such.
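
To make the question concrete: raw Avro binary carries nothing that identifies
the writer schema, so a reader can only resolve the bytes if it already knows
that schema. A small avsc sketch of this (again just an illustration, not code
we run; field names follow the example schema quoted further down):

import * as avro from 'avsc';

// Writer schema: what the producer encodes with after adding field3.
const writerType = avro.Type.forSchema({
  type: 'record',
  name: 'topic1',
  fields: [
    { name: 'field1', type: 'string' },
    { name: 'field2', type: 'string' },
    { name: 'field3', type: 'string' },
  ],
});

// Reader schema: what a consumer expecting only field1/field2 uses.
const readerType = avro.Type.forSchema({
  type: 'record',
  name: 'topic1',
  fields: [
    { name: 'field1', type: 'string' },
    { name: 'field2', type: 'string' },
  ],
});

const buf = writerType.toBuffer({ field1: 'a', field2: 'b', field3: 'c' });

// Decoding with the reader schema alone does not work: field3's bytes are
// left over and avsc rejects the trailing data.
// readerType.fromBuffer(buf); // throws

// Schema resolution only succeeds when the writer schema is known as well.
const resolver = readerType.createResolver(writerType);
console.log(readerType.fromBuffer(buf, resolver)); // field1: 'a', field2: 'b' (field3 dropped)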


On Tue, Apr 13, 2021 at 11:58 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Agnelo,
>
> How is the writer schema encoded if you are using no schema registry? Or
> phrased differently: how does Flink know with which schema the data has
> been written so that it can map it to the new schema?
>
> On Wed, Apr 14, 2021 at 8:33 AM Agnelo Dcosta <agnelo.m.dco...@gmail.com>
> wrote:
>
>> Hi, we are using Flink SQL 1.12 and have a couple of tables created from
>> Kafka topics. The format is Avro (not Confluent Avro) and there is no schema
>> registry as such.
>>
>> In Flink 1.11 we used to specify the schema; however, in 1.12 the schema
>> is derived from the message itself.
>>
>> Is it possible for the producers to start sending new fields without
>> changes in the Flink app?
>>
>> For example, the producer's schema now includes a new field, field3:
>>
>> {
>>   "name": "topic1",
>>   "type": "record",
>>   "fields": [
>>     { "name": "field1", "type": "string" },
>>     { "name": "field2", "type": "string" },
>>     { "name": "field3", "type": "string" }
>>   ]
>> }
>>
>> The Flink table is defined as:
>>
>> CREATE TABLE topic1 (
>>   field1 string not null
>>   ,field2 string not null
>> ) WITH (
>>   'connector' = 'kafka'
>>   ,'topic' = 'topic1'
>>   ,'scan.startup.mode' = 'latest-offset'
>>   ,'properties.group.id' = 'topic1'
>>   ,'properties.bootstrap.servers' = 'localhost:8082'
>>   ,'properties.enable.auto.commit' = 'true'
>>   ,'format' = 'avro'
>> )
>>
>> With the above settings I get a deserialization error:
>>
>> java.io.IOException: Failed to deserialize Avro record.
>>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
>>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
>>         at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
>
