Hi Gyula,

I have not verified it locally yet, but I think you are hitting yet
another problem caused by the unfinished migration from the old
TypeInformation-based type system to the new type system based on
DataTypes. As far as I understand the problem, the information about the
bridging class (java.sql.Timestamp in this case) is lost somewhere in
the stack. Because this information is lost or not respected, the
planner produces a LocalDateTime instead of a proper java.sql.Timestamp
value. The AvroRowSerializationSchema expects java.sql.Timestamp for a
column of TIMESTAMP type and thus fails for LocalDateTime. I really hope
the FLIP-95 effort will significantly reduce the number of such
problems.
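
To illustrate the mismatch, here is a minimal sketch in plain Java (no
Flink or Avro classes involved) of the conversion that gets skipped. The
class TimestampBridging and both helper methods are hypothetical names
made up for this example, not part of any Flink API, and treating the
TIMESTAMP value as UTC when computing the milliseconds is an assumption
on my side.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper class, only meant to illustrate the missing conversion step.
public class TimestampBridging {

    // Accepts either representation of a TIMESTAMP value and returns the
    // java.sql.Timestamp that a Timestamp-based serializer expects.
    static Timestamp toSqlTimestamp(Object value) {
        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }
        if (value instanceof LocalDateTime) {
            // Keeps the same wall-clock fields; no time zone shift is applied.
            return Timestamp.valueOf((LocalDateTime) value);
        }
        throw new IllegalArgumentException("Unsupported TIMESTAMP class: " + value);
    }

    // Converts either representation to the physical long used by Avro's
    // timestamp-millis logical type.
    // Assumption: the wall-clock value is interpreted as UTC.
    static long toEpochMillis(Object value) {
        LocalDateTime ldt = toSqlTimestamp(value).toLocalDateTime();
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime fromPlanner = LocalDateTime.of(2020, 4, 29, 15, 42, 0);
        System.out.println(toSqlTimestamp(fromPlanner));
        System.out.println(toEpochMillis(fromPlanner));
    }
}
```

Something along these lines, applied to the field before it reaches the
Avro writer, should avoid the ClassCastException, but I would treat it
only as a workaround until the bridging class is respected properly.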

It's definitely worth reporting a bug.

BTW, could you share how you create the Kafka table sink, so that we
have the full picture?

Best,

Dawid

On 29/04/2020 15:42, Gyula Fóra wrote:
> Hi All!
>
> We are trying to work with Avro-serialized data from Kafka using the
> Table API and use the TIMESTAMP column type.
>
> According to the docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
> we can use long type with logicalType: timestamp-millis.
> So we use the following avro field schema in the descriptor:
>
> {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
>
> When trying to insert into the table we get the following error:
>
> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime
> cannot be cast to class java.lang.Long (java.time.LocalDateTime and
> java.lang.Long are in module java.base of loader 'bootstrap')
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>
> It seems like the avro format (serializer) is not aware of the logical
> type conversion that is needed to convert back to the physical type long.
>
> I looked at the AvroTypesITCase which uses all kinds of logical types
> but I could only find logic that maps between Avro Pojos and tables
> and none that actually uses the serialization/deserialization logic
> with the format. Could someone please help me with this? Maybe what I
> am trying to do is not possible, or I just missed a crucial step.
>
> Thank you!
> Gyula
>
>
