Hi,

I have read [1] on using Avro for serialization, but I'm stuck on a
mysterious exception when Flink is doing type resolution. (Flink 1.13.1)

Basically, I'm able to use a SpecificRecord type in my source, but I am
unable to use different SpecificRecord types later in the pipeline, getting
an exception "Expecting type to be a PojoTypeInfo" from AvroTypeInfo[2].

Let's say I have a schema "Foo" with one field "foo" of type "Bar", and
schema "Bar" with one field "message" of type "string". My input data is a
single Foo record of the form {"foo": {"message": "hi"}}.
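For concreteness, the two schemas look roughly like this (a sketch of the .avsc definitions; namespace is made up):

    {
      "type": "record",
      "name": "Foo",
      "namespace": "com.example",
      "fields": [
        {"name": "foo", "type": {
          "type": "record",
          "name": "Bar",
          "fields": [
            {"name": "message", "type": "string"}
          ]
        }}
      ]
    }

Both Foo and Bar are compiled to SpecificRecord classes with the Avro Maven plugin.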

This works:

    env.fromElements(myInput).print();

But this does not:

    env.fromElements(myInput).map(foo -> (Bar) foo.getFoo()).print();

(nor does it work if I use a full MapFunction<Foo, Bar>)

Does anyone know what I might be running into here? If necessary, I can put
together a full reproducer.

[1]
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro
[2]
https://github.com/apache/flink/blob/release-1.13.1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java#L72

Thanks,
Patrick
