Hi, I am trying to read data from kafka, and my input in kafka is avro messages.
So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records from kafka.. And in the next operator I am reading input as "byte[]” and deserializing the message!! But the tuple deserialization is failing with below error in the log… Can someone pls share your thoughts and help me fix this? Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder Serialization trace: decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator) at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228) at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058) at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) Code FYR: Application.java file: public void populateDAG(DAG dag, Configuration conf) { //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class); KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator()); AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(“schemaRegURL")); HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class); //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input); dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input); dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input); } Operator Code: public class AvroBytesConversionOperator extends BaseOperator { private String schemaRegURL; private KafkaAvroDecoder decoder; public AvroBytesConversionOperator(){ } public AvroBytesConversionOperator(String schemaRegURL){ this.schemaRegURL = schemaRegURL; } /** * Defines Input Port - DefaultInputPort * Accepts data from the upstream operator * Type byte[] */ public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() { @Override public void process(byte[] tuple) { processTuple(tuple); } }; /** * Defines Output Port - DefaultOutputPort * Sends data to the down stream operator which can consume this data * Type String */ public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); /** * Setup call */ @Override public void setup(OperatorContext context) { Properties props = new Properties(); props.setProperty("schema.registry.url", this.schemaRegURL); this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props)); } /** * Begin window call for the operator. * @param windowId */ public void beginWindow(long windowId) { } /** * Defines what should be done with each incoming tuple */ protected void processTuple(byte[] tuple) { GenericRecord record = (GenericRecord) decoder.fromBytes(tuple); output.emit(record.toString()); } /** * End window call for the operator * If sending per window, emit the updated counts here. */ @Override public void endWindow() { } }