Re: [DISCUSS] Adding configure and close methods to the Kafka(De)SerializationSchema interfaces

2019-09-05 Thread Gyula Fóra
Hi Arvid, the ConfluentRegistryAvroDeserializationSchema calls checkAvroInitialized() on every single record in order to initialize the schema the first time it is needed. This is a clear indication of a missing open/configure method. In addition, some of the Kafka serializers rely on properties that are
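
To illustrate the point above, here is a minimal sketch of the two patterns: a per-record initialization guard versus a one-time lifecycle hook. All class and method names here (LazyDecoder, EagerDecoder, open, decode) are hypothetical and are not taken from Flink or the Confluent schema; the "schema" is just a string standing in for an expensive, non-serializable object.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical decoder that has no lifecycle hook, so it must guard every record.
class LazyDecoder {
    private final Supplier<String> schemaLoader;
    private String schema;   // built on first use, not in the constructor
    int guardChecks = 0;     // how often the guard ran
    int loads = 0;           // how often the schema was actually loaded

    LazyDecoder(Supplier<String> schemaLoader) {
        this.schemaLoader = schemaLoader;
    }

    // Runs for every record, although it only does real work once.
    private void checkInitialized() {
        guardChecks++;
        if (schema == null) {
            schema = schemaLoader.get();
            loads++;
        }
    }

    String decode(byte[] message) {
        checkInitialized();
        return schema + ":" + new String(message, StandardCharsets.UTF_8);
    }
}

// Hypothetical decoder with an explicit open() that the framework would call once.
class EagerDecoder {
    private final Supplier<String> schemaLoader;
    private String schema;

    EagerDecoder(Supplier<String> schemaLoader) {
        this.schemaLoader = schemaLoader;
    }

    // One-time initialization; no per-record guard is needed afterwards.
    void open() {
        schema = schemaLoader.get();
    }

    String decode(byte[] message) {
        return schema + ":" + new String(message, StandardCharsets.UTF_8);
    }
}
```

Both variants produce the same output; the difference is only where the initialization lives and whether the record path carries an extra check.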

Re: [DISCUSS] Adding configure and close methods to the Kafka(De)SerializationSchema interfaces

2019-09-05 Thread Arvid Heise
Hi Gyula, when looking at the ConfluentRegistryAvroDeserializationSchema [1], it seems like the intended way is to pass all configuration parameters in the constructor. So you could call open there. Could you please outline in more detail why this is not enough? What would you do in open and

[DISCUSS] Adding configure and close methods to the Kafka(De)SerializationSchema interfaces

2019-09-05 Thread Gyula Fóra
Hi all! While implementing a new custom Flink serialization schema that wraps an existing Kafka serializer, I realized we are missing two key methods that could easily be added: void configure(java.util.Map configs); void close(); We could rename configure to open but Kafka serializers have a
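
As a rough sketch of what the proposal could look like, the two methods can be added as no-op defaults so that existing implementations keep compiling. The interface name ConfigurableSerializationSchema, the class PrefixingStringSchema, and the config key "example.prefix" are hypothetical and only serve the illustration; this is not Flink's actual interface, and the Map<String, ?> parameter type is an assumption standing in for the raw java.util.Map in the message.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical schema interface with the two proposed lifecycle methods.
interface ConfigurableSerializationSchema<T> {
    // Called once before the first record; a no-op by default.
    default void configure(Map<String, ?> configs) {}

    byte[] serialize(T element);

    // Called once when the schema is no longer needed; a no-op by default.
    default void close() {}
}

// Hypothetical implementation that reads one setting in configure().
class PrefixingStringSchema implements ConfigurableSerializationSchema<String> {
    private String prefix = "";
    private boolean closed = false;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get("example.prefix"); // made-up key
        if (value != null) {
            prefix = value.toString();
        }
    }

    @Override
    public byte[] serialize(String element) {
        if (closed) {
            throw new IllegalStateException("schema already closed");
        }
        return (prefix + element).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        closed = true; // a real wrapper would release resources here
    }

    boolean isClosed() {
        return closed;
    }
}
```

A caller would invoke configure once with the producer or consumer properties, call serialize per record, and call close on shutdown.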