
I am trying to read data from kafka, and my input in kafka is avro messages.

So I am using class “KafkaSinglePortByteArrayInputOperator” to emit records 
from kafka.. And in the next operator I am reading input as "byte[]” and 
deserializing the message!!

But the tuple deserialization is failing with below error in the log…

Can someone pls share your thoughts and help me fix this?

Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created 
(missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)

Code FYR:

Application.java file:

public void populateDAG(DAG dag, Configuration conf)
  //KafkaSinglePortStringInputOperator kafkaInput =  
dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);

  KafkaSinglePortByteArrayInputOperator kafkaInput =  
dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", 
new AvroBytesConversionOperator(“schemaRegURL"));

  HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);

  //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, 
  dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);


Operator Code:

public class AvroBytesConversionOperator extends BaseOperator {

    private String schemaRegURL;
    private KafkaAvroDecoder decoder;

    public AvroBytesConversionOperator(){


    public AvroBytesConversionOperator(String schemaRegURL){
        this.schemaRegURL = schemaRegURL;

     * Defines Input Port - DefaultInputPort
     * Accepts data from the upstream operator
     * Type byte[]
    public transient DefaultInputPort<byte[]> input = new 
DefaultInputPort<byte[]>() {
        public void process(byte[] tuple)

     * Defines Output Port - DefaultOutputPort
     * Sends data to the down stream operator which can consume this data
     * Type String
    public transient DefaultOutputPort<String> output = new 

     * Setup call
    public void setup(OperatorContext context)
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));

     * Begin window call for the operator.
     * @param windowId
    public void beginWindow(long windowId)


     * Defines what should be done with each incoming tuple
    protected void processTuple(byte[] tuple)
        GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);

     * End window call for the operator
     * If sending per window, emit the updated counts here.
    public void endWindow()



Reply via email to