Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Maximilian Michels
Hi Madhukar, Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class which implements DeserializationSchema and contains the KafkaAvroDecoder with the schema registry. Like so: public class

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Madhukar Thota
Hi Max, Thanks for the example. Based on your example, here is what I did:

public class Streamingkafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Stephan Ewen
The KafkaAvroDecoder is not serializable, and Flink uses serialization to distribute the code to the TaskManagers in the cluster. I think you need to "lazily" initialize the decoder, in the first invocation of "deserialize()". That should do it.

Stephan

On Thu, Nov 19, 2015 at 12:10 PM,
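To illustrate the serialization point with plain Java (no Flink or Kafka on the classpath), here is a minimal sketch. FakeDecoder, EagerSchema, TransientSchema and canSerialize are hypothetical names made up for this example; FakeDecoder only stands in for a non-serializable helper such as KafkaAvroDecoder. It assumes the object is shipped with standard Java serialization.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableCheck {

    // Hypothetical stand-in for a non-serializable helper such as KafkaAvroDecoder.
    static class FakeDecoder {
    }

    // Keeps the helper in a regular field, so Java serialization tries to write it.
    static class EagerSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeDecoder decoder = new FakeDecoder();
    }

    // Keeps the helper in a transient field, which Java serialization skips.
    static class TransientSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeDecoder decoder = new FakeDecoder();
    }

    // Returns true if the object can be written with Java serialization.
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // Some reachable, non-transient field is not Serializable.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("regular field:   " + canSerialize(new EagerSchema()));
        System.out.println("transient field: " + canSerialize(new TransientSchema()));
    }
}
```

Marking the field transient only gets the object through serialization; the field still has to be filled in again on the receiving side, which is why the initialization belongs in deserialize().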

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Maximilian Michels
Stephan is right, this should do it in deserialize():

if (decoder == null) {
    decoder = new KafkaAvroDecoder(vProps);
}

Further, you might have to specify the correct return type for getProducedType(). You may use

public TypeInformation getProducedType() {
    return

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Madhukar Thota
Hi Max/Ewen, Thank you for the input. I was able to solve the serialization issues. Now I am seeing NullPointerExceptions.

public class MyAvroDeserializer implements DeserializationSchema {
    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Till Rohrmann
The constructor of a Java class is not necessarily called when an object is deserialized, so a transient field that is set only in the constructor is still null on the TaskManagers. Thus, you should move the check

if (this.decoder == null) {
    this.decoder = new KafkaAvroDecoder(vProps);
}

into the deserialize method of MyAvroDeserializer.

Cheers, Till

On Thu, Nov 19, 2015 at 1:50 PM,
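The effect can be reproduced with plain Java serialization. The following is only a sketch: FakeDecoder, LazySchema, hasDecoder and roundTrip are hypothetical names for this example, and FakeDecoder merely stands in for KafkaAvroDecoder. It sets a transient field in the constructor, sends the object through a serialize/deserialize round trip, and then relies on a null check inside deserialize().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class LazyInitDemo {

    // Hypothetical stand-in for a non-serializable helper such as KafkaAvroDecoder.
    static class FakeDecoder {
        String decode(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    static class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is null in the copy.
        private transient FakeDecoder decoder;

        LazySchema() {
            // Runs where the object is created; Java deserialization does not call it.
            this.decoder = new FakeDecoder();
        }

        boolean hasDecoder() {
            return decoder != null;
        }

        String deserialize(byte[] message) {
            // Lazy initialization: create the helper on first use.
            if (decoder == null) {
                decoder = new FakeDecoder();
            }
            return decoder.decode(message);
        }
    }

    // Serializes and deserializes the schema in memory, roughly what happens
    // when an object is shipped to another JVM.
    static LazySchema roundTrip(LazySchema schema) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazySchema copy = roundTrip(new LazySchema());
        System.out.println("decoder set after round trip: " + copy.hasDecoder());
        System.out.println(copy.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In this sketch the copy comes back without a decoder even though the constructor assigned one, and the first call to deserialize() creates it. Without the null check inside deserialize(), that call would be the place where the NullPointerException shows up.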

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Maximilian Michels
You need to initialize the decoder in the deserialize method instead of in the constructor.

On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota wrote:
> Hi Max/Ewen,
>
> Thank you for the inputs. I was able to solve the serialization issues. Now
> i am seeing the NullPoint