Okay, bummer, but not completely unexpected.

The conversions should be automatically compiled into SpecificRecords. I'm
not sure how the Table API handles it internally; I just saw
SpecificRecord in your stack trace and figured it was worth a try.
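
For context, the relevant lines of a class generated by avro-compiler 1.9.x
look roughly like this (a sketch, not verbatim generated code): the record
carries its own SpecificData model and registers the conversions it needs,
so writers built from that model convert automatically.

// Sketch of what a 1.9.x-generated SpecificRecord contains (abridged):
private static final SpecificData MODEL$ = new SpecificData();
static {
    MODEL$.addLogicalTypeConversion(
        new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
}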

On Thu, Apr 30, 2020 at 3:35 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Arvid!
>
> I tried it with Avro 1.9.2, and it led to the same error.
> It seems that Avro cannot find a conversion between LocalDateTime and
> timestamp-millis.
> I'm not sure how exactly this works; maybe we need to register the
> conversions ourselves?
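>
> To make that concrete, here is a rough sketch of the kind of conversion I
> have in mind: a hand-written Conversion bridging LocalDateTime to
> timestamp-millis (the built-in TimestampMillisConversion in 1.9.x converts
> java.time.Instant, not LocalDateTime). The UTC offset below is an
> assumption:
>
> import java.time.LocalDateTime;
> import java.time.ZoneOffset;
>
> import org.apache.avro.Conversion;
> import org.apache.avro.LogicalType;
> import org.apache.avro.Schema;
>
> // Hypothetical conversion between LocalDateTime and timestamp-millis.
> public class LocalDateTimeMillisConversion extends Conversion<LocalDateTime> {
>
>     @Override
>     public Class<LocalDateTime> getConvertedType() {
>         return LocalDateTime.class;
>     }
>
>     @Override
>     public String getLogicalTypeName() {
>         return "timestamp-millis";
>     }
>
>     @Override
>     public Long toLong(LocalDateTime value, Schema schema, LogicalType type) {
>         // Assumes the timestamps are meant as UTC.
>         return value.toInstant(ZoneOffset.UTC).toEpochMilli();
>     }
>
>     @Override
>     public LocalDateTime fromLong(Long value, Schema schema, LogicalType type) {
>         return LocalDateTime.ofEpochSecond(Math.floorDiv(value, 1000L),
>                 (int) Math.floorMod(value, 1000L) * 1_000_000, ZoneOffset.UTC);
>     }
> }
>
> // Registered once before the writer is created, e.g.:
> // GenericData.get().addLogicalTypeConversion(new LocalDateTimeMillisConversion());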
>
> Thanks!
> Gyula
>
> On Thu, Apr 30, 2020 at 12:53 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Gyula,
>>
>> it may still be worth trying the upgrade to Avro 1.9.2 (it can never hurt)
>> to see whether it solves your particular problem.
>> The code path in GenericDatumWriter does take the conversion branch, so it
>> might just work. Of course, that depends on the schema being correctly
>> translated into a specific record that uses the new TimeConversions [1].
>>
>> [1]
>> https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
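>>
>> If the generated record does not carry the conversions for some reason,
>> you can also hand the writer a conversion-aware model yourself. A minimal
>> sketch against the Avro 1.9.x API ('schema' stands for the record's Avro
>> schema):
>>
>> import org.apache.avro.data.TimeConversions;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.DatumWriter;
>> import org.apache.avro.specific.SpecificData;
>> import org.apache.avro.specific.SpecificDatumWriter;
>>
>> // Register the timestamp conversion on a model and pass it to the writer
>> // explicitly instead of relying on the generated MODEL$.
>> SpecificData model = new SpecificData();
>> model.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
>> DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema, model);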
>>
>> On Thu, Apr 30, 2020 at 10:41 AM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> @Arvid: We are using Avro 1.8, I believe, but this problem seems to come
>>> from the Flink side, as Dawid mentioned.
>>>
>>> @Dawid:
>>> That sounds like a reasonable explanation. Here are the actual queries to
>>> reproduce it in the SQL client / Table API:
>>>
>>> CREATE TABLE source_table (
>>>     int_field INT,
>>>     timestamp_field TIMESTAMP(3)
>>> ) WITH (
>>>     'connector.type'                           = 'kafka',
>>>     'connector.version'                        = 'universal',
>>>     'connector.topic'                          = 'avro_tset',
>>>     'connector.properties.bootstrap.servers'   = '<...>',
>>>     'format.type'                              = 'avro',
>>>     'format.avro-schema' =
>>>     '{
>>>       "type": "record",
>>>       "name": "test",
>>>       "fields" : [
>>>         {"name": "int_field", "type": "int"},
>>>         {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}
>>>       ]
>>>     }'
>>> )
>>>
>>> INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
>>>
>>> And the error:
>>>
>>> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
>>>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>>>     at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>>>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>>>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>>     at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
>>>
>>> I will open a Jira ticket as well with these details.
>>>
>>> Thank you!
>>>
>>> Gyula
>>>
>>>
>>> On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> I have not verified it locally yet, but I think you are hitting yet
>>>> another problem of the unfinished migration from the old
>>>> TypeInformation-based type system to the new type system based on
>>>> DataTypes. As far as I understand the problem, the information about the
>>>> bridging class (java.sql.Timestamp in this case) is lost somewhere in the
>>>> stack. Because this information is lost/not respected, the planner
>>>> produces a LocalDateTime instead of a proper java.sql.Timestamp. The
>>>> AvroRowSerializationSchema expects java.sql.Timestamp for a column of
>>>> TIMESTAMP type and thus fails for a LocalDateTime. I really hope the
>>>> FLIP-95 effort will significantly reduce the number of such problems.
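>>>>
>>>> To illustrate what I mean by the bridging class, this is how it is
>>>> declared in the new type system (a conceptual sketch, not a fix for your
>>>> pipeline):
>>>>
>>>> import java.sql.Timestamp;
>>>> import org.apache.flink.table.api.DataTypes;
>>>> import org.apache.flink.table.types.DataType;
>>>>
>>>> // TIMESTAMP(3) defaults to java.time.LocalDateTime as its conversion
>>>> // class; bridgedTo() declares java.sql.Timestamp instead. It is exactly
>>>> // this piece of information that gets lost on the way to the serializer.
>>>> DataType t = DataTypes.TIMESTAMP(3).bridgedTo(Timestamp.class);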
>>>>
>>>> It's definitely worth reporting a bug.
>>>>
>>>> BTW, could you share how you create the Kafka table sink, so we have the
>>>> full picture?
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>> On 29/04/2020 15:42, Gyula Fóra wrote:
>>>>
>>>> Hi All!
>>>>
>>>> We are trying to work with Avro-serialized data from Kafka using the
>>>> Table API, with a TIMESTAMP column type.
>>>>
>>>> According to the docs
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
>>>> we can use the long type with logicalType: timestamp-millis, so we use
>>>> the following Avro field schema in the descriptor:
>>>>
>>>>
>>>>   {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}
>>>>
>>>> When trying to insert into the table we get the following error:
>>>>
>>>> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
>>>>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>>>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>>>
>>>> It seems that the Avro format (serializer) is not aware of the
>>>> logical-type conversion needed to convert back to the physical long type.
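>>>>
>>>> For what it's worth, the same failure can be reproduced with plain Avro,
>>>> which suggests the conversion is simply never registered. A minimal
>>>> sketch (the schema string mirrors the field above; the scaffolding
>>>> around it is hypothetical):
>>>>
>>>> import java.io.ByteArrayOutputStream;
>>>> import java.time.LocalDateTime;
>>>>
>>>> import org.apache.avro.Schema;
>>>> import org.apache.avro.generic.GenericData;
>>>> import org.apache.avro.generic.GenericDatumWriter;
>>>> import org.apache.avro.generic.GenericRecord;
>>>> import org.apache.avro.io.BinaryEncoder;
>>>> import org.apache.avro.io.EncoderFactory;
>>>>
>>>> public class LogicalTypeRepro {
>>>>     public static void main(String[] args) throws Exception {
>>>>         Schema schema = new Schema.Parser().parse(
>>>>             "{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
>>>>                 + "{\"name\": \"timestamp_field\", \"type\": "
>>>>                 + "{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}]}");
>>>>
>>>>         GenericRecord record = new GenericData.Record(schema);
>>>>         // Mimic what the planner does: put a LocalDateTime in the field...
>>>>         record.put("timestamp_field", LocalDateTime.now());
>>>>
>>>>         GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
>>>>         BinaryEncoder encoder =
>>>>             EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null);
>>>>         // ...and with no conversion registered for LocalDateTime,
>>>>         // writeWithoutConversion falls through to writeLong and throws
>>>>         // the ClassCastException above.
>>>>         writer.write(record, encoder);
>>>>     }
>>>> }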
>>>>
>>>> I looked at the AvroTypesITCase which uses all kinds of logical types but 
>>>> I could only find logic that maps between Avro Pojos and tables and none 
>>>> that actually uses the serializaiton/deserialization logic with the format.
>>>>
>>>> Could someone please help me with this? Maybe what I am trying to do is 
>>>> not possible, or I just missed a crucial step.
>>>>
>>>> Thank you!
>>>> Gyula
>>>>
>>>>
>>>>
>>>>
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
