Check this answer: https://stackoverflow.com/a/64721838/5793905

You could then use, for example, something like
`new SetTypeInfo(Types.STRING)` instead of `Types.LIST(Types.STRING)`.
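
For background on why the element type has to be passed explicitly (the `Types.STRING` argument above), here is a minimal plain-Java sketch of the type erasure mentioned in the quoted reply below. It uses only the JDK; the class name `ErasureDemo` and the helper `sameRuntimeClass` are made up for illustration and are not part of Flink or of the gist.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo class (not a Flink API): shows that the element type
// of a collection is not recoverable from the instance at runtime.
final class ErasureDemo {

    // True when both sets have the same runtime class, whatever their
    // compile-time element types were.
    static boolean sameRuntimeClass(Set<?> a, Set<?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        Set<String> strings = new HashSet<>();
        Set<Integer> ints = new HashSet<>();

        // Both are plain java.util.HashSet at runtime; <String> and
        // <Integer> exist only at compile time.
        System.out.println(sameRuntimeClass(strings, ints));
        System.out.println(strings.getClass().getName());
    }
}
```

Since the instance alone does not reveal its element type, the type information has to be supplied by hand, which is the role of the `Types.STRING` argument in both `Types.LIST(Types.STRING)` and `new SetTypeInfo(Types.STRING)`.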

On Tue, 15 Aug 2023 at 10:40, <s...@sammar.sa> wrote:

> Hello Alexis,
>
> Thank you for sharing the helper classes, but unfortunately I have no
> idea how to use them or how they might be able to help me. This is
> all very new to me and I honestly can't wrap my head around Flink's type
> information system.
>
> Best regards,
> Saleh.
>
> On 14 Aug 2023, at 4:05 PM, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
> Hello,
>
> AFAIK you cannot avoid a TypeInfoFactory because of type erasure; there is
> nothing Flink can do about that. Here's an example of helper classes I've been
> using to support set serde in Flink POJOs, but note that it's hardcoded for
> LinkedHashSet, so you would have to create different implementations if you
> need to differentiate sorted sets:
>
> https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
>
> Regards,
> Alexis.
>
>
> On Mon, 14 Aug 2023 at 12:14, <s...@sammar.sa> wrote:
>
>> Hi,
>>
>> Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
>> ```
>> package com.example;
>> import java.util.ArrayList;
>> import java.util.HashSet;
>> import java.util.TreeSet;
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> public class App {
>>     public static class Pojo {
>>         public ArrayList<Integer> list;
>>         public HashSet<Integer> set;
>>         public TreeSet<Integer> treeset;
>>         public Pojo() {
>>             this.list = new ArrayList<>();
>>             this.set = new HashSet<>();
>>             this.treeset = new TreeSet<>();
>>         }
>>     }
>>     public static void main(String[] args) throws Exception {
>>         var env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.getConfig().disableGenericTypes();
>>         env.fromElements(new Pojo()).print();
>>         env.execute("Pipeline");
>>     }
>> }
>> ```
>>
>> The result of running:
>> ```
>> 13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>        [] - class java.util.ArrayList does not contain a setter for field
>> size
>> 13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>        [] - Class class java.util.ArrayList cannot be used as a POJO
>> type because not all fields are valid POJO fields, and must be processed as
>> GenericType. Please read the Flink documentation on "Data Types
>> & Serialization" for details of the effect on performance and schema
>> evolution.
>> 13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>        [] - Field Pojo#list will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>        [] - No fields were detected for class java.util.HashSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>        [] - Field Pojo#set will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>        [] - No fields were detected for class java.util.TreeSet so it
>> cannot be used as a POJO type and must be processed as GenericType. Please
>> read the Flink documentation on "Data Types & Serialization" for details
>> of the effect on performance and schema evolution.
>> 13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
>>        [] - Field Pojo#treeset will be processed as GenericType. Please read
>> the Flink documentation on "Data Types & Serialization" for details of the
>> effect on performance and schema evolution.
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by
>> org.apache.flink.api.java.ClosureCleaner
>> (file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)
>> to field java.lang.String.value
>> WARNING: Please consider reporting this to the maintainers of
>> org.apache.flink.api.java.ClosureCleaner
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>> reflective access operations
>> WARNING: All illegal access operations will be denied in a future release
>> Exception in thread "main" java.lang.UnsupportedOperationException:
>> Generic types have been disabled in the ExecutionConfig and
>> type java.util.ArrayList is treated as a generic type.
>>         at
>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>>         at
>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
>>         at
>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
>>         at
>> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
>>         at
>> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
>>         at
>> org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
>>         at
>> org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
>>         at
>> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
>>         at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
>>         at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
>>         at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
>>         at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)
>>         at com.example.App.main(App.java:26)
>> ```
>>
>> Best regards,
>> Saleh.
>>
>>
>> On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user <
>> user@flink.apache.org> wrote:
>>
>> Hi Saleh,
>>
>> If you could show us the minimal code example of the issue (event
>> classes), I think someone could help you to solve it.
>>
>> Best regards,
>> Alexey
>>
>> On Mon, Aug 14, 2023 at 9:23 AM <s...@sammar.sa> wrote:
>>
>>> Hi,
>>>
>>> According to this blog post
>>> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
>>> the "must be processed as GenericType" message means that the POJO
>>> serializer will not be used and Kryo will be used instead.
>>>
>>> I created a simple POJO to test it again with a Java collection, but I
>>> got the same message. Disabling generic types throws an exception.
>>>
>>> I'm not sure how to use these types along with the POJO serializer or
>>> any other fast serializer.
>>>
>>> Best regards,
>>> Saleh.
>>>
>>>
>>>
>>> On 14 Aug 2023, at 4:59 AM, liu ron <ron9....@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> According to the test in [1], I think Flink can recognize a POJO class
>>> that contains a Java List, so you could refer to the POJO class used in
>>> that test.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>>
>>> Best,
>>> Ron
>>>
>>> On Sun, 13 Aug 2023 at 22:50, <s...@sammar.sa> wrote:
>>>
>>>> Greetings,
>>>>
>>>> I am working on a project that needs to process around 100k events per
>>>> second and I'm trying to improve performance.
>>>>
>>>> Most of the classes being used are POJOs but have a couple of fields
>>>> using a `java.util` collection class such as `ArrayList`, `HashSet`, or
>>>> `SortedSet`. This forces Flink to fall back to Kryo and emit these warnings:
>>>>
>>>> ```
>>>> class java.util.ArrayList does not contain a setter for field size
>>>> Class class java.util.ArrayList cannot be used as a POJO type because
>>>> not all fields are valid POJO fields, and must be processed as GenericType.
>>>> Please read the Flink documentation on "Data Types & Serialization" for
>>>> details of the effect on performance and schema evolution.
>>>> ```
>>>>
>>>> ```
>>>> No fields were detected for class java.util.HashSet so it cannot be
>>>> used as a POJO type and must be processed as GenericType. Please read the
>>>> Flink documentation on "Data Types & Serialization" for details of the
>>>> effect on performance and schema evolution.
>>>> ```
>>>>
>>>> My question is what do I need to do to get Flink to recognize my
>>>> classes as POJOs and use the POJO serializer for better performance?
>>>> I read through the documentation and stackoverflow and the conclusion
>>>> is that I need to make a TypeInfoFactory and use it inside a TypeInfo
>>>> annotation over my POJO.
>>>> While this seems incredibly tedious and I keep thinking "there must be
>>>> a better way", I would be fine with this solution if I could figure out how
>>>> to do this for the Set types I'm using.
>>>>
>>>> Any help would be appreciated.
>>>
>>>
>>>
>>
>
