The following compiles fine:

p.apply(KafkaIO.<String, Envelope>read()
    .withBootstrapServers("kafka:9092")
    .withTopic("dbserver1.inventory.customers")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
        AvroCoder.of(Envelope.class))
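
For what it's worth, my understanding of why the raw cast gets through: Class<KafkaAvroDeserializer> is not a Class<? extends Deserializer<Envelope>>, because KafkaAvroDeserializer implements Deserializer<Object>. Going through the raw Class type turns the hard error into an unchecked warning. Below is a minimal sketch of the same situation with stand-in types, so it runs without Kafka or Beam on the classpath. Deserializer, ObjectDeserializer, readWith and readAsString are all hypothetical names invented for the sketch, not the real Kafka or Beam API.

```java
import java.nio.charset.StandardCharsets;

public class RawClassCastSketch {

    // Stand-in for Kafka's Deserializer<T>; hypothetical, not the real interface.
    public interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Stand-in for KafkaAvroDeserializer: it is declared against Object.
    public static class ObjectDeserializer implements Deserializer<Object> {
        public ObjectDeserializer() {}

        @Override
        public Object deserialize(String topic, byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Same parameter shape as in the compiler error: Class<? extends Deserializer<T>>.
    static <T> T readWith(Class<? extends Deserializer<T>> cls, byte[] data) {
        try {
            Deserializer<T> d = cls.getDeclaredConstructor().newInstance();
            return d.deserialize("topic", data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    static String readAsString(byte[] data) {
        // Passing ObjectDeserializer.class directly is rejected by the compiler,
        // since Class<ObjectDeserializer> is not a Class<? extends Deserializer<String>>.
        // Going through the raw type compiles, at the price of an unchecked warning.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Class<? extends Deserializer<String>> cls = (Class) ObjectDeserializer.class;
        return readWith(cls, data);
    }

    public static void main(String[] args) {
        // prints "hello"
        System.out.println(readAsString("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

The cast is unchecked, so nothing verifies that the deserializer really produces the declared type. If it returns something else, the failure shows up later as a ClassCastException wherever the value is first used as that type.
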
On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <[email protected]> wrote:
> Same for me. It does not look like there is an annotation to suppress the
> error.
>
>
> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <[email protected]> wrote:
>
>> Hi Eugene,
>>
>> I understood that was where Andrew started and reported this. I tried
>> and saw the same as him.
>>
>> incompatible types:
>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>> cannot be converted to
>> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>
>> similarly with
>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>
>>
>>
>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> I don't think extending the class is necessary. I'm not sure I understand
>>> why a simple type cast for withValueDeserializerAndCoder doesn't work.
>>> Have you tried this?
>>>
>>> p.apply(KafkaIO.<String, Envelope>read()
>>> .withValueDeserializerAndCoder
>>> ((Deserializer<Envelope>)KafkaAvroDeserializer.class,
>>> AvroCoder.of(Envelope.class))
>>>
>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <
>>> [email protected]> wrote:
>>>
>>>> Hi Raghu
>>>>
>>>> I tried that but with KafkaAvroDeserializer already implementing
>>>> Deserializer<Object> I couldn't get it to work... I didn't spend too
>>>> much time though and agree something like that would be cleaner.
>>>>
>>>> Cheers,
>>>> Tim
>>>>
>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks Tim.
>>>>>
>>>>> How about extending KafkaAvroDeserializer rather
>>>>> than AbstractKafkaAvroDeserializer?
>>>>>
>>>>> The TypedKafkaAvroDeserializer class below is useful, but not directly
>>>>> usable yet. It needs to store the actual type in the Kafka consumer
>>>>> config so that it can be retrieved at run time.
>>>>> Even without storing the class, it is still useful. It simplifies user
>>>>> code:
>>>>>
>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>>>
>>>>> This should be part of same package as KafkaAvroDeserializer
>>>>> (surprised it is not there yet).
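
Inline, on storing the type in the consumer config: one possible reading of that idea is sketched below, again with stand-in types instead of the Confluent classes. The config key "sketch.value.target.class" and the names UntypedDeserializer, TypedDeserializer, deserializeTyped and decodeAs are assumptions made up for illustration. Nothing here is an existing Kafka or Confluent setting or class.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TypedDeserializerSketch {

    // Hypothetical config key, made up for this sketch.
    static final String TARGET_CLASS_CONFIG = "sketch.value.target.class";

    // Stand-in for a deserializer that only promises Object.
    static class UntypedDeserializer {
        Object deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Looks up the target class from the config and casts results to it.
    static class TypedDeserializer<T> extends UntypedDeserializer {
        private Class<T> target;

        @SuppressWarnings("unchecked")
        void configure(Map<String, ?> configs) {
            Object name = configs.get(TARGET_CLASS_CONFIG);
            if (name == null) {
                throw new IllegalArgumentException("missing " + TARGET_CLASS_CONFIG);
            }
            try {
                target = (Class<T>) Class.forName(name.toString());
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("unknown class " + name, e);
            }
        }

        T deserializeTyped(byte[] bytes) {
            // Class.cast is checked at run time; a plain (T) cast is erased.
            return target.cast(deserialize(bytes));
        }
    }

    // Helper for the demo: configure a deserializer by class name and decode.
    static Object decodeAs(String className, String text) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(TARGET_CLASS_CONFIG, className);
        TypedDeserializer<Object> d = new TypedDeserializer<>();
        d.configure(configs);
        return d.deserializeTyped(text.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // prints "hi"
        System.out.println(decodeAs("java.lang.String", "hi"));
    }
}
```

The benefit of carrying the Class object is that a mismatch fails inside the deserializer with a ClassCastException, rather than somewhere downstream in the pipeline.
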
>>>>>
>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Happy to hear
>>>>>>
>>>>>> I wonder if we could do something like this (totally untested):
>>>>>>
>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>> @Override
>>>>>> public T deserialize(String s, byte[] bytes) {
>>>>>> return (T) this.deserialize(bytes);
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thanks Tim, that works!
>>>>>>>
>>>>>>> Full code is:
>>>>>>>
>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>> @Override
>>>>>>> public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>> configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public Envelope deserialize(String s, byte[] bytes) {
>>>>>>> return (Envelope) this.deserialize(bytes);
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public void close() {}
>>>>>>> }
>>>>>>>
>>>>>>> It's nicer than my solution, so I think that is the one I'm going to
>>>>>>> go with for now.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>
>>>>>>> Hi Andrew,
>>>>>>>
>>>>>>> I also saw the same behaviour.
>>>>>>>
>>>>>>> It's not pretty, but perhaps try this? It was my last idea, which I
>>>>>>> ran out of time to try...
>>>>>>>
>>>>>>>
>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>>>> public class EnvelopeAvroDeserializer extends
>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>
>>>>>>>   ...
>>>>>>>
>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>   }
>>>>>>>
>>>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>   }
>>>>>>>
>>>>>>>   ...
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> Tim
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Using Object doesn't work unfortunately. I get an 'Unable to
>>>>>>> automatically infer a Coder' error at runtime.
>>>>>>>
>>>>>>> This is the code:
>>>>>>>
>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>> .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>
>>>>>>> It compiles, but at runtime:
>>>>>>>
>>>>>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a Coder for the Kafka Deserializer class io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder registered for type class java.lang.Object
>>>>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>
>>>>>>> So far the only thing I've got working is this, where I use the
>>>>>>> ByteArrayDeserializer and then parse Avro myself:
>>>>>>>
>>>>>>> private static KafkaAvroDecoder avroDecoder;
>>>>>>> static {
>>>>>>>   final Properties props = new Properties();
>>>>>>>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>>>>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>   VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>   avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>> }
>>>>>>>
>>>>>>> public static void main(String[] args) {
>>>>>>>
>>>>>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>   Pipeline p = Pipeline.create(options);
>>>>>>>
>>>>>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>       .withBootstrapServers("kafka:9092")
>>>>>>>       .withTopic("dbserver1.inventory.customers")
>>>>>>>       .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>       .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>       .withoutMetadata())
>>>>>>>    .apply(Values.<byte[]>create())
>>>>>>>    .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>      @ProcessElement
>>>>>>>      public void processElement(ProcessContext c) {
>>>>>>>        Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>        c.output(data);
>>>>>>>      }
>>>>>>>    }))
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Andrew
>>>>>>>
>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>
>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>>>>>>> though I suppose with proper configuration that Object will at run-time
>>>>>>> be your desired type. Have you tried adding some Java type casts to make
>>>>>>> it compile?
>>>>>>>
>>>>>>>
>>>>>>> +1, a cast might be the simplest fix. Alternatively, you can wrap or
>>>>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>>>>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at
>>>>>>> runtime.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>> I just tried quickly and see the same as you, Andrew.
>>>>>>> Either we're missing something obvious, or extending
>>>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>>>
>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I
>>>>>>> think it should be as simple as:
>>>>>>>
>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>         AvroCoder.of(Envelope.class))
>>>>>>>
>>>>>>> Where Envelope is the name of the Avro class. However, that does not
>>>>>>> compile and I get the following error:
>>>>>>>
>>>>>>> incompatible types:
>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>
>>>>>>> I've tried a number of variations on this theme but haven't yet
>>>>>>> worked it out and am starting to run out of ideas...
>>>>>>>
>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>
>>>>>>> The code I'm using can be found at
>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a
>>>>>>> full environment can be created with Docker.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Andrew