Hi Gyula,

I have not verified it locally yet, but I think you are hitting yet another problem caused by the unfinished migration from the old TypeInformation-based type system to the new type system based on DataTypes. As far as I understand the problem, the information about the bridging class (java.sql.Timestamp in this case) is lost somewhere in the stack. Because this information is lost or not respected, the planner produces a LocalDateTime instead of a proper java.sql.Timestamp. The AvroRowSerializationSchema expects java.sql.Timestamp for a column of TIMESTAMP type, and thus it fails for LocalDateTime. I really hope the FLIP-95 effort will significantly reduce the number of such problems.
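To make the mismatch described above concrete, here is a minimal JDK-only sketch. It is not Flink or Avro code: the class TimestampBridging and its two helper methods are hypothetical names used for illustration, and treating the LocalDateTime as UTC when computing epoch milliseconds is an assumption, not something the planner or the Avro format is confirmed to do.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper class for illustration only; not part of Flink or Avro.
public class TimestampBridging {

    // Converts to java.sql.Timestamp, the bridging class the serializer is said to expect.
    // Timestamp.valueOf interprets the value in the JVM's default time zone.
    public static Timestamp toSqlTimestamp(LocalDateTime value) {
        return Timestamp.valueOf(value);
    }

    // Converts to a long of milliseconds since the epoch, the physical type
    // behind Avro's timestamp-millis. Treating the value as UTC is an assumption.
    public static long toEpochMillisUtc(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // The kind of value the planner is described as producing.
        Object fromPlanner = LocalDateTime.of(2020, 4, 29, 15, 42, 0);

        // A writer expecting the physical type would effectively do
        // (Long) fromPlanner, which throws ClassCastException.
        // An explicit conversion avoids the blind cast:
        if (fromPlanner instanceof LocalDateTime) {
            LocalDateTime ldt = (LocalDateTime) fromPlanner;
            System.out.println(toSqlTimestamp(ldt));
            System.out.println(toEpochMillisUtc(ldt));
        }
    }
}
```

The point is only that LocalDateTime, java.sql.Timestamp and long are three distinct representations of the same TIMESTAMP value, so some layer has to convert between them explicitly.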
It's definitely worth reporting a bug. BTW, could you share how you create the Kafka Table sink, so that we have the full picture?

Best,

Dawid

On 29/04/2020 15:42, Gyula Fóra wrote:
> Hi All!
>
> We are trying to work with avro serialized data from Kafka using the
> Table API and use TIMESTAMP column type.
>
> According to the docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
> we can use long type with logicalType: timestamp-millis.
> So we use the following avro field schema in the descriptor:
>
> {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
>
> When trying to insert into the table we get the following error:
>
> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>
> It seems like the avro format (serializer) is not aware of the logical
> type conversion that is needed to convert back to the physical type long.
>
> I looked at the AvroTypesITCase which uses all kinds of logical types
> but I could only find logic that maps between Avro Pojos and tables
> and none that actually uses the serialization/deserialization logic
> with the format.
>
> Could someone please help me with this? Maybe what I
> am trying to do is not possible, or I just missed a crucial step.
>
> Thank you!
> Gyula