Hi Kamil,

Actually, FlinkKafkaConsumer expects any DeserializationSchema, not specifically a JsonRowDeserializationSchema (which is just one implementation), so you could try SimpleStringSchema and then parse each raw JSON string yourself in a downstream map operation.
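Something along the lines of the following minimal sketch (untested; the topic name, bootstrap servers, group id, job name and connector jar path are all placeholders you would need to adapt) shows the idea: deserialize to plain strings, then branch on the per-message fields in Python.

import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# The Kafka connector jar must be on the classpath, e.g.:
# env.add_jars("file:///path/to/flink-sql-connector-kafka_2.11-1.14.0.jar")

consumer = FlinkKafkaConsumer(
    topics='my-topic',  # placeholder
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092',  # placeholder
                'group.id': 'my-group'})                 # placeholder

def parse(raw):
    # Each record arrives as a raw JSON string; parse it here and
    # dispatch on the embedded 'schema'/'op' fields, since they can
    # differ per message.
    msg = json.loads(raw)
    payload = msg.get('payload', {})
    return payload.get('op'), payload.get('after')

env.add_source(consumer).map(parse).print()
env.execute('kafka-json-job')

From the map function onward you are free to route each message to the appropriate Avro sink based on whatever schema information it carries.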
Regards,
Dian

On Sat, Nov 20, 2021 at 5:55 AM Kamil ty <kamilt...@gmail.com> wrote:
> Hello all,
>
> I'm working on a pyflink job that's supposed to consume JSON messages from
> Kafka and save them to a partitioned Avro file sink.
> I'm having difficulty finding a solution for processing the messages,
> because there is only one Kafka topic for multiple message schemas. As
> pyflink's FlinkKafkaConsumer expects a JsonRowDeserializationSchema, I
> assume that all of the messages need a constant, defined schema. I expect
> the same for the Kafka Table API.
>
> The messages follow the general Debezium message schema. Example data
> taken from the Flink docs:
>
> {
>   "schema": {...},
>   "payload": {
>     "before": {
>       "id": 111,
>       "name": "scooter",
>       "description": "Big 2-wheel scooter",
>       "weight": 5.18
>     },
>     "after": {
>       "id": 111,
>       "name": "scooter",
>       "description": "Big 2-wheel scooter",
>       "weight": 5.15
>     },
>     "source": {...},
>     "op": "u",
>     "ts_ms": 1589362330904,
>     "transaction": null
>   }
> }
>
> The messages arrive on a single Kafka topic, where the 'schema', 'after',
> and 'before' fields can differ for each message. The Kafka message key
> also contains the 'schema' field from the above example. My question is
> whether there is a way to process such messages coming from a single
> Kafka topic with pyflink without writing a custom DeserializationSchema.
> Any help would be appreciated.
>
> Kind Regards
> Kamil