Hi,

I have a dataflow with a Kafka source that uses the Avro-generated class to
deserialize:
    AvroDeserializer<MyAvroType> avroSchema =
        new AvroDeserializer<>(MyAvroType.class);
    FlinkKafkaConsumer011<MyAvroType> kafkaReader =
        new FlinkKafkaConsumer011<MyAvroType>("kafkaTopic", avroSchema, properties);

From this point on I would like Flink to use a specific serializer for that
type, so I define:
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // env.getConfig().registerTypeWithKryoSerializer(
    //     MyAvroType.class, AvroDataEncryptionSerializer.class);
    env.getConfig().addDefaultKryoSerializer(
        MyAvroType.class, AvroDataEncryptionSerializer.class);
    env.getConfig().enableForceKryo();

MyAvroType is the class generated by Avro.
AvroDeserializer is a simple deserializer that implements
org.apache.flink.api.common.serialization.DeserializationSchema.
AvroDataEncryptionSerializer is a custom serializer that extends
com.esotericsoftware.kryo.Serializer<MyAvroType>.

However, I see that each time the object needs to be serialized (e.g. for a
state snapshot), org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize
is invoked instead of my custom serializer.

Am I doing something wrong?

Thanks,
-Enrico
