This is due to Java doing type erasure in any expression that involves a raw type. This will compile if you extract the result of .apply(KafkaIO.read()...) into a local variable.
On Fri, Oct 20, 2017, 1:51 AM Andrew Jones <[email protected]> wrote: > Thanks Eugene. That does compile, although the rest of the pipeline > doesn't seem happy. > > The next line is: > > .apply(Values.<Envelope>create()) > > But that now doesn't compile with the following error: > > /usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] > cannot find symbol > symbol: method > apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>) > location: interface org.apache.beam.sdk.values.POutput > > Don't really understand what's wrong here. It works fine when using > the EnvelopeKafkaAvroDeserializer as suggested by Tim. > > Thanks, > Andrew > > > On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote: > > Thanks Eugene > > On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <[email protected]> wrote: > > Ah, nice. It works. > > On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <[email protected]> > wrote: > > The following compiles fine: > > > p.apply(KafkaIO.<String, Envelope>read() > > .withBootstrapServers("kafka:9092") > .withTopic("dbserver1.inventory.customers") > > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, > AvroCoder.of(Envelope.class)) > > > On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <[email protected]> wrote: > > Same for me. It does not look like there is an annotation to suppress the > error. > > > On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <[email protected] > > wrote: > > Hi Eugene, > > I understood that was where Andrew started and reported this. I tried and > saw the same as him. > > incompatible types: > java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> > cannot be converted to > org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope> > > similarly with > (Class<? *extends *Deserializer<Envelope>>) KafkaAvroDeserializer.*class* > > > > On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <[email protected]> > wrote: > > I don't think extending the class is necessary. Not sure I understand why > a simple type casting for withDeserializerAndCoder doesn't work? Have you > tried this? > > p.apply(KafkaIO.<String, Envelope>read() > .withValueDeserializerAndCoder > ((Deserializer<Envelope>)KafkaAvroDeserializer.class, > AvroCoder.of(Envelope.class)) > > On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <[email protected]> > wrote: > > Hi Raghu > > I tried that but with KafkaAvroDeserializer already implementing > Deserializer<Object> I couldn't get it to work... I didn't spend too much > time though and agree something like that would be cleaner. > > Cheers, > Tim > > On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <[email protected]> wrote: > > Thanks Tim. > > How about extending KafkaAvroDeserializer rather > than AbstractKafkaAvroDeserializer? > > TypedKafkaAvroDeserializer class below is useful, but not directly usable > by the yet. It needs to store the actual type in Kafka consumer config to > retrieve at run time. > Even without storing the class, it is still useful. It simplifies user > code: > > public class EnvelopeKafkaAvroDeserializer extends > TypedKafkaAvroDeserializer<Envelope> {} > > This should be part of same package as KafkaAvroDeserializer (surprised it > is not there yet). > > On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <[email protected]> > wrote: > > Happy to hear > > I wonder if we could do something like this (totally untested): > > public class TypedKafkaAvroDeserializer<T> extends > AbstractKafkaAvroDeserializer implements Deserializer<T> { > @Override > > public T deserialize(String s, byte[] bytes) { > return (T) this.deserialize(bytes); > } > > } > > On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones < > [email protected]> wrote: > > > Thanks Tim, that works! > > Full code is: > > public class EnvelopeKafkaAvroDeserializer extends > AbstractKafkaAvroDeserializer implements Deserializer<Envelope> { > @Override > public void configure(Map<String, ?> configs, boolean isKey) { > configure(new KafkaAvroDeserializerConfig(configs)); > } > > @Override > > public Envelope deserialize(String s, byte[] bytes) { > return (Envelope) this.deserialize(bytes); > } > > > @Override > public void close() {} > } > > Nicer than my solution so think that is the one I'm going to go with for > now. > > Thanks, > Andrew > > > On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote: > > Hi Andrew, > > I also saw the same behaviour. > > It's not pretty but perhaps try this? It was my last idea I ran out of > time to try... > > > *// Basically a copy KafkaAvroDeserializer with the casts in > deserialize**public class *EnvelopeAvroDeserializer *extends > *AbstractKafkaAvroDeserializer *implements *Deserializer<Envelope> { > > > > ... > > > > *public *Envelope deserialize(String s, *byte*[] bytes) { > > > > *return *(Envelope) *this*.deserialize(bytes); > > > > } > > > > > > > > *public *Envelope deserialize(String s, *byte*[] bytes, Schema > readerSchema) { > > > > *return *(Envelope) *this*.deserialize(bytes, readerSchema); > > > > } > > > > > > > > ... > > > > } > > > > Tim > > > On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones < > [email protected]> wrote: > > > Using Object doesn't work unfortunately. I get an 'Unable to automatically > infer a Coder' error at runtime. > > This is the code: > > p.apply(KafkaIO.<String, Object>read() > .withValueDeserializer(KafkaAvroDeserializer.class) > > It compiles, but at runtime: > > Caused by: java.lang.RuntimeException: Unable to automatically infer a > Coder for the Kafka Deserializer class > io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder registered > for type class java.lang.Object > at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696) > > So far the only thing I've got working is this, where I use the > ByteArrayDeserializer and then parse Avro myself: > > private static KafkaAvroDecoder avroDecoder; > static { > final Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); > props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, > "http://registry:8081"); > props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, > true); > VerifiableProperties vProps = new VerifiableProperties(props); > avroDecoder = new KafkaAvroDecoder(vProps); > } > > public static void main(String[] args) { > > PipelineOptions options = PipelineOptionsFactory.create(); > Pipeline p = Pipeline.create(options); > > p.apply(KafkaIO.<byte[], byte[]>read() > .withBootstrapServers("kafka:9092") > .withTopic("dbserver1.inventory.customers") > .withKeyDeserializer(ByteArrayDeserializer.class) > .withValueDeserializer(ByteArrayDeserializer.class) > .withoutMetadata( > ) > .apply(Values.<byte[]>create()) > .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() { > @ProcessElement > public void processElement(ProcessContext c) { > Envelope data = (Envelope) > avroDecoder.fromBytes(c.element()); > c.output(data); > } > })) > > Thanks, > Andrew > > On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote: > > On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <[email protected]> > wrote: > > It seems that KafkaAvroDeserializer implements Deserializer<Object>, > though I suppose with proper configuration that Object will at run-time be > your desired type. Have you tried adding some Java type casts to make it > compile? > > > +1, cast might be the simplest fix. Alternately you can wrap or > extend KafkaAvroDeserializer as Tim suggested. It would cast the Object > returned by KafkaAvroDeserializer::deserializer() to Envolope at runtime. > > > On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <[email protected]> > wrote: > > I just tried quickly and see the same as you Andrew. > We're missing something obvious or else extending KafkaAvroDeserializer seems > necessary right? > > On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones < > [email protected]> wrote: > > Hi, > > I'm trying to read Avro data from a Kafka stream using KafkaIO. I think > it should be as simple as: > > p.apply(KafkaIO.<String, Envelope>*read*() > .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, > AvroCoder.of(Envelope.class)) > > Where Envelope is the name of the Avro class. However, that does not > compile and I get the following error: > > incompatible types: > java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> > cannot be converted to java.lang.Class<? extends > > org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>> > > I've tried a number of variations on this theme but haven't yet worked > it out and am starting to run out of ideas... > > Has anyone successfully read Avro data from Kafka? > > The code I'm using can be found at > https://github.com/andrewrjones/debezium-kafka-beam-example and a full > environment can be created with Docker. > > Thanks, > Andrew > > > > >
