Hi,

I am trying to read data from Kafka; the messages in Kafka are Avro-encoded.

So I am using the class KafkaSinglePortByteArrayInputOperator to emit records 
from Kafka, and in the next operator I read the input as byte[] and 
deserialize the message.

But the tuple deserialization is failing with the error below in the log:

Can someone please share their thoughts and help me fix this?



Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): io.confluent.kafka.serializers.KafkaAvroDecoder
Serialization trace:
decoder (com.tgt.mighty.apexapps.AvroBytesConversionOperator)
        at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
        at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
        at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)



Code for reference:


Application.java file:

public void populateDAG(DAG dag, Configuration conf)
{
  //KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("Kafka_Input", KafkaSinglePortStringInputOperator.class);

  KafkaSinglePortByteArrayInputOperator kafkaInput = dag.addOperator("Kafka_Input", new KafkaSinglePortByteArrayInputOperator());

  AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator("schemaRegURL"));

  HDFSWrite hdfs = dag.addOperator("To_HDFS", HDFSWrite.class);

  //dag.addStream("Kafka_To_Hdfs_Ingestion", kafkaInput.outputPort, hdfs.input);
  dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInput.outputPort, avroConversion.input);
  dag.addStream("Avro_To_String_Stream", avroConversion.output, hdfs.input);
}


Operator Code:

import java.util.Properties;

import org.apache.avro.generic.GenericRecord;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.utils.VerifiableProperties;

public class AvroBytesConversionOperator extends BaseOperator {

    private String schemaRegURL;
    private KafkaAvroDecoder decoder;

    public AvroBytesConversionOperator(){

    }

    public AvroBytesConversionOperator(String schemaRegURL){
        this.schemaRegURL = schemaRegURL;
    }

    /**
     * Defines Input Port - DefaultInputPort
     * Accepts data from the upstream operator
     * Type byte[]
     */
    public transient DefaultInputPort<byte[]> input = new DefaultInputPort<byte[]>() {
        @Override
        public void process(byte[] tuple)
        {
            processTuple(tuple);
        }
    };


    /**
     * Defines Output Port - DefaultOutputPort
     * Sends data to the down stream operator which can consume this data
     * Type String
     */
    public transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();


    /**
     * Setup call
     */
    @Override
    public void setup(OperatorContext context)
    {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    /**
     * Begin window call for the operator.
     * @param windowId
     */
    @Override
    public void beginWindow(long windowId)
    {

    }

    /**
     * Defines what should be done with each incoming tuple
     */
    protected void processTuple(byte[] tuple)
    {
        GenericRecord record = (GenericRecord) decoder.fromBytes(tuple);
        output.emit(record.toString());
    }

    /**
     * End window call for the operator
     * If sending per window, emit the updated counts here.
     */
    @Override
    public void endWindow()
    {

    }

}
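
From the Kryo message, my guess is that the engine is trying to checkpoint 
the decoder field itself, and KafkaAvroDecoder has no no-arg constructor for 
Kryo to call. Since I already recreate the decoder in setup(), I am wondering 
whether simply marking the field transient is the right fix. A minimal sketch 
of the change I have in mind (untested):

public class AvroBytesConversionOperator extends BaseOperator {

    private String schemaRegURL;

    // Marked transient so Kryo skips this field at checkpoint time;
    // it is rebuilt in setup() on deploy/recovery anyway.
    private transient KafkaAvroDecoder decoder;

    @Override
    public void setup(OperatorContext context)
    {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", this.schemaRegURL);
        this.decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
    }

    // constructors, ports, and processTuple unchanged from above
}

Is that the correct approach, or is there a recommended way to hold 
non-serializable helpers inside an operator?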
