Hi All I am working on building a flink application which reads data from kafka topics, apply some transformations and writes to the Iceberg table.
I read the data from kafka topic (which is in json) and use circe to decode that to scala case class with scala Option values in it. All the transformations on the datastream works fine. Case Class Looks like below Event(app_name: Option[String], service_name: Option[String], ......) But when I try to convert the stream to a table to write to iceberg table due to the case classes the columns are converted to Raw type as shown below. table.printSchema() `app_name` RAW('scala.Option', '...'), `service_name` RAW('scala.Option', '...'), `conversion_id` RAW('scala.Option', '...'), ...... And the table write fails as below. Cause: Incompatible types for sink column 'app_name' at position 0. Query schema: [app_name: RAW('scala.Option', '...'), ......... Sink schema: [app_name: STRING, actionName: STRING, ....... Does flink table api support scala case classes with option values? https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#special-types I found out that it is supported in datastream at this documentation. Is there a way to do this in Table API. Thanks in advance for the help.. -- Regards Praneeth Ramesh