Hi Arvid! I tried it with Avro 1.9.2, and it led to the same error. It seems that Avro cannot find the conversion class between LocalDateTime and timestamp-millis. I'm not sure exactly how this works; maybe we need to set the conversions ourselves?
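
For reference, here is a minimal sketch of the arithmetic such a manual conversion would have to perform between LocalDateTime and the long behind timestamp-millis. It uses only java.time. The class name TimestampMillisSketch and its two methods are hypothetical and not part of Avro or Flink, and treating the LocalDateTime as UTC is an assumption on my side, not something the error tells us.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/**
 * Hypothetical helper (not an Avro or Flink class): shows the mapping a
 * LocalDateTime <-> timestamp-millis conversion would need to implement.
 * Assumption: the LocalDateTime is interpreted as a UTC wall-clock time.
 */
final class TimestampMillisSketch {

    private TimestampMillisSketch() {
        // static helpers only
    }

    /** LocalDateTime -> milliseconds since 1970-01-01T00:00:00Z (the physical long). */
    static long toEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Milliseconds since the epoch -> LocalDateTime, again assuming UTC. */
    static LocalDateTime fromEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }
}
```

Under the UTC assumption, the value from the failing INSERT (TIMESTAMP '1999-11-11 11:11:11') maps to 942318671000, and fromEpochMillis turns that long back into the same LocalDateTime. Whether this logic belongs in an Avro conversion or in the Flink serialization schema is exactly the open question.
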
Thanks!
Gyula

On Thu, Apr 30, 2020 at 12:53 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Gyula,
>
> it may still be worth trying to upgrade to Avro 1.9.2 (can never hurt) and
> see if this solves your particular problem.
> The code path in GenericDatumWriter is taking the conversion path, so it
> might just work. Of course that depends on the schema being correctly
> translated to a specific record that uses the new TimeConversions [1].
>
> [1]
> https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
>
> On Thu, Apr 30, 2020 at 10:41 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi!
>>
>> @Arvid: We are using Avro 1.8 I believe, but this problem seems to come
>> from the Flink side, as Dawid mentioned.
>>
>> @Dawid:
>> Sounds like a reasonable explanation. Here are the actual queries to
>> reproduce within the SQL client/Table API:
>>
>> CREATE TABLE source_table (
>>   int_field INT,
>>   timestamp_field TIMESTAMP(3)
>> ) WITH (
>>   'connector.type' = 'kafka',
>>   'connector.version' = 'universal',
>>   'connector.topic' = 'avro_tset',
>>   'connector.properties.bootstrap.servers' = '<...>',
>>   'format.type' = 'avro',
>>   'format.avro-schema' =
>>   '{
>>     "type": "record",
>>     "name": "test",
>>     "fields" : [
>>       {"name": "int_field", "type": "int"},
>>       {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
>>     ]
>>   }'
>> )
>>
>> INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
>>
>> And the error:
>>
>> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be
>> cast to java.lang.Long
>>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>>     at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>     at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
>>
>> I will open a Jira ticket as well with these details.
>>
>> Thank you!
>>
>> Gyula
>>
>> On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> I have not verified it locally yet, but I think you are hitting yet
>>> another problem of the unfinished migration from the old
>>> TypeInformation-based type system to the new type system based on
>>> DataTypes. As far as I understand the problem, the information about the
>>> bridging class (java.sql.Timestamp in this case) is lost in the stack.
>>> Because this information is lost/not respected, the planner produces
>>> LocalDateTime instead of a proper java.sql.Timestamp. The
>>> AvroRowSerializationSchema expects java.sql.Timestamp for a column of
>>> TIMESTAMP type and thus it fails for LocalDateTime. I really hope the
>>> effort of FLIP-95 will significantly reduce the number of problems.
>>>
>>> It's definitely worth reporting a bug.
>>>
>>> BTW could you share how you create the Kafka Table sink to have the full
>>> picture?
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 29/04/2020 15:42, Gyula Fóra wrote:
>>>
>>> Hi All!
>>>
>>> We are trying to work with Avro-serialized data from Kafka using the
>>> Table API and use the TIMESTAMP column type.
>>>
>>> According to the docs
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
>>> we can use long type with logicalType: timestamp-millis.
>>> So we use the following Avro field schema in the descriptor:
>>>
>>> {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
>>>
>>> When trying to insert into the table we get the following error:
>>>
>>> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime
>>> cannot be cast to class java.lang.Long (java.time.LocalDateTime and
>>> java.lang.Long are in module java.base of loader 'bootstrap')
>>>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>>
>>> It seems like the Avro format (serializer) is not aware of the logical type
>>> conversion that is needed to convert back to the physical type long.
>>>
>>> I looked at the AvroTypesITCase, which uses all kinds of logical types, but I
>>> could only find logic that maps between Avro POJOs and tables and none that
>>> actually uses the serialization/deserialization logic with the format.
>>>
>>> Could someone please help me with this? Maybe what I am trying to do is not
>>> possible, or I just missed a crucial step.
>>>
>>> Thank you!
>>> Gyula
>
> --
>
> Arvid Heise | Senior Java Developer