Hi Arvid!

I tried it with Avro 1.9.2, and it led to the same error.
Seems like Avro cannot find the conversion class between LocalDateTime and
timestamp-millis.
Not sure how exactly this works, maybe we need to set the conversions
ourselves?

Thanks!
Gyula
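
One possible workaround for the missing conversion mentioned above is to turn the LocalDateTime into the long that timestamp-millis expects before the record reaches the Avro writer. The following is only a minimal sketch: the class TimestampMillis and its methods are hypothetical helpers, not part of Flink or Avro, and it assumes the zone-less TIMESTAMP value should be read as UTC wall-clock time. It uses only java.time.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical helper for illustration; not part of Flink or Avro. */
final class TimestampMillis {

    private TimestampMillis() {
    }

    /**
     * Converts a zone-less timestamp to milliseconds since the Unix epoch.
     * Assumption: the value is interpreted as UTC wall-clock time.
     */
    static long toEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Inverse of toEpochMillis, under the same UTC assumption. */
    static LocalDateTime fromEpochMillis(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Same value as in the INSERT statement from the thread.
        LocalDateTime ts = LocalDateTime.of(1999, 11, 11, 11, 11, 11);
        long millis = toEpochMillis(ts);
        System.out.println(millis);
        System.out.println(fromEpochMillis(millis).equals(ts));
    }
}
```

Whether UTC is the right interpretation depends on how the rest of the pipeline treats TIMESTAMP values, so the offset here is a choice to revisit rather than a given.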

On Thu, Apr 30, 2020 at 12:53 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Gyula,
>
> it may still be worth trying to upgrade to Avro 1.9.2 (can never hurt) and
> see if this solves your particular problem.
> The code path in GenericDatumWriter is taking the conversion path, so it
> might just work. Of course that depends on the schema being correctly
> translated to a specific record that uses the new TimeConversions [1].
>
> [1]
> https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
>
> On Thu, Apr 30, 2020 at 10:41 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi!
>>
>> @Arvid: We are using Avro 1.8, I believe, but this problem seems to come
>> from the Flink side, as Dawid mentioned.
>>
>> @Dawid:
>> Sounds like a reasonable explanation. Here are the actual queries to
>> reproduce it within the SQL client/Table API:
>>
>> CREATE TABLE source_table (
>>      int_field INT,
>>      timestamp_field TIMESTAMP(3)
>> ) WITH (
>>      'connector.type'                           = 'kafka',
>>      'connector.version'                        = 'universal',
>>      'connector.topic'                          = 'avro_tset',
>>      'connector.properties.bootstrap.servers'   = '<...>',
>>      'format.type'                              = 'avro',
>>     'format.avro-schema' =
>>     '{
>>       "type": "record",
>>       "name": "test",
>>       "fields" : [
>>         {"name": "int_field", "type": "int"},
>>         {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
>>       ]
>>     }'
>> )
>>
>> INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
>>
>> And the error:
>>
>> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
>>      at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>      at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>>      at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>>      at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>>      at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>>      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>>      at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>      at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
>>
>> I will open a Jira ticket as well with these details.
>>
>> Thank you!
>>
>> Gyula
>>
>>
>> On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> I have not verified it locally yet, but I think you are hitting yet
>>> another problem of the unfinished migration from the old
>>> TypeInformation-based type system to the new type system based on
>>> DataTypes. As far as I understand the problem, the information about the
>>> bridging class (java.sql.Timestamp in this case) is lost in the stack.
>>> Because this information is lost or not respected, the planner produces a
>>> LocalDateTime instead of a proper java.sql.Timestamp. The
>>> AvroRowSerializationSchema expects java.sql.Timestamp for a column of
>>> TIMESTAMP type, and thus it fails for LocalDateTime. I really hope the
>>> effort of FLIP-95 will significantly reduce the number of such problems.
>>>
>>> It's definitely worth reporting a bug.
>>>
>>> BTW, could you share how you create the Kafka table sink, so that we have
>>> the full picture?
>>>
>>> Best,
>>>
>>> Dawid
>>> On 29/04/2020 15:42, Gyula Fóra wrote:
>>>
>>> Hi All!
>>>
>>> We are trying to work with Avro-serialized data from Kafka using the
>>> Table API and the TIMESTAMP column type.
>>>
>>> According to the docs
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
>>> we can use the long type with logicalType: timestamp-millis.
>>> So we use the following Avro field schema in the descriptor:
>>>
>>>
>>>   {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
>>>
>>> When trying to insert into the table we get the following error:
>>>
>>> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
>>>        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>>        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>>
>>> It seems like the Avro format (serializer) is not aware of the logical type
>>> conversion that is needed to convert back to the physical type long.
>>>
>>> I looked at the AvroTypesITCase, which uses all kinds of logical types, but I
>>> could only find logic that maps between Avro POJOs and tables and none that
>>> actually uses the serialization/deserialization logic with the format.
>>>
>>> Could someone please help me with this? Maybe what I am trying to do is not 
>>> possible, or I just missed a crucial step.
>>>
>>> Thank you!
>>> Gyula
>>>
>>>
>>>
>>>
>
> --
>
> Arvid Heise | Senior Java Developer
