Re: Question about serialization of java.util classes

2023-08-15 Thread Alexis Sarda-Espinosa
Check this answer: https://stackoverflow.com/a/64721838/5793905

You could then use, for example, something like
new SetTypeInfo(Types.STRING) instead of Types.LIST(Types.STRING).
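The element type is passed in explicitly (`Types.STRING` above) because it cannot be read back from a set instance at runtime. Below is a minimal JDK-only sketch of that shape. It assumes a simple length-prefixed binary format. `ElementCodec`, `SetCodec` and `SetCodec.STRING` are hypothetical names for illustration, not Flink's `TypeSerializer` API, although Flink's `DataOutputView` and `DataInputView` do extend `java.io.DataOutput` and `java.io.DataInput`. The concrete set type is supplied separately, which is one way to cover both `LinkedHashSet` and `TreeSet` without hardcoding either.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical element codec: plays the role an element type/serializer would.
interface ElementCodec<T> {
    void write(T value, DataOutput out) throws IOException;
    T read(DataInput in) throws IOException;
}

// Hypothetical set codec: the element codec and the concrete Set type are given
// explicitly, because neither can be recovered from a Set instance at runtime.
final class SetCodec<T> {
    static final ElementCodec<String> STRING = new ElementCodec<>() {
        public void write(String value, DataOutput out) throws IOException { out.writeUTF(value); }
        public String read(DataInput in) throws IOException { return in.readUTF(); }
    };

    private final ElementCodec<T> element;
    private final Supplier<? extends Set<T>> factory;

    SetCodec(ElementCodec<T> element, Supplier<? extends Set<T>> factory) {
        this.element = element;
        this.factory = factory;
    }

    // Size prefix followed by each element in iteration order.
    void write(Set<T> set, DataOutput out) throws IOException {
        out.writeInt(set.size());
        for (T value : set) {
            element.write(value, out);
        }
    }

    // Rebuilds the set using the configured concrete type (e.g. LinkedHashSet or TreeSet).
    Set<T> read(DataInput in) throws IOException {
        int size = in.readInt();
        Set<T> set = factory.get();
        for (int i = 0; i < size; i++) {
            set.add(element.read(in));
        }
        return set;
    }

    byte[] toBytes(Set<T> set) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(set, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    Set<T> fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Round-tripping through `new SetCodec<>(SetCodec.STRING, LinkedHashSet::new)` keeps insertion order, while `TreeSet::new` rebuilds a naturally ordered set. A `TreeSet` with a custom comparator would also need that comparator recreated on read, since only the elements are written.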

On Tue, 15 Aug 2023 at 10:40, wrote:

> Hello Alexis,
>
> Thank you for sharing these helper classes, but unfortunately I have no
> idea how to use them or how they might help me. This is all very new to me
> and I honestly can't wrap my head around Flink's type information system.
>
> Best regards,
> Saleh.

Re: Question about serialization of java.util classes

2023-08-15 Thread s
Hello Alexis,

Thank you for sharing these helper classes, but unfortunately I have no idea how
to use them or how they might help me. This is all very new to me and I
honestly can't wrap my head around Flink's type information system.

Best regards,
Saleh.

> On 14 Aug 2023, at 4:05 PM, Alexis Sarda-Espinosa wrote:
> 
> Hello,
> 
> AFAIK you cannot avoid a TypeInfoFactory due to type erasure; there is nothing
> Flink can do about that. Here's an example of helper classes I've been using
> to support set serde in Flink POJOs, but note that they are hardcoded for
> LinkedHashSet, so you would have to create different implementations if you
> need to differentiate sorted sets:
> 
> https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
> 
> Regards,
> Alexis.

Re: Question about serialization of java.util classes

2023-08-14 Thread Alexis Sarda-Espinosa
Hello,

AFAIK you cannot avoid a TypeInfoFactory due to type erasure; there is nothing
Flink can do about that. Here's an example of helper classes I've been
using to support set serde in Flink POJOs, but note that they are hardcoded for
LinkedHashSet, so you would have to create different implementations if you
need to differentiate sorted sets:

https://gist.github.com/asardaes/714b8c1db0c4020f5fde9865b95fc398
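A small JDK-only illustration of the type-erasure point. An instance of `HashSet<String>` only exposes its raw class and the type variable `E`. The declared type of a field such as `Set<String>`, by contrast, keeps its argument and can be read through reflection. A raw field (like `public HashSet set;`) keeps no element type at all. `ErasureDemo` and its `Event` fields are hypothetical. This is not how Flink's type extraction is implemented; it only shows which information survives erasure.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class ErasureDemo {
    // Hypothetical POJO: one parameterized field and one raw field.
    static class Event {
        public Set<String> tags = new HashSet<>();
        @SuppressWarnings("rawtypes")
        public HashSet rawSet = new HashSet();
    }

    // From the object alone: the raw class plus its type *variables* (e.g. "E"),
    // never the actual element type.
    static String runtimeView(Object value) {
        Class<?> type = value.getClass();
        return type.getName() + Arrays.toString(type.getTypeParameters());
    }

    // From the declaring field: the generic type as written in the source, if any.
    static String declaredElementType(Class<?> owner, String fieldName) {
        try {
            Field field = owner.getField(fieldName);
            Type generic = field.getGenericType();
            if (generic instanceof ParameterizedType) {
                return ((ParameterizedType) generic).getActualTypeArguments()[0].getTypeName();
            }
            return "unknown (raw type)";
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No public field " + fieldName, e);
        }
    }
}
```

For example, `runtimeView(new HashSet<String>())` yields `java.util.HashSet[E]`, while `declaredElementType(ErasureDemo.Event.class, "tags")` yields `java.lang.String`.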

Regards,
Alexis.


On Mon, 14 Aug 2023 at 12:14, wrote:


Re: Question about serialization of java.util classes

2023-08-14 Thread s
Hi,

Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
```
package com.example;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.TreeSet;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class App {
    public static class Pojo {
        public ArrayList list;
        public HashSet set;
        public TreeSet treeset;

        public Pojo() {
            this.list = new ArrayList<>();
            this.set = new HashSet<>();
            this.treeset = new TreeSet<>();
        }
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Throw instead of silently falling back to Kryo for generic types
        env.getConfig().disableGenericTypes();
        env.fromElements(new Pojo()).print();
        env.execute("Pipeline");
    }
}
```

The result of running:
```
13:08:20,074 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - class java.util.ArrayList does not contain a setter for field size
13:08:20,077 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Class class java.util.ArrayList cannot be used as a POJO type because not 
all fields are valid POJO fields, and must be processed as GenericType. Please 
read the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
13:08:20,078 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#list will be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - No fields were detected for class java.util.HashSet so it cannot be used 
as a POJO type and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#set will be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - No fields were detected for class java.util.TreeSet so it cannot be used 
as a POJO type and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
13:08:20,079 INFO  org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#sset will be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner 
(file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)
 to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of 
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal 
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" java.lang.UnsupportedOperationException: Generic 
types have been disabled in the ExecutionConfig and type java.util.ArrayList is 
treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
at
```

Re: Question about serialization of java.util classes

2023-08-14 Thread Alexey Novakov via user
Hi Saleh,

If you could show us a minimal code example of the issue (e.g. your event
classes), I think someone could help you solve it.

Best regards,
Alexey

On Mon, Aug 14, 2023 at 9:23 AM  wrote:

> Hi,
>
> According to this blog post
> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
> The "Must be processed as GenericType" message means that the POJO
> serializer will not be used and Kryo will be used instead.
>
> I created a simple POJO with a Java collection to test it again, but I got
> the same message. Disabling generic types throws an exception.
>
> I'm not sure how to use these types along with the POJO serializer or any
> other fast serializer.
>
> Best regards,
> Saleh.


Re: Question about serialization of java.util classes

2023-08-14 Thread s
Hi,

According to this blog post
https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
the "Must be processed as GenericType" message means that the POJO serializer
will not be used and Kryo will be used instead.

I created a simple POJO with a Java collection to test it again, but I got the
same message. Disabling generic types throws an exception.

I'm not sure how to use these types along with the POJO serializer or any other 
fast serializer.

Best regards,
Saleh.



> On 14 Aug 2023, at 4:59 AM, liu ron  wrote:
> 
> Hi,
> 
> According to the test in [1], I think Flink can recognize a POJO class that
> contains a Java List, so I think you can refer to the related POJO class
> implementation.
> 
> [1]
> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
> 
> Best,
> Ron



Re: Question about serialization of java.util classes

2023-08-13 Thread liu ron
Hi,

According to the test in [1], I think Flink can recognize a POJO class that
contains a Java List, so I think you can refer to the related POJO class
implementation.

[1]
https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192

Best,
Ron

On Sun, 13 Aug 2023 at 22:50, wrote:



Question about serialization of java.util classes

2023-08-13 Thread s
Greetings,

I am working on a project that needs to process around 100k events per second 
and I'm trying to improve performance.

Most of the classes being used are POJOs, but they have a couple of fields of a
`java.util` type such as `ArrayList`, `HashSet`, or `SortedSet`. This forces
Flink to fall back to Kryo and log these warnings:

```
class java.util.ArrayList does not contain a setter for field size
Class class java.util.ArrayList cannot be used as a POJO type because not all 
fields are valid POJO fields, and must be processed as GenericType. Please read 
the Flink documentation on "Data Types & Serialization" for details of the 
effect on performance and schema evolution.
```

```
No fields were detected for class java.util.HashSet so it cannot be used as a 
POJO type and must be processed as GenericType. Please read the Flink 
documentation on "Data Types & Serialization" for details of the effect on 
performance and schema evolution.
```
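Both warnings follow from the POJO rules in Flink's "Data Types & Serialization" documentation. Roughly, every non-static, non-transient field must be public or have a getter and setter. The sketch below is a simplified, JDK-only approximation of that field check, not Flink's `TypeExtractor`, and `PojoRuleCheck` is a hypothetical name. Because it inspects JDK-internal fields, its results depend on the JDK version.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

final class PojoRuleCheck {
    // Collects non-static, non-transient fields across the class hierarchy,
    // i.e. the fields a POJO-style serializer would have to handle.
    static List<Field> candidateFields(Class<?> type) {
        List<Field> result = new ArrayList<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (!Modifier.isStatic(mod) && !Modifier.isTransient(mod)) {
                    result.add(f);
                }
            }
        }
        return result;
    }

    // Simplified rule: the field is public, or has getFoo()/isFoo() plus setFoo(x).
    static boolean isAccessible(Class<?> type, Field f) {
        if (Modifier.isPublic(f.getModifiers())) {
            return true;
        }
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : type.getMethods()) {
            String name = m.getName();
            if ((name.equals("get" + suffix) || name.equals("is" + suffix)) && m.getParameterCount() == 0) {
                getter = true;
            }
            if (name.equals("set" + suffix) && m.getParameterCount() == 1) {
                setter = true;
            }
        }
        return getter && setter;
    }

    // Names of fields that break the (simplified) POJO rules.
    static List<String> violations(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field f : candidateFields(type)) {
            if (!isAccessible(type, f)) {
                names.add(f.getName());
            }
        }
        return names;
    }
}
```

On current JDKs, `violations(ArrayList.class)` should report only `size` (ArrayList has `size()` but no `getSize`/`setSize`). `candidateFields(HashSet.class)` should be empty because the backing map is `transient`. Both results line up with the two warnings above.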

My question is: what do I need to do to get Flink to recognize my classes as
POJOs and use the POJO serializer for better performance?
I read through the documentation and Stack Overflow, and the conclusion is that
I need to write a `TypeInfoFactory` and reference it in a `@TypeInfo` annotation
on my POJO.
While this seems incredibly tedious and I keep thinking "there must be a better
way", I would be fine with this solution if I could figure out how to do it for
the Set types I'm using.

Any help would be appreciated.