Hi,

I'm new to Avro. One of Avro's benefits is dynamic typing: no code needs
to be generated in order to deserialize Avro data.

I've been trying to find an example of deserializing Avro without
specifying the schema, but every example I've found requires passing a
schema to the GenericDatumReader constructor or setting it after
constructing the reader.

So I'm wondering: *is it possible to deserialize Avro without defining
its schema?*

I mean, something like getting a value out of a Map<String, Object> and
casting it to the desired type, without knowing the schema up front.

It should be possible, right? After all, the schema itself is embedded
in the data.
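For instance, this already seems to work for Avro data files (the
object-container format), since the writer schema is stored in the file
header. A rough sketch, with a made-up field name:

// No schema is passed anywhere: DataFileStream reads the writer schema
// from the container-file header itself.
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>();
DataFileStream<GenericRecord> dataStream = new DataFileStream<GenericRecord>(in, datumReader);
System.out.println(dataStream.getSchema());   // schema recovered from the header
while (dataStream.hasNext()) {
  GenericRecord record = dataStream.next();
  Object name = record.get("name");           // Map-like access; field name is made up
}
dataStream.close();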

I've attached my sample code; it throws a NullPointerException because I
don't specify the schema.

Thanks!

Rendy

// ConsumerGroupExample.java
import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Kafka consumer-group example: spins up one ConsumerTest worker thread per stream.
 */
public class ConsumerGroupExample {
  private final ConsumerConnector consumer;
  private final String topic;
  private ExecutorService executor;

  public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
    topic = a_topic;
  }

  public void shutdown() {
    if (consumer != null) consumer.shutdown();
    if (executor == null) return;  // run() may never have been called
    executor.shutdown();

    try {
      if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
        System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
      }
    } catch (InterruptedException e) {
      System.out.println("Interrupted during shutdown, exiting uncleanly");
    }
  }

  public void run(int a_numThreads) {
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(ImmutableMap.of(topic, a_numThreads));
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(a_numThreads);

    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
      executor.submit(new ConsumerTest(stream, threadNumber));
      threadNumber++;
    }
  }

  private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
//    props.put("rebalance.backoff.ms", "10000");

    return new ConsumerConfig(props);
  }

  public static void main(String[] args) {
    String zooKeeper = "localhost:2181";
    String groupId = "trololol";
    String topic = "my-replicated-topic";
    int threads = 3;

    ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
    example.run(threads);

    try {
      Thread.sleep(60 * 60 * 1000L);  // let the consumers run for an hour
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();  // restore the interrupt flag rather than swallowing it
    }

    example.shutdown();
  }
}

// ConsumerTest.java
import com.google.common.collect.Lists;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.util.ByteBufferInputStream;
import tracking.avro.User;

import java.io.InputStream;
import java.nio.ByteBuffer;

/**
 * Consumes Avro-encoded Kafka messages and tries to decode them with a
 * GenericDatumReader that was never given a schema.
 */
public class ConsumerTest implements Runnable {
  // No schema is supplied here; reader.read() in run() throws a
  // NullPointerException because the reader has nothing to decode against.
  private final GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>();
//  private final SpecificDatumReader<User> avroEventReader = new SpecificDatumReader<User>(User.SCHEMA$);
  private final DecoderFactory avroDecoderFactory = DecoderFactory.get();

  private KafkaStream<byte[], byte[]> m_stream;
  private int m_threadNumber;

  public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
    m_stream = a_stream;
    m_threadNumber = a_threadNumber;
  }

  public void run() {

    InputStream kafkaMessageInputStream = null;
    BinaryDecoder avroBinaryDecoder = null;
    GenericData.Record avroEvent = null;

    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext()) {
      try {
        // Wrap the raw Kafka payload; the decoder and record are reused across messages.
        kafkaMessageInputStream = new ByteBufferInputStream(Lists.newArrayList(ByteBuffer.wrap(it.next().message())));
        avroBinaryDecoder = avroDecoderFactory.binaryDecoder(kafkaMessageInputStream, avroBinaryDecoder);
        avroEvent = reader.read(avroEvent, avroBinaryDecoder);  // NPE here: the reader has no schema
        System.out.println(m_threadNumber);
        System.out.println(avroEvent);
        kafkaMessageInputStream.close();
      } catch (Exception ex) {
        System.out.println("Unable to process event from kafka, see inner exception details" + ex);
      }
    }
    System.out.println("Shutting down Thread: " + m_threadNumber);
  }
}
