Hi,

I have a dataflow with a Kafka source that uses the Avro-generated class to deserialize:

    AvroDeserializer<MyAvroType> avroSchema =
        new AvroDeserializer<>(MyAvroType.class);
    FlinkKafkaConsumer011<MyAvroType> kafkaReader =
        new FlinkKafkaConsumer011<MyAvroType>("kafkaTopic", avroSchema, properties);

From this point on I would like Flink to use a specific serializer for that type, so I define:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // env.getConfig().registerTypeWithKryoSerializer(MyAvroType.class, AvroDataEncryptionSerializer.class);
    env.getConfig().addDefaultKryoSerializer(MyAvroType.class, AvroDataEncryptionSerializer.class);
    env.getConfig().enableForceKryo();

where:

MyAvroType: the class generated by Avro
AvroDeserializer: a simple Avro deserializer that implements org.apache.flink.api.common.serialization.DeserializationSchema
AvroDataEncryptionSerializer: a custom serializer that extends com.esotericsoftware.kryo.Serializer<MyAvroType>

However, I see that each time the object needs to be serialized (e.g. during a state snapshot), org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize is invoked.

Am I doing something wrong?

Thanks,
-Enrico