Ah, nice. It works.

On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
> The following compiles fine:
>
>     p.apply(KafkaIO.<String, Envelope>read()
>         .withBootstrapServers("kafka:9092")
>         .withTopic("dbserver1.inventory.customers")
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializerAndCoder(
>             (Class) KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>
> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Same for me. It does not look like there is an annotation to suppress the
>> error.
>>
>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>
>>> Hi Eugene,
>>>
>>> I understood that was where Andrew started and reported this. I tried it
>>> and saw the same as him:
>>>
>>>     incompatible types:
>>>     java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>     cannot be converted to
>>>     org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>
>>> and similarly with
>>>
>>>     (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>
>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> I don't think extending the class is necessary. I'm not sure I understand
>>>> why a simple type cast for withValueDeserializerAndCoder doesn't work. Have
>>>> you tried this?
>>>>
>>>>     p.apply(KafkaIO.<String, Envelope>read()
>>>>         .withValueDeserializerAndCoder(
>>>>             (Deserializer<Envelope>) KafkaAvroDeserializer.class,
>>>>             AvroCoder.of(Envelope.class))
>>>>
>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>
>>>>> Hi Raghu,
>>>>>
>>>>> I tried that, but with KafkaAvroDeserializer already implementing
>>>>> Deserializer<Object> I couldn't get it to work... I didn't spend too
>>>>> much time on it, though, and agree something like that would be cleaner.
>>>>>
>>>>> Cheers,
>>>>> Tim
>>>>>
>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>
>>>>>> Thanks Tim.
>>>>>>
>>>>>> How about extending KafkaAvroDeserializer rather than
>>>>>> AbstractKafkaAvroDeserializer?
>>>>>>
>>>>>> The TypedKafkaAvroDeserializer class below is useful, but not yet directly
>>>>>> usable: it would need to store the actual type in the Kafka consumer
>>>>>> config so that it can be retrieved at runtime.
>>>>>> Even without storing the class, it is still useful because it simplifies
>>>>>> user code:
>>>>>>
>>>>>>     public class EnvelopeKafkaAvroDeserializer extends
>>>>>>         TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>
>>>>>> This should be part of the same package as KafkaAvroDeserializer
>>>>>> (I'm surprised it is not there yet).
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>
>>>>>>> Happy to hear it.
>>>>>>>
>>>>>>> I wonder if we could do something like this (totally untested):
>>>>>>>
>>>>>>>     public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>         AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>       @Override
>>>>>>>       public T deserialize(String s, byte[] bytes) {
>>>>>>>         return (T) this.deserialize(bytes);
>>>>>>>       }
>>>>>>>     }
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
>>>>>>>
>>>>>>>> Thanks Tim, that works!
>>>>>>>>
>>>>>>>> Full code is:
>>>>>>>>
>>>>>>>>     public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>         AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>       @Override
>>>>>>>>       public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>       }
>>>>>>>>
>>>>>>>>       @Override
>>>>>>>>       public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>>>>       }
>>>>>>>>
>>>>>>>>       @Override
>>>>>>>>       public void close() {}
>>>>>>>>     }
>>>>>>>>
>>>>>>>> It's nicer than my solution, so I think that is the one I'm going to go
>>>>>>>> with for now.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>
>>>>>>>> Hi Andrew,
>>>>>>>>
>>>>>>>> I also saw the same behaviour.
>>>>>>>>
>>>>>>>> It's not pretty, but perhaps try this? It was my last idea, and I ran out
>>>>>>>> of time to try it...
>>>>>>>>
>>>>>>>>     // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>>>>>     public class EnvelopeAvroDeserializer extends
>>>>>>>>         AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>       ...
>>>>>>>>       public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>>>>       }
>>>>>>>>
>>>>>>>>       public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>>>>         return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>       }
>>>>>>>>       ...
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Tim
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
>>>>>>>>
>>>>>>>> Using Object doesn't work, unfortunately. I get an 'Unable to
>>>>>>>> automatically infer a Coder' error at runtime.
>>>>>>>>
>>>>>>>> This is the code:
>>>>>>>>
>>>>>>>>     p.apply(KafkaIO.<String, Object>read()
>>>>>>>>         .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>
>>>>>>>> It compiles, but at runtime:
>>>>>>>>
>>>>>>>>     Caused by: java.lang.RuntimeException: Unable to automatically infer a Coder
>>>>>>>>     for the Kafka Deserializer class io.confluent.kafka.serializers.KafkaAvroDeserializer:
>>>>>>>>     no coder registered for type class java.lang.Object
>>>>>>>>         at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>
>>>>>>>> So far the only thing I've got working is this, where I use the
>>>>>>>> ByteArrayDeserializer and then parse the Avro myself:
>>>>>>>>
>>>>>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>>>>>     static {
>>>>>>>>       final Properties props = new Properties();
>>>>>>>>       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>       props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>           "http://registry:8081");
>>>>>>>>       props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>       VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>       avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public static void main(String[] args) {
>>>>>>>>       PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>       Pipeline p = Pipeline.create(options);
>>>>>>>>
>>>>>>>>       p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>               .withBootstrapServers("kafka:9092")
>>>>>>>>               .withTopic("dbserver1.inventory.customers")
>>>>>>>>               .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>               .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>               .withoutMetadata())
>>>>>>>>           .apply(Values.<byte[]>create())
>>>>>>>>           .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>             @ProcessElement
>>>>>>>>             public void processElement(ProcessContext c) {
>>>>>>>>               Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>               c.output(data);
>>>>>>>>             }
>>>>>>>>           }))
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>> Deserializer<Object>, though I suppose with proper configuration that
>>>>>>>> Object will at runtime be your desired type.
>>>>>>>> Have you tried adding some Java type casts to make it compile?
>>>>>>>>
>>>>>>>> +1, a cast might be the simplest fix. Alternatively, you can wrap or
>>>>>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>>>>>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at
>>>>>>>> runtime.
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> I just tried quickly and see the same as you, Andrew.
>>>>>>>> Either we're missing something obvious, or extending
>>>>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>>>>>>>> it should be as simple as:
>>>>>>>>
>>>>>>>>     p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>         .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>             AvroCoder.of(Envelope.class))
>>>>>>>>
>>>>>>>> where Envelope is the name of the Avro class. However, that does not
>>>>>>>> compile, and I get the following error:
>>>>>>>>
>>>>>>>>     incompatible types:
>>>>>>>>     java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>>     cannot be converted to java.lang.Class<? extends
>>>>>>>>     org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>
>>>>>>>> I've tried a number of variations on this theme but haven't yet worked
>>>>>>>> it out, and I'm starting to run out of ideas...
>>>>>>>>
>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>
>>>>>>>> The code I'm using can be found at
>>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>>>>>>>> environment can be created with Docker.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
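
For reference, here is a minimal, dependency-free Java sketch of the generics problem. It shows why the plain call fails to compile and why both fixes that worked in this thread get past the compiler: a typed subclass of the abstract base class, and a raw (Class) cast. Every name in it is a hypothetical stand-in, not the real Kafka, Confluent or KafkaIO API. That covers Deserializer, AbstractDecoder, ObjectDeserializer, TypedStringDeserializer and instantiate(). The only thing carried over from the thread is a builder parameter of type Class<? extends Deserializer<T>>, which is the target type in the compiler error Andrew reported.

```java
import java.nio.charset.StandardCharsets;

// Dependency-free sketch of the generics problem discussed in this thread.
// Deserializer, AbstractDecoder, ObjectDeserializer, TypedStringDeserializer and
// instantiate() are hypothetical stand-ins, NOT the real Kafka/Confluent/Beam APIs.
public class DeserializerGenericsDemo {

    // Stand-in for org.apache.kafka.common.serialization.Deserializer<T>.
    public interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Stand-in for AbstractKafkaAvroDeserializer: decodes bytes to a plain Object.
    public abstract static class AbstractDecoder {
        protected Object decode(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Stand-in for KafkaAvroDeserializer: declared as Deserializer<Object>,
    // even though the value it returns has a more specific runtime type.
    public static class ObjectDeserializer extends AbstractDecoder
            implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] data) {
            return decode(data);
        }
    }

    // Analogous to EnvelopeKafkaAvroDeserializer. It extends the abstract base
    // rather than ObjectDeserializer because a class may not implement
    // Deserializer with two different type arguments (Object and String).
    public static class TypedStringDeserializer extends AbstractDecoder
            implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return (String) decode(data); // checked cast: a wrong type fails here
        }
    }

    // Stand-in for a builder method whose parameter is
    // Class<? extends Deserializer<T>>; it instantiates the class reflectively.
    public static <T> Deserializer<T> instantiate(Class<? extends Deserializer<T>> cls) {
        try {
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + cls, e);
        }
    }

    // Option 1: a typed subclass type-checks cleanly, with no warnings.
    public static Deserializer<String> viaTypedSubclass() {
        return instantiate(TypedStringDeserializer.class);
    }

    // Option 2: a raw Class cast compiles only with an unchecked warning, because
    // the raw type switches off generic checking. A wrong element type would
    // surface later, as a ClassCastException where the value is used as a String.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static Deserializer<String> viaRawCast() {
        return instantiate((Class) ObjectDeserializer.class);
    }

    // Does NOT compile, mirroring the original error: Class<ObjectDeserializer>
    // is not a Class<? extends Deserializer<String>>.
    //   Deserializer<String> d = instantiate(ObjectDeserializer.class);

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        String a = viaTypedSubclass().deserialize("topic", payload);
        String b = viaRawCast().deserialize("topic", payload);
        System.out.println(a + " " + b); // prints "hello hello"
    }
}
```

The typed subclass corresponds to what Andrew ended up with. It keeps compile-time checking, and for a non-generic target such as Envelope it fails inside deserialize if the decoded value has the wrong type. A generic (T) cast, as in TypedKafkaAvroDeserializer<T>, is erased and does not get that check. The raw cast is what made Eugene's snippet compile. It is shorter, but it defers any mismatch to the first place the value is used as the expected type.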