Hi Krzysztof,
Your problems arise due to Java type erasure. If DataPoint has a Map field holding BadPojo values, all Flink's type system will see is a plain Map, without the BadPojo value type.
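For readers unfamiliar with type erasure, here is a minimal stdlib-only illustration. It assumes the map field is declared as Map<String, BadPojo>; the String key type is an assumption taken from the Types.MAP(Types.STRING, ...) call later in this mail, and the class names here are hypothetical. Two maps with different type arguments share one runtime class, and the field's raw type is just java.util.Map:

```java
import java.util.HashMap;
import java.util.Map;

public class TypeErasureDemo {

    // Hypothetical stand-in for the BadPojo class from this thread.
    public static class BadPojo {
        private final String fieldA = "X";
    }

    // Mirrors the second DataPoint variant (key type String is an assumption).
    public static class DataPoint {
        public Map<String, BadPojo> badPojo = new HashMap<>();
    }

    /** True because the type arguments are erased: both objects are plain HashMaps. */
    public static boolean sameRuntimeClass() {
        Map<String, BadPojo> a = new HashMap<>();
        Map<Integer, Double> b = new HashMap<>();
        return a.getClass() == b.getClass();
    }

    /** The raw class of the field: the type arguments are not part of it. */
    public static Class<?> rawFieldType() {
        try {
            return DataPoint.class.getField("badPojo").getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());        // true
        System.out.println(rawFieldType().getName());  // java.util.Map
    }
}
```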
So in the first case, with DataPoint having an explicit member of type "BadPojo", Flink will deduce "DataPoint" to be a PojoType, whereas its field "badPojo" is itself of type GenericType<BadPojo>, and thus that field will be serialized via Kryo.
In the second case, DataPoint will still be a PojoType and your "badPojo" field will also be a GenericType, but this time a GenericType of the Map itself. So there are no complaints about BadPojo here: the entire map is already serialized via Kryo, so Flink doesn't analyze any further and never sees BadPojo. => No win for you :)
In the second case, you need to explicitly tell Flink that your "badPojo" field is a map and should be treated as a MapType. Once Flink treats it as a MapType, it will again complain about BadPojo itself, and you are back where you started: you still need to fix BadPojo to finally avoid Kryo. :)
I once wrote myself a small utility function when I had to tell Flink about a POJO from an external library that contained a Map, in order to serialize it efficiently:
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public final class FlinkTypeHints {

    /**
     * Flink has a Types.POJO function where, next to a class, one can specify a
     * Map with field names and types to be set. This function is a utility on top
     * of it: whereas Types.POJO(Class, Map) creates the POJO type for the class with
     * only the fields specified, this function creates the POJO type normally, but
     * replaces the type of the provided field with the provided type and keeps all
     * other fields as they are generated normally.
     *
     * @throws InvalidTypesException if the class is not a POJO or the field does not exist
     */
    public static <T> TypeInformation<T> pojoTypeWithElementReplacement(
            Class<T> pojo, String replacementFieldName, TypeInformation<?> replacementFieldType) {
        // Let Flink analyze the class as usual; Types.POJO throws if it is not a POJO.
        final PojoTypeInfo<T> pojoType = (PojoTypeInfo<T>) Types.POJO(pojo);
        // Collect field name -> extracted type information for every POJO field.
        final Map<String, TypeInformation<?>> pojoFieldTypes = IntStream
                .range(0, pojoType.getArity())
                .mapToObj(fieldNr -> pojoType.getPojoFieldAt(fieldNr))
                .collect(Collectors.toMap(
                        pojoField -> pojoField.getField().getName(),
                        pojoField -> pojoField.getTypeInformation()
                ));
        // Swap in the custom type, failing loudly if the field name is wrong.
        final TypeInformation<?> oldTypeForElements = pojoFieldTypes.remove(replacementFieldName);
        if (oldTypeForElements == null) {
            throw new InvalidTypesException("Expected " + replacementFieldName
                    + " field to exist in order to replace it properly with custom type infos");
        }
        pojoFieldTypes.put(replacementFieldName, replacementFieldType);
        return Types.POJO(pojo, pojoFieldTypes);
    }
}
In your case, you could call it like this:
TypeInformation<DataPoint> pojoMapType =
    FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo",
        Types.MAP(Types.STRING, TypeExtractor.createTypeInfo(BadPojo.class)));
Or, a bit less verbose, if BadPojo were really a PojoType:
TypeInformation<DataPoint> pojoMapType =
    FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo",
        Types.MAP(Types.STRING, Types.POJO(BadPojo.class)));
If the POJO is, e.g., returned from a map function, you can write something like:
stream.map(myMapFunctionReturningDataPoint).returns(pojoMapType);
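The bookkeeping inside pojoTypeWithElementReplacement does not depend on Flink: build a field-name-to-type map, swap exactly one entry, and fail if the name is unknown. Below is a stdlib-only sketch of just that step. Class<?> stands in for Flink's TypeInformation, and FieldTypeReplacementSketch and Sample are hypothetical names for illustration. Unlike Flink, the sketch only looks at the class's own declared fields, not at superclass fields:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldTypeReplacementSketch {

    /** Hypothetical sample class shaped like the second DataPoint variant. */
    public static class Sample {
        public long timestamp;
        public double value;
        public Map<String, Object> badPojo = new HashMap<>();
    }

    /**
     * Builds fieldName -> field type for the class's own non-static, non-transient
     * fields, then replaces the entry for replacementFieldName.
     * Class<?> stands in for Flink's TypeInformation; this is not a Flink API.
     */
    public static Map<String, Class<?>> withReplacement(
            Class<?> pojo, String replacementFieldName, Class<?> replacementType) {
        Map<String, Class<?>> fieldTypes = new LinkedHashMap<>();
        for (Field f : pojo.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
                continue; // not part of the object's serialized state
            }
            fieldTypes.put(f.getName(), f.getType());
        }
        // Same contract as the Flink helper: only replace, never silently add.
        if (fieldTypes.remove(replacementFieldName) == null) {
            throw new IllegalArgumentException("Expected " + replacementFieldName
                    + " field to exist in order to replace it properly");
        }
        fieldTypes.put(replacementFieldName, replacementType);
        return fieldTypes;
    }

    public static void main(String[] args) {
        // Pretend the raw Map field should be described by a more specific type.
        System.out.println(withReplacement(Sample.class, "badPojo", HashMap.class));
    }
}
```

Refusing to add a field that does not exist mirrors the original helper: a typo in the field name fails immediately instead of producing a type that silently lacks the intended hint.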
Note that I wrote this for Flink 1.9. I read somewhere that Flink now has a new type system, but I haven't checked it out yet and have no idea what changed.
Best regards
Theo
- Original Message -
From: "KristoffSC"
To: "user"
Sent: Monday, July 13, 2020 23:06:20
Subject: Flink Pojo Serialization for Map Values
Hi,
I would like to ask about Flink POJO serialization as described in [1].
I have a case where my custom event source produces events described by this POJO:
public class DataPoint
{
public long timestamp;
public double value;
public BadPojo badPojo = new BadPojo();
public DataPoint() {}
}
And the BadPojo class is something like this:
public class BadPojo {
private final String fieldA = "X";
}
So this is a case where Flink, using the default configuration, should fall back to Kryo, and it does.
In the logs I can see entries like:
org.apache.flink.api.java.typeutils.TypeExtractor - class
org.home.streaming.events.BadPojo does not contain a getter for field fieldA
So this is an expected result.
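That log line comes from Flink's POJO analysis. Per the Flink documentation, a class counts as a POJO if it is public and standalone (not a non-static inner class), has a public no-argument constructor, and every non-static, non-transient field (including superclass fields) is either public and non-final or has a public getter and setter following Java bean naming. Below is a rough stdlib-only approximation of those rules. PojoRuleCheck and its nested classes are hypothetical, and this is not Flink's TypeExtractor, which also accepts some other accessor styles. It shows why BadPojo fails and what would pass:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoRuleCheck {

    /** Fails: private final field without getter/setter, like BadPojo above. */
    public static class BadPojo {
        private final String fieldA = "X";
    }

    /** Passes: the field is public and non-final. */
    public static class PublicFieldPojo {
        public String fieldA = "X";
    }

    /** Passes: private field exposed through bean-style getter and setter. */
    public static class BeanPojo {
        private String fieldA = "X";
        public String getFieldA() { return fieldA; }
        public void setFieldA(String fieldA) { this.fieldA = fieldA; }
    }

    /** Returns the rule violations found; an empty list means "looks like a POJO". */
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(clazz.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        if (!hasPublicMethodOrCtor(clazz, null)) {
            problems.add("no public no-argument constructor");
        }
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
                    continue; // not part of the serialized state
                }
                if (Modifier.isPublic(mod) && !Modifier.isFinal(mod)) {
                    continue; // public, non-final fields are fine as-is
                }
                String cap = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
                boolean isBool = f.getType() == boolean.class;
                if (!hasPublicMethodOrCtor(clazz, "get" + cap)
                        && !(isBool && hasPublicMethodOrCtor(clazz, "is" + cap))) {
                    problems.add("no getter for field " + f.getName());
                }
                if (!hasPublicMethodOrCtor(clazz, "set" + cap, f.getType())) {
                    problems.add("no setter for field " + f.getName());
                }
            }
        }
        return problems;
    }

    /** name == null checks for a public constructor; otherwise for a public method. */
    private static boolean hasPublicMethodOrCtor(Class<?> clazz, String name, Class<?>... params) {
        try {
            if (name == null) {
                clazz.getConstructor(params); // only finds public constructors
            } else {
                clazz.getMethod(name, params); // only finds public methods
            }
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(violations(BadPojo.class)); // [no getter for field fieldA, no setter for field fieldA]
        System.out.println(violations(PublicFieldPojo.class)); // []
        System.out.println(violations(BeanPojo.class)); // []
    }
}
```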
However, when I change the DataPoint class to use:
public Map badPojo = new HashMap<>();
instead of the direct BadPojo field, I no longer see logs complaining about the BadPojo class.
In this case the DataPoint class looks like this:
public class DataPoint
{
public long timestamp;
public double value;
public M