Hi All

I am working on building a Flink application which reads data from Kafka
topics, applies some transformations, and writes to an Iceberg table.

I read the data from the Kafka topic (which is in JSON) and use circe to
decode it into a Scala case class with Option fields.
All the transformations on the DataStream work fine.

The case class looks like this:

case class Event(app_name: Option[String], service_name: Option[String], ......)
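
For context, the decode step is roughly like this (simplified sketch; rawStream is just a placeholder for the DataStream[String] coming from the Kafka source, and error handling is reduced to dropping bad records):

import io.circe.generic.auto._
import io.circe.parser.decode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val events: DataStream[Event] = rawStream.flatMap { (json: String, out: Collector[Event]) =>
  // drop records that fail to decode
  decode[Event](json).foreach(out.collect)
}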

But when I try to convert the stream to a table in order to write to the
Iceberg table, the Option columns are mapped to RAW types, as shown below.
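
The conversion itself is roughly this (simplified sketch; tEnv is the StreamTableEnvironment):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env  = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val table = tEnv.fromDataStream(events)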

table.printSchema()

`app_name` RAW('scala.Option', '...'),
`service_name` RAW('scala.Option', '...'),
`conversion_id` RAW('scala.Option', '...'),
......

And the table write fails as follows:
Cause: Incompatible types for sink column 'app_name' at position 0.
Query schema: [app_name: RAW('scala.Option', '...'), .........
Sink schema:  [app_name: STRING, actionName: STRING, .......

Does the Flink Table API support Scala case classes with Option values?
I found that this is supported in the DataStream API, per this documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#special-types

Is there a way to do this in the Table API?
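
The only workaround I can think of is to flatten the Options into plain nullable values before calling fromDataStream, for example into a Row with explicit type information, roughly like the sketch below. But that means listing every column by hand, which I would like to avoid (only two fields shown, names for illustration):

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

// Option[String] fields become nullable STRING columns via orNull
val rows: DataStream[Row] = events.map { e =>
  Row.of(e.app_name.orNull, e.service_name.orNull)
}(Types.ROW_NAMED(Array("app_name", "service_name"), Types.STRING, Types.STRING))

val flatTable = tEnv.fromDataStream(rows)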

Thanks in advance for the help.


-- 
Regards
Praneeth Ramesh
