This is due to Java doing type erasure in any expression that involves a
raw type. This will compile if you extract the result of
.apply(KafkaIO.read()...) into a local variable.

On Fri, Oct 20, 2017, 1:51 AM Andrew Jones <[email protected]>
wrote:

> Thanks Eugene. That does compile, although the rest of the pipeline
> doesn't seem happy.
>
> The next line is:
>
> .apply(Values.<Envelope>create())
>
> But that now doesn't compile with the following error:
>
> /usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17]
> cannot find symbol
>   symbol:   method
> apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
>   location: interface org.apache.beam.sdk.values.POutput
>
> Don't really understand what's wrong here. It works fine when using
> the EnvelopeKafkaAvroDeserializer as suggested by Tim.
>
> Thanks,
> Andrew
>
>
> On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
>
> Thanks Eugene
>
> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <[email protected]> wrote:
>
> Ah, nice. It works.
>
> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
> The following compiles fine:
>
>
>         p.apply(KafkaIO.<String, Envelope>read()
>
>                 .withBootstrapServers("kafka:9092")
>                 .withTopic("dbserver1.inventory.customers")
>
>                 .withKeyDeserializer(StringDeserializer.class)
>
> .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class,
> AvroCoder.of(Envelope.class))
>
>
> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <[email protected]> wrote:
>
> Same for me. It does not look like there is an annotation to suppress the
> error.
>
>
> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <[email protected]
> > wrote:
>
> Hi Eugene,
>
> I understood that was where Andrew started and reported this.  I tried and
> saw the same as him.
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to
> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>
> similarly with
> (Class<? *extends *Deserializer<Envelope>>) KafkaAvroDeserializer.*class*
>
>
>
> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
> I don't think extending the class is necessary. Not sure I understand why
> a simple type casting for withDeserializerAndCoder doesn't work? Have you
> tried this?
>
> p.apply(KafkaIO.<String, Envelope>read()
>   .withValueDeserializerAndCoder
> ((Deserializer<Envelope>)KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <[email protected]>
> wrote:
>
> Hi Raghu
>
> I tried that but with KafkaAvroDeserializer already implementing
> Deserializer<Object> I couldn't get it to work... I didn't spend too much
> time though and agree something like that would be cleaner.
>
> Cheers,
> Tim
>
> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <[email protected]> wrote:
>
> Thanks Tim.
>
> How about extending KafkaAvroDeserializer rather
> than AbstractKafkaAvroDeserializer?
>
> TypedKafkaAvroDeserializer class below is useful, but not directly usable
> by the yet. It needs to store the actual type in Kafka consumer config to
> retrieve at run time.
> Even without storing the class, it is still useful. It simplifies user
> code:
>
> public class EnvelopeKafkaAvroDeserializer extends
> TypedKafkaAvroDeserializer<Envelope> {}
>
> This should be part of same package as KafkaAvroDeserializer (surprised it
> is not there yet).
>
> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <[email protected]>
> wrote:
>
> Happy to hear
>
> I wonder if we could do something like this (totally untested):
>
> public class TypedKafkaAvroDeserializer<T> extends
> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>    @Override
>
>     public T deserialize(String s, byte[] bytes) {
>         return (T) this.deserialize(bytes);
>     }
>
> }
>
> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
> [email protected]> wrote:
>
>
> Thanks Tim, that works!
>
> Full code is:
>
> public class EnvelopeKafkaAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         configure(new KafkaAvroDeserializerConfig(configs));
>     }
>
>     @Override
>
>     public Envelope deserialize(String s, byte[] bytes) {
>         return (Envelope) this.deserialize(bytes);
>     }
>
>
>     @Override
>     public void close() {}
> }
>
> Nicer than my solution so think that is the one I'm going to go with for
> now.
>
> Thanks,
> Andrew
>
>
> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>
> Hi Andrew,
>
> I also saw the same behaviour.
>
> It's not pretty but perhaps try this? It was my last idea I ran out of
> time to try...
>
>
> *// Basically a copy KafkaAvroDeserializer with the casts in 
> deserialize**public class *EnvelopeAvroDeserializer *extends 
> *AbstractKafkaAvroDeserializer *implements *Deserializer<Envelope> {
>
>
>
>   ...
>
>
>
>   *public *Envelope deserialize(String s, *byte*[] bytes) {
>
>
>
>     *return *(Envelope) *this*.deserialize(bytes);
>
>
>
>   }
>
>
>
>
>
>
>
>   *public *Envelope deserialize(String s, *byte*[] bytes, Schema 
> readerSchema) {
>
>
>
>     *return *(Envelope) *this*.deserialize(bytes, readerSchema);
>
>
>
>   }
>
>
>
>
>
>
>
>   ...
>
>
>
> }
>
>
>
>  Tim
>
>
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
> [email protected]> wrote:
>
>
> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.<String, Object>read()
>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class
> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder registered
> for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
>     private static KafkaAvroDecoder avroDecoder;
>     static {
>         final Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> "http://registry:8081";);
>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> true);
>         VerifiableProperties vProps = new VerifiableProperties(props);
>         avroDecoder = new KafkaAvroDecoder(vProps);
>     }
>
>     public static void main(String[] args) {
>
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline p = Pipeline.create(options);
>
>         p.apply(KafkaIO.<byte[], byte[]>read()
>                 .withBootstrapServers("kafka:9092")
>                 .withTopic("dbserver1.inventory.customers")
>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>                 .withValueDeserializer(ByteArrayDeserializer.class)
>                 .withoutMetadata(
>         )
>                 .apply(Values.<byte[]>create())
>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>                     @ProcessElement
>                     public void processElement(ProcessContext c) {
>                         Envelope data = (Envelope)
> avroDecoder.fromBytes(c.element());
>                         c.output(data);
>                     }
>                 }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <[email protected]>
> wrote:
>
> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
> returned by KafkaAvroDeserializer::deserializer() to Envolope at runtime.
>
>
> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <[email protected]>
> wrote:
>
> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> [email protected]> wrote:
>
> Hi,
>
> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
> it should be as simple as:
>
> p.apply(KafkaIO.<String, Envelope>*read*()
>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> Where Envelope is the name of the Avro class. However, that does not
> compile and I get the following error:
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to java.lang.Class<? extends
>
> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>
> I've tried a number of variations on this theme but haven't yet worked
> it out and am starting to run out of ideas...
>
> Has anyone successfully read Avro data from Kafka?
>
> The code I'm using can be found at
> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
> environment can be created with Docker.
>
> Thanks,
> Andrew
>
>
>
>
>

Reply via email to