Hi Kamil,

Actually, FlinkKafkaConsumer accepts any DeserializationSchema, not only
JsonRowDeserializationSchema, so I guess you could try SimpleStringSchema,
which hands each message to your job as a raw string.
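
A rough, untested sketch of what I mean, assuming the PyFlink 1.14
DataStream API. The names split_debezium_envelope and build_stream are just
placeholders I made up for illustration, and the topic and properties are
whatever you already use:

```python
import json


def split_debezium_envelope(raw):
    """Parse one raw Kafka value and return (op, before, after).

    No fixed row schema is needed here: 'before' and 'after' are returned
    as plain dicts (or None), whatever fields they happen to contain.
    """
    message = json.loads(raw)
    payload = message.get("payload") or {}
    return payload.get("op"), payload.get("before"), payload.get("after")


def build_stream(env, topic, properties):
    """Hypothetical wiring: consume raw strings, then parse them in Python."""
    # PyFlink imports are local so the helper above can be tried without Flink.
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    consumer = FlinkKafkaConsumer(topic, SimpleStringSchema(), properties)
    return env.add_source(consumer).map(split_debezium_envelope)
```

From there you could branch on the parsed content (for example on a name
inside the 'schema' field) before writing to your sink.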

Regards,
Dian

On Sat, Nov 20, 2021 at 5:55 AM Kamil ty <kamilt...@gmail.com> wrote:

> Hello all,
>
> I'm working on a PyFlink job that is supposed to consume JSON messages from
> Kafka and save them to a partitioned Avro file sink.
> I'm having difficulty working out how to process the
> messages, because there is only one Kafka topic for multiple
> message schemas. As PyFlink's FlinkKafkaConsumer expects a
> JsonRowDeserialization schema, I assume that all of the messages need a
> constant, predefined schema. I expect the same for the Kafka Table API.
>
> The messages follow the general Debezium message schema.
> Example data taken from the Flink docs:
>
> {
>   "schema": {...},
>   "payload": {
>     "before": {
>       "id": 111,
>       "name": "scooter",
>       "description": "Big 2-wheel scooter",
>       "weight": 5.18
>     },
>     "after": {
>       "id": 111,
>       "name": "scooter",
>       "description": "Big 2-wheel scooter",
>       "weight": 5.15
>     },
>     "source": {...},
>     "op": "u",
>     "ts_ms": 1589362330904,
>     "transaction": null
>   }
> }
>
> The messages arrive on a single Kafka topic, where the 'schema',
> 'after' and 'before' fields can be different for each message. The Kafka
> message key also contains the 'schema' field from the above example. My
> question is whether there is a way to process such messages coming from a
> single Kafka topic with PyFlink without writing a custom
> DeserializationSchema. Any help would be appreciated.
>
> Kind Regards
> Kamil
>
