Re: KafkaIO and Avro

2017-10-24 Thread Andrew Jones
Thanks Eugene, that worked perfectly!

Full final code at
https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java.
Thanks,
Andrew



Re: KafkaIO and Avro

2017-10-20 Thread Eugene Kirpichov
This is due to Java doing type erasure in any expression that involves a
raw type. This will compile if you extract the result of
.apply(KafkaIO.read()...) into a local variable.
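The failure mode Eugene describes can be reproduced with plain Java, with no Beam or Kafka on the classpath. In this hedged sketch, `Box` is a hypothetical stand-in for a fluent builder like `KafkaIO.Read`: once a raw type appears anywhere in the chained expression, the compiler erases the generics of every member in that expression, and assigning the intermediate result to a properly parameterized local variable restores them.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a fluent builder like KafkaIO.Read.
class Box<T> {
    private T value;
    Box<T> set(T t) { value = t; return this; }
    List<T> asList() { return Collections.singletonList(value); }
}

public class RawTypeErasure {
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        // A raw Box in the expression erases ALL generics in its members,
        // so asList() returns a raw List and chained calls lose their types:
        Box raw = new Box();
        List erased = raw.set("hello").asList();   // raw List, not List<String>

        // Extracting into a parameterized local variable keeps the types:
        Box<String> typed = new Box<String>().set("hello");
        List<String> kept = typed.asList();        // List<String> again

        System.out.println(erased.size() + " " + kept.size());
    }
}
```

The same effect is why `.apply(Values.create())` stopped resolving after the raw-typed `KafkaIO.read()` chain: the erased `apply` only promises `POutput`.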


Re: KafkaIO and Avro

2017-10-20 Thread Andrew Jones
Thanks Eugene. That does compile, although the rest of the pipeline
doesn't seem happy.
The next line is:

.apply(Values.create())

But that now doesn't compile with the following error:

/usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] cannot find symbol
  symbol:   method apply(org.apache.beam.sdk.transforms.Values)
  location: interface org.apache.beam.sdk.values.POutput

Don't really understand what's wrong here. It works fine when using the
EnvelopeKafkaAvroDeserializer as suggested by Tim.
Thanks,
Andrew



Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Thanks Eugene


Re: KafkaIO and Avro

2017-10-19 Thread Raghu Angadi
Ah, nice. It works.


Re: KafkaIO and Avro

2017-10-19 Thread Eugene Kirpichov
The following compiles fine:

p.apply(KafkaIO.read()
.withBootstrapServers("kafka:9092")
.withTopic("dbserver1.inventory.customers")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class,
AvroCoder.of(Envelope.class))
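The raw `(Class)` cast above is the crux of the fix, and the reason it works can be shown with plain Java. In this sketch, `Deserializer` and `FakeAvroDeserializer` are illustrative stand-ins, not the real Kafka/Confluent types: a class literal such as `FakeAvroDeserializer.class` has type `Class<FakeAvroDeserializer>`, which is not assignable to a `Class` parameterized with the interface type, so the only way through is an unchecked hop via the raw `Class` type.

```java
// Stand-ins for the Kafka/Confluent types; names are illustrative only.
interface Deserializer<T> { T deserialize(byte[] data); }

// Like KafkaAvroDeserializer: declared against Object, not a concrete type.
class FakeAvroDeserializer implements Deserializer<Object> {
    public Object deserialize(byte[] data) { return new String(data); }
}

public class RawClassCast {
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) throws Exception {
        // Does NOT compile: Class<FakeAvroDeserializer> is not a
        // Class<Deserializer<String>>; parameterized Class types have no
        // subtype relation even when the classes themselves do.
        // Class<Deserializer<String>> direct = FakeAvroDeserializer.class;

        // Compiles (with an unchecked warning): hop through the raw type,
        // which is exactly what the (Class) cast in the snippet above does.
        Class<Deserializer<String>> viaRaw = (Class) FakeAvroDeserializer.class;

        Deserializer<String> d = viaRaw.getDeclaredConstructor().newInstance();
        System.out.println(d.deserialize("ok".getBytes()));
    }
}
```

The cost is the suppressed warning; the benefit is that no wrapper class needs to be written.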



Re: KafkaIO and Avro

2017-10-19 Thread Raghu Angadi
Same for me. It does not look like there is an annotation to suppress the
error.


Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Hi Eugene,

I understood that was where Andrew started and reported this.  I tried and
saw the same as him.

incompatible types:
java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
cannot be converted to
org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>

similarly with
(Class<Deserializer<Envelope>>) KafkaAvroDeserializer.class



On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov 
wrote:

> I don't think extending the class is necessary. Not sure I understand why
> a simple type casting for withDeserializerAndCoder doesn't work? Have you
> tried this?
>
> p.apply(KafkaIO.read()
>   .withValueDeserializerAndCoder((Deserializer<Envelope>)KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson 
> wrote:
>
>> Hi Raghu
>>
>> I tried that but with KafkaAvroDeserializer already implementing
>> Deserializer I couldn't get it to work... I didn't spend too
>> much time though and agree something like that would be cleaner.
>>
>> Cheers,
>> Tim
>>
>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi  wrote:
>>
>>> Thanks Tim.
>>>
>>> How about extending KafkaAvroDeserializer rather than
>>> AbstractKafkaAvroDeserializer?
>>>
>>> The TypedKafkaAvroDeserializer class below is useful, but not directly
>>> usable yet. It needs to store the actual type in the Kafka consumer
>>> config to retrieve at run time.
>>> Even without storing the class, it is still useful. It simplifies user
>>> code:
>>>
>>> public class EnvelopeKafkaAvroDeserializer extends
>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>
>>> This should be part of the same package as KafkaAvroDeserializer (surprised
>>> it is not there yet).
>>>
 Happy to hear

 I wonder if we could do something like this (totally untested):

 public class TypedKafkaAvroDeserializer<T> extends
 AbstractKafkaAvroDeserializer implements Deserializer<T> {
    @Override
 public T deserialize(String s, byte[] bytes) {
 return (T) this.deserialize(bytes);
 }
 }
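Tim's generic-wrapper idea can be exercised with plain Java. In this hedged sketch, `AbstractDeserializer` is a hypothetical stand-in for Confluent's AbstractKafkaAvroDeserializer (whose decode method can only promise `Object`), and `String` stands in for the generated `Envelope` Avro class; the adapter narrows `Object` to `T`, and the user subclass pins `T` to a concrete type, exactly as TypedKafkaAvroDeserializer and EnvelopeKafkaAvroDeserializer would.

```java
// Hypothetical stand-in for AbstractKafkaAvroDeserializer: it can only
// return Object, because the Avro record type is not known statically.
abstract class AbstractDeserializer {
    protected Object deserialize(byte[] bytes) {
        return new String(bytes); // pretend this is Avro decoding
    }
}

// The pattern from Tim's sketch: a generic adapter narrowing Object to T.
class TypedDeserializer<T> extends AbstractDeserializer {
    @SuppressWarnings("unchecked")
    public T deserialize(String topic, byte[] bytes) {
        return (T) this.deserialize(bytes);
    }
}

// Raghu's one-liner user code: pin T to a concrete type per topic.
class EnvelopeDeserializer extends TypedDeserializer<String> {}

public class TypedDeserializerDemo {
    public static void main(String[] args) {
        EnvelopeDeserializer d = new EnvelopeDeserializer();
        String payload = d.deserialize("dbserver1.inventory.customers",
                "hello".getBytes());
        System.out.println(payload);
    }
}
```

The unchecked cast lives in one place (the adapter), so each per-topic subclass stays a single declaration.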

 On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
 andrew+b...@andrew-jones.com> wrote:

> Thanks Tim, that works!
>
> Full code is:
>
> public class EnvelopeKafkaAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
> @Override
> public void configure(Map<String, ?> configs, boolean isKey) {
> configure(new KafkaAvroDeserializerConfig(configs));
> }
>
> @Override
> public Envelope deserialize(String s, byte[] bytes) {
> return (Envelope) this.deserialize(bytes);
> }
>
> @Override
> public void close() {}
> }
>
> Nicer than my solution, so I think that is the one I'm going to go with
> for now.
>
> Thanks,
> Andrew
>
>
> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>
> Hi Andrew,
>
> I also saw the same behaviour.
>
> It's not pretty but perhaps try this? It was my last idea I ran out of
> time to try...
>
>
> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
> public class EnvelopeAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>
>   ...
>
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
>
>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>     return (Envelope) this.deserialize(bytes, readerSchema);
>   }
>
>   ...
> }
>
>  Tim
>
>
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
>
> Using Object doesn't work unfortunately. I get an 'Unable to
> automatically infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.read()
> .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class 
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
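This inference failure can be reproduced with plain reflection. In the sketch below, `valueTypeOf` is a hypothetical, simplified version of what `KafkaIO.inferCoder` does (stub types, not Beam's actual code): resolving the `Deserializer` type argument for a KafkaAvroDeserializer-like class yields `Object`, and no coder is registered for `Object`:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Stand-ins for the Kafka interface and the Confluent deserializer.
interface Deserializer<T> {}
class AvroDeserializer implements Deserializer<Object> {}

public class InferCoderSketch {
    // Simplified sketch of KafkaIO's reflective type lookup: find the
    // Deserializer<T> interface and return its actual type argument.
    static Type valueTypeOf(Class<?> deserializer) {
        for (Type iface : deserializer.getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) iface;
                if (pt.getRawType() == Deserializer.class) {
                    return pt.getActualTypeArguments()[0];
                }
            }
        }
        throw new IllegalArgumentException("not a Deserializer");
    }

    public static void main(String[] args) {
        // Resolves to java.lang.Object, so no coder can be found for it.
        System.out.println(valueTypeOf(AvroDeserializer.class));
    }
}
```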
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
> private static KafkaAvroDecoder avroDecoder;
> static {
> final Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> "kafka:9092");
> 

Re: KafkaIO and Avro

2017-10-19 Thread Eugene Kirpichov
I don't think extending the class is necessary. Not sure I understand why a
simple type casting for withDeserializerAndCoder doesn't work? Have you
tried this?

p.apply(KafkaIO.read()
  .withValueDeserializerAndCoder
((Deserializer)KafkaAvroDeserializer.class,
  AvroCoder.of(Envelope.class))
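The reason a cast like this can compile at all is Java's type erasure: a `Class` object carries no type argument at run time, and using the raw type `Class` in the expression turns off generic checking. A minimal stand-alone illustration (stub types, not the real Kafka API):

```java
// Stand-ins: the real KafkaAvroDeserializer implements Deserializer<Object>.
interface Deserializer<T> {}
class AvroDeserializer implements Deserializer<Object> {}

public class ErasureSketch {
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void main(String[] args) {
        // Direct assignment would not compile:
        // Class<? extends Deserializer<String>> c = AvroDeserializer.class;
        // Going through the raw type Class sidesteps the generic check
        // (with an unchecked warning); nothing is verified at run time.
        Class<? extends Deserializer<String>> c =
                (Class) AvroDeserializer.class;
        System.out.println(c.getSimpleName());
    }
}
```

This is also why the cast is safe only if the deserializer is configured to actually produce the expected type at run time.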

On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson 
wrote:

> Hi Raghu
>
> I tried that but with KafkaAvroDeserializer already implementing
> Deserializer<Object> I couldn't get it to work... I didn't spend too much
> time though and agree something like that would be cleaner.
>
> Cheers,
> Tim
>
> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi  wrote:
>
>> Thanks Tim.
>>
>> How about extending KafkaAvroDeserializer rather
>> than AbstractKafkaAvroDeserializer?
>>
>> The TypedKafkaAvroDeserializer<T> class below is useful, but not directly
>> usable yet. It needs to store the actual type in Kafka consumer config to
>> retrieve at run time.
>> Even without storing the class, it is still useful. It simplifies user
>> code:
>>
>> public class EnvelopeKafkaAvroDeserializer extends
>> TypedKafkaAvroDeserializer<Envelope> {}
>>
>> This should be part of the same package as KafkaAvroDeserializer (surprised
>> it is not there yet).
>>
>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson > > wrote:
>>
>>> Happy to hear
>>>
>>> I wonder if we could do something like this (totally untested):
>>>
>>> public class TypedKafkaAvroDeserializer<T> extends
>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>   @Override
>>>   public T deserialize(String s, byte[] bytes) {
>>>     return (T) this.deserialize(bytes);
>>>   }
>>> }
>>>
>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>> andrew+b...@andrew-jones.com> wrote:
>>>
 Thanks Tim, that works!

 Full code is:

 public class EnvelopeKafkaAvroDeserializer extends
 AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
   @Override
   public void configure(Map<String, ?> configs, boolean isKey) {
     configure(new KafkaAvroDeserializerConfig(configs));
   }

   @Override
   public Envelope deserialize(String s, byte[] bytes) {
     return (Envelope) this.deserialize(bytes);
   }

   @Override
   public void close() {}
 }

 Nicer than my solution so think that is the one I'm going to go with
 for now.

 Thanks,
 Andrew


 On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:

 Hi Andrew,

 I also saw the same behaviour.

 It's not pretty but perhaps try this? It was my last idea I ran out of
 time to try...


 // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
 public class EnvelopeAvroDeserializer extends
 AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {

   ...

   public Envelope deserialize(String s, byte[] bytes) {
     return (Envelope) this.deserialize(bytes);
   }

   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
     return (Envelope) this.deserialize(bytes, readerSchema);
   }

   ...
 }

  Tim


 On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
 andrew+b...@andrew-jones.com> wrote:


 Using Object doesn't work unfortunately. I get an 'Unable to
 automatically infer a Coder' error at runtime.

 This is the code:

 p.apply(KafkaIO.read()
 .withValueDeserializer(KafkaAvroDeserializer.class)

 It compiles, but at runtime:

 Caused by: java.lang.RuntimeException: Unable to automatically infer a
 Coder for the Kafka Deserializer class
 io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder registered
 for type class java.lang.Object
 at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)

 So far the only thing I've got working is this, where I use the
 ByteArrayDeserializer and then parse Avro myself:

 private static KafkaAvroDecoder avroDecoder;
 static {
 final Properties props = new Properties();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
 "kafka:9092");

 props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
 "http://registry:8081");

 props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
 VerifiableProperties vProps = new VerifiableProperties(props);
 avroDecoder = new KafkaAvroDecoder(vProps);
 }

 public static void main(String[] args) {

 PipelineOptions options = PipelineOptionsFactory.create();
 Pipeline p = Pipeline.create(options);

 p.apply(KafkaIO.

Re: KafkaIO and Avro

2017-10-19 Thread Raghu Angadi
Thanks Tim.

How about extending KafkaAvroDeserializer rather
than AbstractKafkaAvroDeserializer?

The TypedKafkaAvroDeserializer<T> class below is useful, but not directly
usable yet. It needs to store the actual type in Kafka consumer config to
retrieve at run time.
Even without storing the class, it is still useful. It simplifies user code:

public class EnvelopeKafkaAvroDeserializer extends
TypedKafkaAvroDeserializer<Envelope> {}

This should be part of the same package as KafkaAvroDeserializer (surprised it
is not there yet).

On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson 
wrote:

> Happy to hear
>
> I wonder if we could do something like this (totally untested):
>
> public class TypedKafkaAvroDeserializer<T> extends
> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>   @Override
>   public T deserialize(String s, byte[] bytes) {
>     return (T) this.deserialize(bytes);
>   }
> }
>
> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
>> Thanks Tim, that works!
>>
>> Full code is:
>>
>> public class EnvelopeKafkaAvroDeserializer extends
>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>   @Override
>>   public void configure(Map<String, ?> configs, boolean isKey) {
>>     configure(new KafkaAvroDeserializerConfig(configs));
>>   }
>>
>>   @Override
>>   public Envelope deserialize(String s, byte[] bytes) {
>>     return (Envelope) this.deserialize(bytes);
>>   }
>>
>>   @Override
>>   public void close() {}
>> }
>>
>> Nicer than my solution so think that is the one I'm going to go with for
>> now.
>>
>> Thanks,
>> Andrew
>>
>>
>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>
>> Hi Andrew,
>>
>> I also saw the same behaviour.
>>
>> It's not pretty but perhaps try this? It was my last idea I ran out of
>> time to try...
>>
>>
>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>> public class EnvelopeAvroDeserializer extends
>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>
>>   ...
>>
>>   public Envelope deserialize(String s, byte[] bytes) {
>>     return (Envelope) this.deserialize(bytes);
>>   }
>>
>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>   }
>>
>>   ...
>> }
>>
>>  Tim
>>
>>
>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>> andrew+b...@andrew-jones.com> wrote:
>>
>>
>> Using Object doesn't work unfortunately. I get an 'Unable to
>> automatically infer a Coder' error at runtime.
>>
>> This is the code:
>>
>> p.apply(KafkaIO.read()
>> .withValueDeserializer(KafkaAvroDeserializer.class)
>>
>> It compiles, but at runtime:
>>
>> Caused by: java.lang.RuntimeException: Unable to automatically infer a
>> Coder for the Kafka Deserializer class 
>> io.confluent.kafka.serializers.KafkaAvroDeserializer:
>> no coder registered for type class java.lang.Object
>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>
>> So far the only thing I've got working is this, where I use the
>> ByteArrayDeserializer and then parse Avro myself:
>>
>> private static KafkaAvroDecoder avroDecoder;
>> static {
>> final Properties props = new Properties();
>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> "http://registry:8081");
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>> true);
>> VerifiableProperties vProps = new VerifiableProperties(props);
>> avroDecoder = new KafkaAvroDecoder(vProps);
>> }
>>
>> public static void main(String[] args) {
>>
>> PipelineOptions options = PipelineOptionsFactory.create();
>> Pipeline p = Pipeline.create(options);
>>
>> p.apply(KafkaIO.<byte[], byte[]>read()
>> .withBootstrapServers("kafka:9092")
>> .withTopic("dbserver1.inventory.customers")
>> .withKeyDeserializer(ByteArrayDeserializer.class)
>> .withValueDeserializer(ByteArrayDeserializer.class)
>> .withoutMetadata())
>> .apply(Values.create())
>> .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>> Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>> c.output(data);
>> }
>> }))
>>
>> Thanks,
>> Andrew
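The shape of that fallback (read raw `byte[]` values, decode them in your own map step) can be sketched in plain Java. Below, `decoder` is a stand-in for `avroDecoder.fromBytes`, and `parseAll` plays the role of the `ParseAvro` ParDo; stub types, not the real Beam pipeline:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ManualDecodeSketch {
    // Stand-in for avroDecoder.fromBytes(...): here just bytes -> String.
    static Function<byte[], String> decoder = bytes -> new String(bytes);

    // Plays the role of .apply("ParseAvro", ParDo.of(...)): map each raw
    // record through the decoder.
    static List<String> parseAll(List<byte[]> records) {
        return records.stream().map(decoder).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<byte[]> raw = List.of("a".getBytes(), "b".getBytes());
        System.out.println(parseAll(raw)); // prints [a, b]
    }
}
```

The trade-off is that the pipeline's element type is `byte[]` until the ParDo, so Beam needs no Avro coder for the read itself.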
>>
>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>
>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov 
>> wrote:
>>
>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>> though I 

Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Happy to hear

I wonder if we could do something like this (totally untested):

public class TypedKafkaAvroDeserializer<T> extends
AbstractKafkaAvroDeserializer implements Deserializer<T> {
  @Override
  public T deserialize(String s, byte[] bytes) {
    return (T) this.deserialize(bytes);
  }
}

On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones  wrote:

> Thanks Tim, that works!
>
> Full code is:
>
> public class EnvelopeKafkaAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>   @Override
>   public void configure(Map<String, ?> configs, boolean isKey) {
>     configure(new KafkaAvroDeserializerConfig(configs));
>   }
>
>   @Override
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
>
>   @Override
>   public void close() {}
> }
>
> Nicer than my solution so think that is the one I'm going to go with for
> now.
>
> Thanks,
> Andrew
>
>
> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>
> Hi Andrew,
>
> I also saw the same behaviour.
>
> It's not pretty but perhaps try this? It was my last idea I ran out of
> time to try...
>
>
> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
> public class EnvelopeAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>
>   ...
>
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
>
>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>     return (Envelope) this.deserialize(bytes, readerSchema);
>   }
>
>   ...
> }
>
>  Tim
>
>
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
>
> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.read()
> .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class 
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
> private static KafkaAvroDecoder avroDecoder;
> static {
> final Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> "http://registry:8081");
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> true);
> VerifiableProperties vProps = new VerifiableProperties(props);
> avroDecoder = new KafkaAvroDecoder(vProps);
> }
>
> public static void main(String[] args) {
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> p.apply(KafkaIO.<byte[], byte[]>read()
> .withBootstrapServers("kafka:9092")
> .withTopic("dbserver1.inventory.customers")
> .withKeyDeserializer(ByteArrayDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class)
> .withoutMetadata())
> .apply(Values.create())
> .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
> c.output(data);
> }
> }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov 
> wrote:
>
> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
> returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
>
>
> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson 
> wrote:
>
> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
> Hi,
>
> I'm trying to read Avro data from a Kafka stream 

Re: KafkaIO and Avro

2017-10-19 Thread Andrew Jones
Thanks Tim, that works!

Full code is:

public class EnvelopeKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public Envelope deserialize(String s, byte[] bytes) {
    return (Envelope) this.deserialize(bytes);
  }

  @Override
  public void close() {}
}

Nicer than my solution so think that is the one I'm going to go
with for now.
Thanks,
Andrew


On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
> Hi Andrew,
> 
> I also saw the same behaviour.  
> 
> It's not pretty but perhaps try this? It was my last idea I ran out of
> time to try...
>
> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
> public class EnvelopeAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>
>   ...
>
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
>
>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>     return (Envelope) this.deserialize(bytes, readerSchema);
>   }
>
>   ...
> }
>
> Tim
> 
> 
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones  jones.com> wrote:>> __
>> Using Object doesn't work unfortunately. I get an 'Unable to
>> automatically infer a Coder' error at runtime.>> 
>> This is the code:
>> 
>> p.apply(KafkaIO.read()
>> .withValueDeserializer(KafkaAvroDeserializer.class)
>> 
>> It compiles, but at runtime:
>> 
>> Caused by: java.lang.RuntimeException: Unable to automatically infer
>> a Coder for the Kafka Deserializer class
>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder
>> registered for type class java.lang.Object
>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>> So far the only thing I've got working is this, where I use the
>> ByteArrayDeserializer and then parse Avro myself:>> 
>> private static KafkaAvroDecoder avroDecoder;
>> static {
>>     final Properties props = new Properties();
>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>         "http://registry:8081");
>>     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>     VerifiableProperties vProps = new VerifiableProperties(props);
>>     avroDecoder = new KafkaAvroDecoder(vProps);
>> }
>>
>> public static void main(String[] args) {
>>
>>     PipelineOptions options = PipelineOptionsFactory.create();
>>     Pipeline p = Pipeline.create(options);
>>
>>     p.apply(KafkaIO.<byte[], byte[]>read()
>>         .withBootstrapServers("kafka:9092")
>>         .withTopic("dbserver1.inventory.customers")
>>         .withKeyDeserializer(ByteArrayDeserializer.class)
>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>         .withoutMetadata())
>>     .apply(Values.create())
>>     .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>         @ProcessElement
>>         public void processElement(ProcessContext c) {
>>             Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>             c.output(data);
>>         }
>>     }))
>> 
>> Thanks,
>> Andrew
>> 
>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov
>>>  wrote: It seems that KafkaAvroDeserializer 
>>> implements
 Deserializer<Object>, though I suppose with proper configuration
 that Object will at run-time be your desired type. Have you tried
 adding some Java type casts to make it compile?>>> 
>>> +1, cast might be the simplest fix. Alternately you can wrap or
>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the
>>> Object returned by KafkaAvroDeserializer::deserialize() to Envelope
>>> at runtime.>>> 
 
 On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson
  wrote:> I just tried quickly and see the 
 same as you Andrew.  
> We're missing something obvious or else extending
> KafkaAvroDeserializer seems necessary right?> 
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones  jones.com> wrote:>> Hi,
>> 
>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I
>> think>> it should be as simple as:
>> 
>> 

Re: KafkaIO and Avro

2017-10-19 Thread Tim Robertson
Hi Andrew,

I also saw the same behaviour.

It's not pretty but perhaps try this? It was my last idea I ran out of time
to try...

// Basically a copy of KafkaAvroDeserializer with the casts in deserialize
public class EnvelopeAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
  ...
  public Envelope deserialize(String s, byte[] bytes) {
return (Envelope) this.deserialize(bytes);
  }

  public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
return (Envelope) this.deserialize(bytes, readerSchema);
  }

  ...
}

Tim


On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones  wrote:

> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.read()
> .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class 
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
> private static KafkaAvroDecoder avroDecoder;
> static {
> final Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> "http://registry:8081");
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> true);
> VerifiableProperties vProps = new VerifiableProperties(props);
> avroDecoder = new KafkaAvroDecoder(vProps);
> }
>
> public static void main(String[] args) {
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> p.apply(KafkaIO.<byte[], byte[]>read()
> .withBootstrapServers("kafka:9092")
> .withTopic("dbserver1.inventory.customers")
> .withKeyDeserializer(ByteArrayDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class)
> .withoutMetadata())
> .apply(Values.create())
> .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
> c.output(data);
> }
> }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov 
> wrote:
>
> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
> returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
>
>
> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson 
> wrote:
>
> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
> Hi,
>
> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
> it should be as simple as:
>
> p.apply(KafkaIO.read()
>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> Where Envelope is the name of the Avro class. However, that does not
> compile and I get the following error:
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to java.lang.Class<? extends
> org.apache.kafka.common.serialization.Deserializer<dbserver1
> .inventory.customers.Envelope>>
>
> I've tried a number of variations on this theme but haven't yet worked
> it out and am starting to run out of ideas...
>
> Has anyone successfully read Avro data from Kafka?
>
> The code I'm using can be found at
> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
> environment can be created with Docker.
>
> Thanks,
> Andrew
>
>
>


Re: KafkaIO and Avro

2017-10-18 Thread Eugene Kirpichov
It seems that KafkaAvroDeserializer implements Deserializer<Object>, though
I suppose with proper configuration that Object will at run-time be your
desired type. Have you tried adding some Java type casts to make it compile?

On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson 
wrote:

> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
>> Hi,
>>
>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>> it should be as simple as:
>>
>> p.apply(KafkaIO.read()
>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>   AvroCoder.of(Envelope.class))
>>
>> Where Envelope is the name of the Avro class. However, that does not
>> compile and I get the following error:
>>
>> incompatible types:
>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>> cannot be converted to java.lang.Class<? extends
>> org.apache.kafka.common.serialization.Deserializer<dbserver1
>> .inventory.customers.Envelope>>
>>
>> I've tried a number of variations on this theme but haven't yet worked
>> it out and am starting to run out of ideas...
>>
>> Has anyone successfully read Avro data from Kafka?
>>
>> The code I'm using can be found at
>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>> environment can be created with Docker.
>>
>> Thanks,
>> Andrew
>>
>
>


KafkaIO and Avro

2017-10-18 Thread Andrew Jones
Hi,

I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
it should be as simple as:

p.apply(KafkaIO.read()
  .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
  AvroCoder.of(Envelope.class))

Where Envelope is the name of the Avro class. However, that does not
compile and I get the following error:

incompatible types:
java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
cannot be converted to java.lang.Class<? extends
org.apache.kafka.common.serialization.Deserializer<dbserver1
.inventory.customers.Envelope>>

I've tried a number of variations on this theme but haven't yet worked
it out and am starting to run out of ideas...

Has anyone successfully read Avro data from Kafka?

The code I'm using can be found at
https://github.com/andrewrjones/debezium-kafka-beam-example and a full
environment can be created with Docker.

Thanks,
Andrew