Hi, we are using Flink SQL 1.12 and have a couple of tables created from
Kafka topics. The format is Avro (not Confluent Avro), and there is no
schema registry as such.

In Flink 1.11 we used to specify the Avro schema explicitly; in 1.12 the
schema is derived from the table definition itself.

Is it possible for the producers to start sending new fields without any
changes in the Flink app?



For example, the producers' writer schema gains a new field3 (the added
field):

{
  "name": "topic1",
  "type": "record",
  "fields": [
    { "name": "field1", "type": "string" },
    { "name": "field2", "type": "string" },
    { "name": "field3", "type": "string" }
  ]
}
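
For context, the producers write plain Avro binary, so the Kafka value
bytes carry no schema id and no embedded writer schema. A minimal sketch of
that encoding (the class name and field values here are illustrative, not
our actual producer code):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class ProducerSketch {  // illustrative only
    static byte[] encode(Schema writerSchema) throws Exception {
        GenericRecord rec = new GenericData.Record(writerSchema);
        rec.put("field1", "a");
        rec.put("field2", "b");
        rec.put("field3", "c");  // the newly added field
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Plain binary encoding: the resulting bytes do not describe
        // their own schema, so the reader must already know it.
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(rec, encoder);
        encoder.flush();
        return out.toByteArray();  // sent as the Kafka record value
    }
}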



The Flink table is defined as:

String ddl = "CREATE TABLE topic1 (\n"
        + "  field1 STRING NOT NULL,\n"
        + "  field2 STRING NOT NULL\n"
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'topic1',\n"
        + "  'scan.startup.mode' = 'latest-offset',\n"
        + "  'properties.group.id' = 'topic1',\n"
        + "  'properties.bootstrap.servers' = 'localhost:8082',\n"
        + "  'properties.enable.auto.commit' = 'true',\n"
        + "  'format' = 'avro'\n"
        + ")";
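
The DDL is then registered and the topic consumed through the usual Table
API entry points, roughly like this (a sketch; the class and method names
are illustrative, not our actual job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TopicReader {  // illustrative wrapper
    static void run(String ddl) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(ddl);                            // registers topic1
        tEnv.executeSql("SELECT * FROM topic1").print(); // consumes and prints rows
    }
}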



With the above settings I get a deserialization error:



java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:104) ~[flink-sql-avro-1.12.0.jar:1.12.0]
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44) ~[flink-sql-avro-1.12.0.jar:1.12.0]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:177) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761) ~[flink-connector-kafka_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-streaming-java_2.11-1.12.0.jar:1.12.0]
