Happy to hear I wonder if we could do something like this (totally untested):
public class TypedKafkaAvroDeserializer<T> extends AbstractKafkaAvroDeserializer implements Deserializer<T> { @Override public T deserialize(String s, byte[] bytes) { return (T) this.deserialize(bytes); } } On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+b...@andrew-jones.com > wrote: > Thanks Tim, that works! > > Full code is: > > public class EnvelopeKafkaAvroDeserializer extends > AbstractKafkaAvroDeserializer implements Deserializer<Envelope> { > @Override > public void configure(Map<String, ?> configs, boolean isKey) { > configure(new KafkaAvroDeserializerConfig(configs)); > } > > @Override > public Envelope deserialize(String s, byte[] bytes) { > return (Envelope) this.deserialize(bytes); > } > > @Override > public void close() {} > } > > Nicer than my solution so think that is the one I'm going to go with for > now. > > Thanks, > Andrew > > > On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote: > > Hi Andrew, > > I also saw the same behaviour. > > It's not pretty but perhaps try this? It was my last idea I ran out of > time to try... > > > *// Basically a copy KafkaAvroDeserializer with the casts in > deserialize**public class *EnvelopeAvroDeserializer *extends > *AbstractKafkaAvroDeserializer *implements *Deserializer<Envelope> { > > ... > > *public *Envelope deserialize(String s, *byte*[] bytes) { > > *return *(Envelope) *this*.deserialize(bytes); > > } > > > > *public *Envelope deserialize(String s, *byte*[] bytes, Schema > readerSchema) { > > *return *(Envelope) *this*.deserialize(bytes, readerSchema); > > } > > > > ... > > } > > Tim > > > On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones < > andrew+b...@andrew-jones.com> wrote: > > > Using Object doesn't work unfortunately. I get an 'Unable to automatically > infer a Coder' error at runtime. > > This is the code: > > p.apply(KafkaIO.<String, Object>read() > .withValueDeserializer(KafkaAvroDeserializer.class) > > It compiles, but at runtime: > > Caused by: java.lang.RuntimeException: Unable to automatically infer a > Coder for the Kafka Deserializer class > io.confluent.kafka.serializers.KafkaAvroDeserializer: > no coder registered for type class java.lang.Object > at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696) > > So far the only thing I've got working is this, where I use the > ByteArrayDeserializer and then parse Avro myself: > > private static KafkaAvroDecoder avroDecoder; > static { > final Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, > "http://registry:8081"); > props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, > true); > VerifiableProperties vProps = new VerifiableProperties(props); > avroDecoder = new KafkaAvroDecoder(vProps); > } > > public static void main(String[] args) { > > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline p = Pipeline.create(options); > > p.apply(KafkaIO.<byte[], byte[]>read() > .withBootstrapServers("kafka:9092") > .withTopic("dbserver1.inventory.customers") > .withKeyDeserializer(ByteArrayDeserializer.class) > .withValueDeserializer(ByteArrayDeserializer.class) > .withoutMetadata( > ) > .apply(Values.<byte[]>create()) > .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() { > @ProcessElement > public void processElement(ProcessContext c) { > Envelope data = (Envelope) > avroDecoder.fromBytes(c.element()); > c.output(data); > } > })) > > Thanks, > Andrew > > On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote: > > On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> > wrote: > > It seems that KafkaAvroDeserializer implements Deserializer<Object>, > though I suppose with proper configuration that Object will at run-time be > your desired type. Have you tried adding some Java type casts to make it > compile? > > > +1, cast might be the simplest fix. Alternately you can wrap or > extend KafkaAvroDeserializer as Tim suggested. It would cast the Object > returned by KafkaAvroDeserializer::deserializer() to Envolope at runtime. > > > On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com> > wrote: > > I just tried quickly and see the same as you Andrew. > We're missing something obvious or else extending KafkaAvroDeserializer seems > necessary right? > > On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones < > andrew+b...@andrew-jones.com> wrote: > > Hi, > > I'm trying to read Avro data from a Kafka stream using KafkaIO. I think > it should be as simple as: > > p.apply(KafkaIO.<String, Envelope>*read*() > .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, > AvroCoder.of(Envelope.class)) > > Where Envelope is the name of the Avro class. However, that does not > compile and I get the following error: > > incompatible types: > java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> > cannot be converted to java.lang.Class<? extends > org.apache.kafka.common.serialization.Deserializer<dbserver1 > .inventory.customers.Envelope>> > > I've tried a number of variations on this theme but haven't yet worked > it out and am starting to run out of ideas... > > Has anyone successfully read Avro data from Kafka? > > The code I'm using can be found at > https://github.com/andrewrjones/debezium-kafka-beam-example and a full > environment can be created with Docker. > > Thanks, > Andrew > > > >