Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-26 Thread Robert Metzger
Quick comment on the Kryo type registration and the messages you are
seeing: the messages are expected. They say that the type is not being
serialized with Flink's POJO serializer and that Flink is falling back to
Kryo instead.
Since you are registering all the subclasses of Number that you use
(Integer, Double), you'll get better performance from Kryo (or at least
less CPU load) than without the registration. So if you want to keep using
Kryo, you are doing everything right (and in general you won't be able to
use Flink's POJO serializer for Number types).
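Here is a minimal plain-Java sketch of why registration helps, assuming a simplified wire format. An unregistered polymorphic value has to carry its class name, while a registered one can carry a small ID from a dictionary agreed on up front. The sketch does not use Kryo or Flink. The class TagCost and its methods are hypothetical, and Kryo's real encoding differs (for example, it writes registration IDs as variable-length integers), so the byte counts are only illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;

// Hypothetical illustration of why registered types are cheaper to serialize:
// the type tag is either the full class name or a small registration index.
final class TagCost {
    // Subclasses of Number "registered" up front; the tag is the list index.
    private static final List<Class<? extends Number>> REGISTERED =
            List.of(Integer.class, Double.class);

    // Unregistered case: the class name itself is written before the value.
    static int bytesWithClassName(Number n) {
        return encode(n, false);
    }

    // Registered case: a single byte holding the registration index is written.
    static int bytesWithRegisteredId(Number n) {
        return encode(n, true);
    }

    private static int encode(Number n, boolean useRegisteredId) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (useRegisteredId) {
                int id = REGISTERED.indexOf(n.getClass());
                if (id < 0) {
                    throw new IllegalArgumentException("Unregistered type: " + n.getClass());
                }
                out.writeByte(id);
            } else {
                out.writeUTF(n.getClass().getName()); // 2-byte length + encoded name
            }
            // Payload: Integer as 4 bytes, any other Number as an 8-byte double.
            if (n instanceof Integer) {
                out.writeInt(n.intValue());
            } else {
                out.writeDouble(n.doubleValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with an in-memory stream
        }
        return bytes.size();
    }
}
```

With this format, TagCost.bytesWithClassName(42) gives 23 bytes (2-byte length, 17-character class name, 4-byte int), while TagCost.bytesWithRegisteredId(42) gives 5. For 2.5 the numbers are 26 and 9.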

On Fri, Apr 23, 2021 at 7:07 PM Miguel Araújo 
wrote:

> [...]


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-23 Thread Miguel Araújo
Thanks for your replies. I agree this is a somewhat general problem.
I posted it here because I was trying to register the valid subclasses with
Kryo, but I couldn't get the message to go away: everything worked
correctly, but Flink kept complaining that GenericType serialization was
being used.

This is how I was registering these types:

env.getConfig.registerKryoType(classOf[java.lang.Integer])
env.getConfig.registerKryoType(classOf[java.lang.Double])

and this is the message I got on every event:

flink-task-manager_1  | 2021-04-23 16:48:29.274 [Processor Function 1
(1/2)#0] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - No
fields were detected for class java.lang.Number so it cannot be used as a
POJO type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance.

In the meantime, I've changed my approach to reuse a protobuf type that I
already had as part of my input event.

Once again, thanks for your replies; they gave me the right
perspective.



On Wednesday, 21/04/2021 at 18:26, Arvid Heise wrote:

> [...]


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-21 Thread Arvid Heise
Hi Miguel,

As Klemens said, this is a rather general problem independent of Flink: how
do you map polymorphism in serialization?

Flink doesn't have an answer of its own, as this kind of polymorphism is
discouraged: a Number can have arbitrarily many subclasses, and how do you
distinguish them except by class name? That adds a ton of overhead. The
easiest solution in your case is to convert ints into doubles.
Alternatively, you can use Kryo, which dictionary-encodes the classes and
also limits the possible subclasses.
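The simplest option, storing every value as a double, can be sketched in plain Java. It is lossless for ints because a double's 53-bit significand holds any 32-bit int exactly. It is not lossless for arbitrary longs. The helper class NumberAsDouble is hypothetical; in Flink the state would then simply be declared with Double as its type (e.g. ValueState<Double>).

```java
import java.math.BigDecimal;

// Hypothetical helper: store every number as a double.
final class NumberAsDouble {
    // Widening int -> double is exact: every int fits in a double's 53-bit significand.
    static double toStored(int value) {
        return value;
    }

    static double toStored(double value) {
        return value;
    }

    // Recover an int from a stored double, rejecting values that are not whole ints.
    static int toInt(double stored) {
        int asInt = (int) stored; // saturates at Integer.MIN_VALUE/MAX_VALUE, NaN becomes 0
        if (asInt != stored) {
            throw new IllegalArgumentException("Not an int value: " + stored);
        }
        return asInt;
    }

    // Longs are different: only some of them survive long -> double exactly.
    static boolean longIsExactAsDouble(long value) {
        return new BigDecimal((double) value).compareTo(BigDecimal.valueOf(value)) == 0;
    }
}
```

For example, longIsExactAsDouble((1L << 53) + 1) is false, because that value rounds to 2^53 when converted to a double.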

On Tue, Apr 20, 2021 at 11:13 AM Klemens Muthmann <
klemens.muthm...@cyface.de> wrote:

> [...]


Re: Correctly serializing "Number" as state in ProcessFunction

2021-04-20 Thread Klemens Muthmann
Hi,

I guess this is more of a Java problem than a Flink problem. If you want it
quick and dirty, you could implement a class such as:

public class Value {
    private boolean isLongSet = false;
    private long longValue = 0L;
    private boolean isIntegerSet = false;
    private int intValue = 0;

    public Value() {
        // no-argument constructor, required for Flink's POJO serializer
    }

    public Value(final long value) {
        setLong(value);
    }

    public void setLong(final long value) {
        longValue = value;
        isLongSet = true;
    }

    public long getLong() {
        if (isLongSet) {
            return longValue;
        }
        throw new IllegalStateException("No long value has been set");
    }

    // Add the same methods for int (setInt/getInt, using isIntegerSet).
    // To satisfy the POJO requirements you will also need Java-bean getters
    // and setters for every private field, including the boolean flags.
}

I guess a cleaner solution would be possible using a custom Kryo serializer as 
explained here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
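The core of such a custom serializer is writing a type tag before the value and switching on that tag when reading. The sketch below shows only that logic, using java.io streams rather than Kryo's Serializer API. It assumes Integer and Double are the only subclasses that occur, and the class name NumberCodec and the tag values are hypothetical. In an actual Kryo serializer, the same logic would go into its write and read methods, and the serializer would then be registered with Flink as described on the linked page.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical tag-plus-payload codec for a Number that is either an Integer or a Double.
final class NumberCodec {
    private static final byte TAG_INT = 0;
    private static final byte TAG_DOUBLE = 1;

    static byte[] serialize(Number n) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (n instanceof Integer) {
                out.writeByte(TAG_INT);      // 1-byte tag
                out.writeInt(n.intValue());  // 4-byte payload
            } else if (n instanceof Double) {
                out.writeByte(TAG_DOUBLE);        // 1-byte tag
                out.writeDouble(n.doubleValue()); // 8-byte payload
            } else {
                throw new IllegalArgumentException("Unsupported type: " + n.getClass());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Number deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte tag = in.readByte();
            switch (tag) {
                case TAG_INT:
                    return in.readInt();    // boxed to Integer
                case TAG_DOUBLE:
                    return in.readDouble(); // boxed to Double
                default:
                    throw new IllegalArgumentException("Unknown tag: " + tag);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the tag preserves the concrete type, deserialize(serialize(42)) returns an Integer rather than a Double.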

Regards
  Klemens



> On 20.04.2021 at 10:34, Miguel Araújo wrote:
> [...]



Correctly serializing "Number" as state in ProcessFunction

2021-04-20 Thread Miguel Araújo
Hi everyone,

I have a ProcessFunction which needs to store different number types for
different keys, e.g., some keys need to store an integer while others need
to store a double.

I tried to use java.lang.Number as the type for the ValueState, but I got
the expected "No fields were detected for class java.lang.Number so it
cannot be used as a POJO type and must be processed as GenericType."

I have the feeling that this is not the right approach, but the exact type
to be stored is only known at runtime, which makes things a bit trickier.
Is there a way to register these classes correctly, or is it preferable to
use different ValueStates for different types?

Thanks,
Miguel