Re: Flink Pojo Serialization for Map Values

2020-07-16 Thread Theo Diefenthal
Hi Krzysztof,

That's why my goal is to always set
   env.getConfig().disableGenericTypes();
in my streaming jobs. This way, you get an early crash if generic types
are used anywhere. (They are bad for performance, so I try to avoid them
everywhere.)
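
For reference, a minimal sketch of a job skeleton with that switch enabled (the
pipeline itself is just a placeholder, only disableGenericTypes() is the point):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobWithoutGenericTypes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast if any type in the pipeline would fall back to Kryo.
        env.getConfig().disableGenericTypes();

        // Placeholder pipeline; the real job logic goes here.
        env.fromElements(1L, 2L, 3L).print();

        env.execute("job-without-generic-types");
    }
}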

Sadly, if you build streaming jobs on top of Kafka, you need Flink
1.10.1 or Flink 1.11 to be able to use this setting
( https://issues.apache.org/jira/browse/FLINK-15904 ).

As my project is still on Flink 1.9, I currently have an e2e-like test with
"disableGenericTypes" which differs from the real job only in that it doesn't
use the Kafka consumer and has another input instead, so I can make sure the
remaining part of my pipeline has no generic types in it.
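
A sketch of what such a test can look like (NoGenericTypesTest and
buildPipeline are hypothetical names; the event type stands in for the real
job's records):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

public class NoGenericTypesTest {

    @Test
    public void pipelineUsesNoGenericTypes() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same switch as in production: crash early on any Kryo fallback.
        env.getConfig().disableGenericTypes();

        // The Kafka consumer is replaced by a bounded in-memory source...
        DataStream<DataPoint> input = env.fromElements(new DataPoint());

        // ...while the rest of the real pipeline stays unchanged.
        buildPipeline(input).print();

        env.execute("no-generic-types-test");
    }

    // Hypothetical stand-in for the real job's transformations.
    private static DataStream<DataPoint> buildPipeline(DataStream<DataPoint> in) {
        return in;
    }
}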

Best regards
Theo



- Original Message -
From: "KristoffSC" 
To: "user" 
Sent: Thursday, July 16, 2020 20:57:09
Subject: Re: Flink Pojo Serialization for Map Values

Theo,
thank you for the clarification and code examples.
I was actually suspecting that this is because of Java type erasure.

The thing that bothers me, though, is the fact that Flink was falling back to
Kryo silently in my case, without any information in the logs. We actually
found it just by luck.




Re: Flink Pojo Serialization for Map Values

2020-07-16 Thread KristoffSC
Theo,
thank you for the clarification and code examples.
I was actually suspecting that this is because of Java type erasure.

The thing that bothers me, though, is the fact that Flink was falling back to
Kryo silently in my case, without any information in the logs. We actually
found it just by luck.








Re: Flink Pojo Serialization for Map Values

2020-07-15 Thread Theo Diefenthal


Hi Krzysztof,

Your problems arise due to Java type erasure. If you have DataPoint with a
Map<String, BadPojo>, all Flink's type system will see is a Map, i.e.
Map<Object, Object>.

So in the first case, with DataPoint having an explicit member of type
"BadPojo", Flink will deduce "DataPoint" to be a PojoType, whereas the field
"badPojo" itself is of type GenericType<BadPojo> and thus, this field will be
serialized via Kryo.

In the second case, DataPoint will still be a PojoType and your "badPojo" field
will also be a GenericType, but this time of type GenericType<Map>.
So there are no complaints about BadPojo here, because the entire map will
already be serialized via Kryo; Flink doesn't deduce any further and doesn't
see the BadPojo at all. => No win for you :)
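
You can check what Flink deduces yourself; a small sketch (using the DataPoint
class from your mail) that just prints the extracted type information:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class TypeInfoCheck {
    public static void main(String[] args) {
        TypeInformation<DataPoint> info = TypeExtractor.createTypeInfo(DataPoint.class);
        // With the Map field, this prints a PojoType whose "badPojo" field
        // shows up as a GenericType, i.e. it will go through Kryo.
        System.out.println(info);
    }
}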

To fix the second case, you need to explicitly tell Flink that your "badPojo"
field is a map, so that it is detected as a map type (MapTypeInfo) by Flink. If
Flink detects it as such, it will again complain about BadPojo itself, and you
are back to square one: you still need to fix BadPojo to finally avoid Kryo. :)
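
"Fixing" BadPojo means making it follow Flink's POJO rules: a public class with
a public no-arg constructor whose fields are either public or have getters and
setters. A sketch of a conforming version (GoodPojo is just an illustrative
name):

public class GoodPojo {
    // Public and non-final, so Flink's TypeExtractor can analyze it
    // (the original private final fieldA triggered the
    // "does not contain a getter for field fieldA" log message).
    public String fieldA = "X";

    public GoodPojo() {}
}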

I wrote myself a small utility function once, when I had to tell Flink about a
POJO from an external library that contained a Map, in order to serialize it
efficiently:



import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class FlinkTypeHints {

    /**
     * Flink has a Types.POJO function where, next to a class, one can specify a
     * Map of field names and types to be set. This function is kind of a
     * utility: where Flink's Types.POJO function creates the POJO type for the
     * class only with the fields specified, this function creates the PojoType
     * normally, but replaces the type of the provided field with the provided
     * type and keeps all other fields as they are generated normally.
     *
     * @throws org.apache.flink.api.common.functions.InvalidTypesException
     */
    public static <T> TypeInformation<T> pojoTypeWithElementReplacement(
            Class<T> pojo,
            String replacementFieldName,
            TypeInformation<?> replacementFieldType) {
        final PojoTypeInfo<T> pojoType = (PojoTypeInfo<T>) Types.POJO(pojo);

        // Collect the normally deduced type of every POJO field by name.
        final Map<String, TypeInformation<?>> pojoFieldTypes = IntStream
                .range(0, pojoType.getArity())
                .mapToObj(pojoType::getPojoFieldAt)
                .collect(Collectors.toMap(
                        pojoField -> pojoField.getField().getName(),
                        pojoField -> pojoField.getTypeInformation()
                ));

        // Swap out the deduced type of the requested field.
        final TypeInformation<?> oldTypeForElements =
                pojoFieldTypes.remove(replacementFieldName);
        if (oldTypeForElements == null) {
            throw new org.apache.flink.api.common.functions.InvalidTypesException(
                    "Expected " + replacementFieldName + " field to exist in order"
                    + " to replace it properly with custom type infos");
        }
        pojoFieldTypes.put(replacementFieldName, replacementFieldType);

        return Types.POJO(pojo, pojoFieldTypes);
    }
}


In your case, you could call it like this:

TypeInformation<DataPoint> pojoMapType =
    FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo",
        Types.MAP(Types.STRING, TypeExtractor.createTypeInfo(BadPojo.class)));

A bit less verbose, if BadPojo were really a PojoType:

TypeInformation<DataPoint> pojoMapType =
    FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo",
        Types.MAP(Types.STRING, Types.POJO(BadPojo.class)));

If the POJO is e.g. returned from a map function, you can write something like
stream.map(myMapFunctionReturningDataPoint).returns(pojoMapType);



Note that I wrote this for Flink 1.9. I read that Flink now has a new type
system, but I haven't checked it out yet and have no idea what changed.

Best regards
Theo




- Original Message -
From: "KristoffSC" 
To: "user" 
Sent: Monday, July 13, 2020 23:06:20
Subject: Flink Pojo Serialization for Map Values

Hi,
I would like to ask about Flink POJO serialization as described in [1].

I have a case where my custom event source produces events described by a POJO:

public class DataPoint
{
    public long timestamp;
    public double value;
    public BadPojo badPojo = new BadPojo();

    public DataPoint() {}
}

Where the BadPojo class is something like this:

public class BadPojo {
    private final String fieldA = "X";
}

So this is a case where Flink, using the default configuration, should fall
back to Kryo, and it does. In the logs I can see entries like:

org.apache.flink.api.java.typeutils.TypeExtractor - class org.home.streaming.events.BadPojo does not contain a getter for field fieldA

So this is the expected result.

However, when I change the DataPoint class to use

public Map<String, BadPojo> badPojo = new HashMap<>();

instead of the direct BadPojo field, I no longer see logs complaining about the
BadPojo class.

In this case the DataPoint class looks like this:

public class DataPoint
{
    public long timestamp;
    public double value;
    public Map<String, BadPojo> badPojo = new HashMap<>();

    public DataPoint() {}
}

Re: Flink Pojo Serialization for Map Values

2020-07-15 Thread KristoffSC
Hi,
Any ideas about that one?





Flink Pojo Serialization for Map Values

2020-07-13 Thread KristoffSC
Hi,
I would like to ask about Flink POJO serialization as described in [1].

I have a case where my custom event source produces events described by a POJO:

public class DataPoint
{
    public long timestamp;
    public double value;
    public BadPojo badPojo = new BadPojo();

    public DataPoint() {}
}

Where the BadPojo class is something like this:

public class BadPojo {
    private final String fieldA = "X";
}

So this is a case where Flink, using the default configuration, should fall
back to Kryo, and it does. In the logs I can see entries like:

org.apache.flink.api.java.typeutils.TypeExtractor - class org.home.streaming.events.BadPojo does not contain a getter for field fieldA

So this is the expected result.

However, when I change the DataPoint class to use

public Map<String, BadPojo> badPojo = new HashMap<>();

instead of the direct BadPojo field, I no longer see logs complaining about the
BadPojo class.

In this case the DataPoint class looks like this:

public class DataPoint
{
    public long timestamp;
    public double value;
    public Map<String, BadPojo> badPojo = new HashMap<>();

    public DataPoint() {}
}

My questions:
1. What actually happens here?
2. Which serializer is used by Flink?
3. How should Maps be handled in a POJO definition to get the best
serialization performance (assuming that I do need to access that map)?

Thanks,
Krzysztof


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos


