Okay, bummer, but not completely unexpected. The conversions should be compiled into the SpecificRecords automatically. I'm not sure how the Table API does it internally; I just saw SpecificRecord in your stack trace and figured it was worth a try.
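For reference, setting the conversions manually would look roughly like the sketch below. If I remember correctly, the built-in TimeConversions.TimestampMillisConversion in Avro 1.9.x converts java.time.Instant rather than LocalDateTime (a LocalDateTime-based conversion only arrived in later Avro versions), so bridging LocalDateTime to timestamp-millis would need a custom Conversion. The class name LocalDateTimeMillisConversion is made up for illustration, and it assumes the LocalDateTime values are meant as UTC:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Hypothetical conversion between LocalDateTime and the timestamp-millis
// logical type; Avro's built-in TimestampMillisConversion handles
// java.time.Instant instead.
public class LocalDateTimeMillisConversion extends Conversion<LocalDateTime> {

    @Override
    public Class<LocalDateTime> getConvertedType() {
        return LocalDateTime.class;
    }

    @Override
    public String getLogicalTypeName() {
        return "timestamp-millis";
    }

    @Override
    public Schema getRecommendedSchema() {
        return LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
    }

    @Override
    public Long toLong(LocalDateTime value, Schema schema, LogicalType type) {
        // Assumes the LocalDateTime is meant as UTC.
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    @Override
    public LocalDateTime fromLong(Long millis, Schema schema, LogicalType type) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }
}

It could then be registered on the GenericData/SpecificData instance used by the writer, e.g. data.addLogicalTypeConversion(new LocalDateTimeMillisConversion()) together with new GenericDatumWriter<>(schema, data); whether AvroRowSerializationSchema exposes a hook for that is another question.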
On Thu, Apr 30, 2020 at 3:35 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Arvid!
>
> I tried it with Avro 1.9.2, and it led to the same error.
> It seems Avro cannot find the conversion class between LocalDateTime and
> timestamp-millis. I'm not sure how exactly this works; maybe we need to
> set the conversions ourselves?
>
> Thanks!
> Gyula
>
> On Thu, Apr 30, 2020 at 12:53 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Gyula,
>>
>> It may still be worth trying to upgrade to Avro 1.9.2 (it can never hurt)
>> to see if it solves your particular problem.
>> The code path in GenericDatumWriter does take the conversion branch, so it
>> might just work. Of course, that depends on the schema being correctly
>> translated to a specific record that uses the new TimeConversions [1].
>>
>> [1]
>> https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
>>
>> On Thu, Apr 30, 2020 at 10:41 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> @Arvid: We are using Avro 1.8, I believe, but this problem seems to come
>>> from the Flink side, as Dawid mentioned.
>>>
>>> @Dawid:
>>> That sounds like a reasonable explanation. Here are the actual queries to
>>> reproduce it within the SQL client / Table API:
>>>
>>> CREATE TABLE source_table (
>>>   int_field INT,
>>>   timestamp_field TIMESTAMP(3)
>>> ) WITH (
>>>   'connector.type' = 'kafka',
>>>   'connector.version' = 'universal',
>>>   'connector.topic' = 'avro_tset',
>>>   'connector.properties.bootstrap.servers' = '<...>',
>>>   'format.type' = 'avro',
>>>   'format.avro-schema' =
>>>     '{
>>>       "type": "record",
>>>       "name": "test",
>>>       "fields" : [
>>>         {"name": "int_field", "type": "int"},
>>>         {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}
>>>       ]
>>>     }'
>>> )
>>>
>>> INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
>>>
>>> And the error:
>>>
>>> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
>>>   at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>>   at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>>>   at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>>>   at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>>>   at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>>>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>>>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>>   at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
>>>
>>> I will open a Jira ticket with these details as well.
>>>
>>> Thank you!
>>> Gyula
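For the Jira ticket, the cast failure should be reproducible with plain Avro and no Flink involved. A minimal sketch, assuming Avro 1.8+ on the classpath, with the record and field names copied from the DDL above:

import java.io.ByteArrayOutputStream;
import java.time.LocalDateTime;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class TimestampCastRepro {
    public static void main(String[] args) throws Exception {
        // Same schema as in the DDL above.
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
                + "{\"name\": \"int_field\", \"type\": \"int\"},"
                + "{\"name\": \"timestamp_field\", \"type\": "
                + "{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("int_field", 12);
        // A LocalDateTime where the writer expects a Long (epoch millis),
        // mirroring what the planner hands to the serializer.
        record.put("timestamp_field", LocalDateTime.of(1999, 11, 11, 11, 11, 11));

        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        BinaryEncoder encoder =
            EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
        // Throws ClassCastException: LocalDateTime cannot be cast to Long,
        // since no Conversion is registered for timestamp-millis here.
        writer.write(record, encoder);
    }
}

Because the default GenericData carries no conversion for timestamp-millis, write() falls through to writeWithoutConversion() and the cast to Long fails, matching the generic part of the stack trace above.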
>>> On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> I have not verified it locally yet, but I think you are hitting yet
>>>> another problem of the unfinished migration from the old
>>>> TypeInformation-based type system to the new type system based on
>>>> DataTypes. As far as I understand the problem, the information about the
>>>> bridging class (java.sql.Timestamp in this case) is lost somewhere in the
>>>> stack. Because this information is lost/not respected, the planner produces
>>>> a LocalDateTime instead of a proper java.sql.Timestamp. The
>>>> AvroRowSerializationSchema expects java.sql.Timestamp for a column of
>>>> TIMESTAMP type, and thus it fails for LocalDateTime. I really hope the
>>>> effort of FLIP-95 will significantly reduce the number of such problems.
>>>>
>>>> It's definitely worth reporting this as a bug.
>>>>
>>>> BTW, could you share how you create the Kafka table sink, to have the
>>>> full picture?
>>>>
>>>> Best,
>>>> Dawid
>>>>
>>>> On 29/04/2020 15:42, Gyula Fóra wrote:
>>>>
>>>> Hi All!
>>>>
>>>> We are trying to work with Avro-serialized data from Kafka using the
>>>> Table API, with a TIMESTAMP column type.
>>>>
>>>> According to the docs
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
>>>> we can use the long type with logicalType: timestamp-millis.
>>>> So we use the following Avro field schema in the descriptor:
>>>>
>>>> {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}
>>>>
>>>> When trying to insert into the table, we get the following error:
>>>>
>>>> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime
>>>> cannot be cast to class java.lang.Long (java.time.LocalDateTime and
>>>> java.lang.Long are in module java.base of loader 'bootstrap')
>>>>   at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>>>   at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>>>
>>>> It seems the Avro format (serializer) is not aware of the logical-type
>>>> conversion that is needed to convert back to the physical type long.
>>>>
>>>> I looked at the AvroTypesITCase, which uses all kinds of logical types,
>>>> but I could only find logic that maps between Avro POJOs and tables, and
>>>> none that actually exercises the serialization/deserialization logic of
>>>> the format.
>>>>
>>>> Could someone please help me with this? Maybe what I am trying to do is
>>>> not possible, or I just missed a crucial step.
>>>>
>>>> Thank you!
>>>> Gyula

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
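One possible (untested) angle on the bridging problem Dawid describes: in the Flink 1.10 Table API, a DataType can carry an explicit bridging class, which in theory asks the planner to produce java.sql.Timestamp, the class AvroRowSerializationSchema expects, instead of LocalDateTime. A sketch of what declaring such a consumed type for a custom sink might look like; whether the 1.10 planner actually respects it on this code path is exactly what this thread is about:

import java.sql.Timestamp;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class BridgedTimestampSketch {
    public static void main(String[] args) {
        // Logically still TIMESTAMP(3), but bridged to java.sql.Timestamp,
        // the class AvroRowSerializationSchema expects for TIMESTAMP columns.
        DataType consumedType = DataTypes.ROW(
                DataTypes.FIELD("int_field", DataTypes.INT()),
                DataTypes.FIELD("timestamp_field",
                        DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class)));

        // A custom TableSink could return this from getConsumedDataType().
        System.out.println(consumedType);
    }
}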