Hussein,
To use a JsonRowDeserializationSchema you'll need to use the Table API
rather than the DataStream API.
You'll want to use a JsonRowSchemaConverter to convert your JSON schema
into the TypeInformation needed by Flink, which is done for you by
the JsonRowDeserializationSchema builder:

    json_row_schema = (
        JsonRowDeserializationSchema.builder()
        .json_schema(json_schema_string)
        .build()
    )
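As for your second question, the schema can live in a file and be read
into a string before it is handed to the builder. A minimal sketch (the
sample schema and file name are made up for illustration, and the
pyflink call is guarded so the file-handling part stands on its own):

    import json
    import tempfile

    # A tiny stand-in for the ~1500-line schema files you mentioned.
    sample_schema = {
        "type": "object",
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"},
        },
    }

    # Write the schema to a file, as it would normally live on disk.
    with tempfile.NamedTemporaryFile("w", suffix=".json",
                                     delete=False) as f:
        json.dump(sample_schema, f)
        schema_path = f.name

    # Read the schema file back into a single string.
    with open(schema_path) as f:
        json_schema_string = f.read()

    # Sanity-check that the file holds valid JSON before giving it to Flink.
    parsed = json.loads(json_schema_string)

    # The pyflink part, guarded so the sketch runs without pyflink installed:
    try:
        from pyflink.common.serialization import JsonRowDeserializationSchema

        json_row_schema = (
            JsonRowDeserializationSchema.builder()
            .json_schema(json_schema_string)
            .build()
        )
    except ImportError:
        pass  # pyflink not available; the schema string is ready regardless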
Given that schema, you can pass it to the constructor of the Kafka
consumer:

    kafka_consumer = FlinkKafkaConsumer("stream-source", json_row_schema,
                                        kafka_props)
To read from three different topics, you can either instantiate three
separate sources, or have a single source read from multiple topics by
passing a list of strings as the topics parameter.
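Putting that together for the single-source case, a sketch under
assumptions (the topic names, bootstrap servers, group id, and inline
schema are placeholders, and the pyflink construction is guarded so the
surrounding setup stands on its own):

    # Hypothetical topic names and Kafka properties, for illustration only.
    topics = ["topic-a", "topic-b", "topic-c"]
    kafka_props = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
    }

    # A small inline schema; in practice this would be read from a file.
    json_schema_string = (
        '{"type": "object", "properties": {"id": {"type": "integer"}}}'
    )

    try:
        from pyflink.common.serialization import JsonRowDeserializationSchema
        from pyflink.datastream.connectors import FlinkKafkaConsumer

        json_row_schema = (
            JsonRowDeserializationSchema.builder()
            .json_schema(json_schema_string)
            .build()
        )

        # One source reading from all three topics with the same schema.
        kafka_consumer = FlinkKafkaConsumer(topics, json_row_schema,
                                            kafka_props)
    except ImportError:
        pass  # pyflink not available; topics/props above show the shape

Note that a single source applies one deserialization schema to every
topic it reads, so if the three topics have different schemas you'd
create three consumers instead, one per schema.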
Regards,
David
On Fri, Jan 28, 2022 at 12:07 PM Hussein El Ghoul <[email protected]>
wrote:
> Hello,
>
> How to specify the deserialization schema for multiple Kafka topics using
> Flink (python)
>
> I want to read from multiple Kafka topics with JSON schema using
> FlinkKafkaConsumer, and I assume that I need to use
> JsonRowDeserializationSchema to deserialize the data. The schema of the
> topics is very large (around 1500 lines for each topic), so I want to read
> it from a file instead of manually typing the types in the program. How can
> I do that?
>
> 1. How to specify deserialization schema for multiple topics (3 topics)
> 2. How to read the JSON schema from a file?
>
>
> https://stackoverflow.com/q/70892579/13067721?sem=2
>
> Thanks in advance,
> Hussein
> Quiqup - Data Engineer