Hi Peter,
as a temporary workaround I would simply implement a UDF like:
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

public class EverythingToString extends ScalarFunction {
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return o.toString();
    }
}
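Once the class is on the classpath, it can be registered and used directly in SQL. A minimal sketch (the function name, class path, and column name below are illustrative, not from your setup):

```sql
-- Illustrative names; adjust the class path to wherever the UDF lives.
CREATE TEMPORARY FUNCTION everything_to_string AS 'com.example.EverythingToString';

-- raw_col is a placeholder for whatever RAW-typed column you need as a string.
SELECT everything_to_string(raw_col) AS raw_str FROM my_table;
```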
For the Utf8 issue, you can instruct Avro to generate Java classes with
String instead using the `avro.java.string` option.
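If the classes come from the avro-maven-plugin, that option corresponds to the plugin's stringType setting. A sketch (plugin coordinates abbreviated, version omitted):

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <configuration>
    <!-- Generate java.lang.String fields instead of org.apache.avro.util.Utf8 -->
    <stringType>String</stringType>
  </configuration>
</plugin>
```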
The rework of the type system messed up the Avro support in Flink. This
is a known issue that is tracked under
https://issues.apache.org/jira/browse/FLINK-8183
Regards,
Timo
On 20.10.21 17:30, Peter Schrott wrote:
Hi Timo,
thanks a lot for your suggestion.
I also considered this workaround, but when going from the DataStream API to
the Table API (using the POJO generated by the Avro Maven plugin), types are
not mapped correctly, esp. Utf8 (Avro's implementation of CharSequence) and
also enums. In the table I then have mostly RAW types, which are not
handy to perform SQL statements on. It is already discussed here:
https://www.mail-archive.com/user@flink.apache.org/msg44449.html
Best, Peter
On Wed, Oct 20, 2021 at 5:21 PM Timo Walther <twal...@apache.org> wrote:
A current workaround is to use DataStream API to read the data and
provide your custom Avro schema to configure the format. Then switch to
Table API.
StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum
classes will be represented as RAW types but you can forward them as
blackboxes or convert them in a UDF.
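A minimal sketch of that workaround, assuming the classes generated from your Avro schema (MyRecord) and placeholder topic/server/group names (everything below is illustrative, not a definitive implementation):

```java
// Sketch only: requires the Flink Kafka connector and the
// flink-avro-confluent-registry format on the classpath.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "...:9092");
props.setProperty("group.id", "my-group"); // illustrative

FlinkKafkaConsumer<MyRecord> consumer = new FlinkKafkaConsumer<>(
    "my-topic",
    // Reads with the writer's schema from the Schema Registry, so the enum
    // deserializes into the generated class instead of breaking against a
    // DDL-derived "expected" schema.
    ConfluentRegistryAvroDeserializationSchema.forSpecific(
        MyRecord.class, "http://...:8081"),
    props);

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<MyRecord> stream = env.addSource(consumer);

// fromDataStream(...) accepts all data types; the enum field arrives as RAW
// and can be forwarded as a blackbox or converted in a UDF.
Table table = tEnv.fromDataStream(stream);
tEnv.createTemporaryView("my_table", table);
```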
We will further improve the support of external types in the Table API
type system in the near future.
Regards,
Timo
On 20.10.21 15:51, Peter Schrott wrote:
> Hi people!
>
 > I was digging deeper these days and found the "root cause" of the issue and the difference between Avro reading from files and Avro reading from Kafka & SR.
 >
 > Please see: https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E
>
 > The main problem with Kafka & SR is that the "org.apache.avro.generic.GenericDatumReader" is initialized with an "expected" schema which is taken from Flink's SQL table definition. When it comes to deserializing an attribute with type "enum", it does not match the expected schema, where this same attribute is typed as "string". Hence the Avro deserializer breaks here.
>
 > Not sure how to tackle that issue. The functioning of the "GenericDatumReader" cannot really be changed. A solution could be to create an analogous reader for reading data based on the SQL DDL.
>
> Cheers, Peter
>
> On 2021/10/12 16:18:30 Dongwon Kim wrote:
>> Hi community,
>>
>> Can I get advice on this question?
>>
 >> Another user just sent me an email asking whether I found a solution or a workaround for this question, but I'm still stuck there.
>>
>> Any suggestions?
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>> ---------- Forwarded message ---------
 >> From: Dongwon Kim <eastcirc...@gmail.com>
>> Date: Mon, Aug 9, 2021 at 7:26 PM
>> Subject: How to deserialize Avro enum type in Flink SQL?
 >> To: user <user@flink.apache.org>
>>
>>
>> Hi community,
>>
 >> I have a Kafka topic where the schema of its values is defined by the "MyRecord" record in the following Avro IDL and registered to the Confluent Schema Registry.
>>
 >>> @namespace("my.type.avro")
 >>> protocol MyProtocol {
 >>>   enum MyEnumType {
 >>>     TypeVal1, TypeVal2
 >>>   }
 >>>   record MyEntry {
 >>>     MyEnumType type;
 >>>   }
 >>>   record MyRecord {
 >>>     array<MyEntry> entries;
 >>>   }
 >>> }
>>
>>
>> To read from the topic, I've defined the following DDL:
>>
 >>> CREATE TABLE my_table (
 >>>   `entries` ARRAY<ROW<
 >>>     *`type` ??? (This is the main question)*
 >>>   >>
 >>> ) WITH (
 >>>   'connector' = 'kafka',
 >>>   'topic' = 'my-topic',
 >>>   'properties.bootstrap.servers' = '...:9092',
 >>>   'scan.startup.mode' = 'latest-offset',
 >>>   'value.format' = 'avro-confluent',
 >>>   'value.avro-confluent.schema-registry.url' = 'http://...:8081'
 >>> )
>>
>> And I run the following query :
>>
>>> SELECT * FROM my_table
>>
>>
 >> Now I got the following messages in Flink 1.13.1 when I use *STRING* for the type:
>>
 >>> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
 >>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
 >>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
 >>>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
 >>>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
 >>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
 >>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
 >>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
 >>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
 >>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
 >>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
 >>> *Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union*
 >>>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
 >>>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
 >>>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
 >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
 >>>   at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
 >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
 >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
 >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
 >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
 >>>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
 >>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
 >>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
 >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
 >>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
 >>>   at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
 >>>   at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
 >>>   ... 9 more
>>
>> The reason I use the STRING type is just for fast-prototyping.
>>
 >> While reading through [1], I've been thinking about using *RAW('class', 'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure whether it is a good idea and if so, what can be a value for the snapshot.
>>
 >> [1]
 >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw
>>
>> Thanks in advance,
>>
>> Dongwon
>>
>