Ah, nice. It works.

On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> The following compiles fine:
>
>
>         p.apply(KafkaIO.<String, Envelope>read()
>                 .withBootstrapServers("kafka:9092")
>                 .withTopic("dbserver1.inventory.customers")
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializerAndCoder(
>                     (Class) KafkaAvroDeserializer.class,
>                     AvroCoder.of(Envelope.class))
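>
> A fuller version of the same idea (untested sketch; the raw Class cast
> trades compile-time checking for rawtypes/unchecked warnings, and the
> schema registry settings still have to reach the deserializer through the
> consumer config, e.g. via updateConsumerProperties):
>
>         p.apply(KafkaIO.<String, Envelope>read()
>                 .withBootstrapServers("kafka:9092")
>                 .withTopic("dbserver1.inventory.customers")
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializerAndCoder(
>                     (Class) KafkaAvroDeserializer.class,
>                     AvroCoder.of(Envelope.class))
>                 // pass the Confluent registry settings through to the deserializer
>                 .updateConsumerProperties(ImmutableMap.<String, Object>of(
>                     "schema.registry.url", "http://registry:8081",
>                     "specific.avro.reader", true))
>                 .withoutMetadata());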
>
>
> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Same for me. It does not look like there is an annotation to suppress the
>> error.
>>
>>
>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <
>> timrobertson...@gmail.com> wrote:
>>
>>> Hi Eugene,
>>>
>>> I understood that was where Andrew started, and what he reported. I
>>> tried it and saw the same as he did.
>>>
>>> incompatible types:
>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>> cannot be converted to
>>> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>
>>> similarly with
>>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
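>>>
>>> A detour through the raw type might compile (untested), e.g.
>>>
>>> (Class<? extends Deserializer<Envelope>>) (Class) KafkaAvroDeserializer.class
>>>
>>> Since KafkaAvroDeserializer implements Deserializer<Object>, the direct
>>> cast is rejected outright, while the raw cast only downgrades the
>>> mismatch to an unchecked warning.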
>>>
>>>
>>>
>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> I don't think extending the class is necessary. I'm not sure I understand
>>>> why a simple type cast for withValueDeserializerAndCoder doesn't work. Have
>>>> you tried this?
>>>>
>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>   .withValueDeserializerAndCoder(
>>>>       (Deserializer<Envelope>) KafkaAvroDeserializer.class,
>>>>       AvroCoder.of(Envelope.class))
>>>>
>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <
>>>> timrobertson...@gmail.com> wrote:
>>>>
>>>>> Hi Raghu
>>>>>
>>>>> I tried that, but with KafkaAvroDeserializer already implementing
>>>>> Deserializer<Object> I couldn't get it to work... I didn't spend too
>>>>> much time on it, though, and agree something like that would be cleaner.
>>>>>
>>>>> Cheers,
>>>>> Tim
>>>>>
>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks Tim.
>>>>>>
>>>>>> How about extending KafkaAvroDeserializer rather than
>>>>>> AbstractKafkaAvroDeserializer?
>>>>>>
>>>>>> The TypedKafkaAvroDeserializer class below is useful, but not directly
>>>>>> usable yet: it would need to store the actual type in the Kafka consumer
>>>>>> config and retrieve it at run time.
>>>>>> Even without storing the class, it is still useful, since it simplifies
>>>>>> user code:
>>>>>>
>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>
>>>>>> This should be part of the same package as KafkaAvroDeserializer
>>>>>> (surprised it is not there yet).
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <
>>>>>> timrobertson...@gmail.com> wrote:
>>>>>>
>>>>>>> Happy to hear
>>>>>>>
>>>>>>> I wonder if we could do something like this (totally untested):
>>>>>>>
>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>     @Override
>>>>>>>     @SuppressWarnings("unchecked") // the cast from Object is unchecked
>>>>>>>     public T deserialize(String s, byte[] bytes) {
>>>>>>>         return (T) this.deserialize(bytes);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>
>>>>>>>> Thanks Tim, that works!
>>>>>>>>
>>>>>>>> Full code is:
>>>>>>>>
>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>     @Override
>>>>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void close() {}
>>>>>>>> }
>>>>>>>>
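>>>>>>>> With a typed deserializer like that, no cast should be needed when
>>>>>>>> wiring it in (untested sketch):
>>>>>>>>
>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>     .withValueDeserializerAndCoder(
>>>>>>>>         EnvelopeKafkaAvroDeserializer.class, AvroCoder.of(Envelope.class)));
>>>>>>>>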
>>>>>>>> Nicer than my solution, so I think that is the one I'm going to go
>>>>>>>> with for now.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>
>>>>>>>> Hi Andrew,
>>>>>>>>
>>>>>>>> I also saw the same behaviour.
>>>>>>>>
>>>>>>>> It's not pretty but perhaps try this? It was my last idea I ran out
>>>>>>>> of time to try...
>>>>>>>>
>>>>>>>>
>>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>>>>> public class EnvelopeAvroDeserializer extends
>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>
>>>>>>>>   ...
>>>>>>>>
>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   ...
>>>>>>>> }
>>>>>>>>
>>>>>>>>  Tim
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> Using Object doesn't work unfortunately. I get an 'Unable to
>>>>>>>> automatically infer a Coder' error at runtime.
>>>>>>>>
>>>>>>>> This is the code:
>>>>>>>>
>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>
>>>>>>>> It compiles, but at runtime:
>>>>>>>>
>>>>>>>> Caused by: java.lang.RuntimeException: Unable to automatically
>>>>>>>> infer a Coder for the Kafka Deserializer class
>>>>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder
>>>>>>>> registered for type class java.lang.Object
>>>>>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>
>>>>>>>> So far the only thing I've got working is this, where I use the
>>>>>>>> ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>
>>>>>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>>>>>     static {
>>>>>>>>         final Properties props = new Properties();
>>>>>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>                 "http://registry:8081");
>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>>>>                 true);
>>>>>>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     public static void main(String[] args) {
>>>>>>>>
>>>>>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>
>>>>>>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>                 .withBootstrapServers("kafka:9092")
>>>>>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>>>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>                 .withoutMetadata())
>>>>>>>>                 .apply(Values.<byte[]>create())
>>>>>>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>                     @ProcessElement
>>>>>>>>                     public void processElement(ProcessContext c) {
>>>>>>>>                         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>                         c.output(data);
>>>>>>>>                     }
>>>>>>>>                 }));
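>>>>>>>>
>>>>>>>> (If Beam can't infer a coder for Envelope either, the ParDo's output
>>>>>>>> may also need one set explicitly, e.g. by chaining
>>>>>>>> .setCoder(AvroCoder.of(Envelope.class)) on the resulting PCollection.)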
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>> Deserializer<Object>, though I suppose with proper configuration that
>>>>>>>> Object will at run-time be your desired type. Have you tried adding 
>>>>>>>> some
>>>>>>>> Java type casts to make it compile?
>>>>>>>>
>>>>>>>>
>>>>>>>> +1, a cast might be the simplest fix. Alternately, you can wrap or
>>>>>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>>>>>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at
>>>>>>>> runtime.
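>>>>>>>>
>>>>>>>> For the wrapping variant, something like this (untested sketch,
>>>>>>>> imports omitted; the EnvelopeDeserializer name is just for
>>>>>>>> illustration):
>>>>>>>>
>>>>>>>> public class EnvelopeDeserializer implements Deserializer<Envelope> {
>>>>>>>>     // delegate to the Confluent deserializer and cast its Object result
>>>>>>>>     private final KafkaAvroDeserializer inner = new KafkaAvroDeserializer();
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>         inner.configure(configs, isKey);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public Envelope deserialize(String topic, byte[] bytes) {
>>>>>>>>         return (Envelope) inner.deserialize(topic, bytes);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void close() {
>>>>>>>>         inner.close();
>>>>>>>>     }
>>>>>>>> }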
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <
>>>>>>>> timrobertson...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> I just tried quickly and see the same as you, Andrew.
>>>>>>>> We're missing something obvious, or else extending
>>>>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>>>>
>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I
>>>>>>>> think
>>>>>>>> it should be as simple as:
>>>>>>>>
>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>       AvroCoder.of(Envelope.class))
>>>>>>>>
>>>>>>>> Where Envelope is the name of the Avro class. However, that does not
>>>>>>>> compile and I get the following error:
>>>>>>>>
>>>>>>>> incompatible types:
>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>
>>>>>>>> I've tried a number of variations on this theme but haven't yet
>>>>>>>> worked
>>>>>>>> it out and am starting to run out of ideas...
>>>>>>>>
>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>
>>>>>>>> The code I'm using can be found at
>>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>>>>>>>> environment can be created with Docker.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Andrew
>>>>>>>>