Thanks Tim, that works!

Full code is:

public class EnvelopeKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public Envelope deserialize(String s, byte[] bytes) {
        return (Envelope) this.deserialize(bytes);
    }

    @Override
    public void close() {}
}

Nicer than my solution so think that is the one I'm going to go
with for now.
Thanks,
Andrew


On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
> Hi Andrew,
> 
> I also saw the same behaviour.  
> 
> It's not pretty but perhaps try this? It was my last idea I ran out of
> time to try...> *// Basically a copy KafkaAvroDeserializer with the casts in
> deserialize
**public class *EnvelopeAvroDeserializer *extends
*AbstractKafkaAvroDeserializer *implements *Deserializer<Envelope> {
>> 
> 
> 
>   ...
> 
> 
> 
> 
>   *public *Envelope deserialize(String s, *byte*[] bytes) {
> 
> 
> 
> 
>     *return *(Envelope) *this*.deserialize(bytes);
> 
> 
> 
> 
>   }
> 
> 
> 
> 
> 
> 
> 
> 
> 
>   *public *Envelope deserialize(String s, *byte*[] bytes, Schema
>   readerSchema) {
>> 
> 
> 
>     *return *(Envelope) *this*.deserialize(bytes, readerSchema);
> 
> 
> 
> 
>   }
> 
> 
> 
> 
> 
> 
> 
> 
> 
>   ...
> 
> 
> 
> 
> }
> 
> 
> 
> 
> Tim
> 
> 
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+beam@andrew-
> jones.com> wrote:>> __
>> Using Object doesn't work unfortunately. I get an 'Unable to
>> automatically infer a Coder' error at runtime.>> 
>> This is the code:
>> 
>> p.apply(KafkaIO.<String, Object>read()
>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>> 
>> It compiles, but at runtime:
>> 
>> Caused by: java.lang.RuntimeException: Unable to automatically infer
>> a Coder for the Kafka Deserializer class
>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder
>> registered for type class java.lang.Object>> at 
>> org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)>> 
>> So far the only thing I've got working is this, where I use the
>> ByteArrayDeserializer and then parse Avro myself:>> 
>>     private static KafkaAvroDecoder avroDecoder;
>>     static {
>>         final Properties props = new Properties();
>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>>         "kafka:9092");>>         
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_C-
>>         ONFIG, "http://registry:8081";);>>         
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_C-
>>         ONFIG, true);>>         VerifiableProperties vProps = new
>>         VerifiableProperties(props);>>         avroDecoder = new 
>> KafkaAvroDecoder(vProps);
>>     }
>> 
>>     public static void main(String[] args) {
>> 
>>         PipelineOptions options = PipelineOptionsFactory.create();
>>         Pipeline p = Pipeline.create(options);
>> 
>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>                 .withBootstrapServers("kafka:9092")
>>                 .withTopic("dbserver1.inventory.customers")
>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>                 .withoutMetadata(
>>         )
>>                 .apply(Values.<byte[]>create())
>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[],
>>                 Envelope>() {>>                     @ProcessElement
>>                     public void processElement(ProcessContext c) {
>>                         Envelope data = (Envelope)
>>                         avroDecoder.fromBytes(c.element());>>                
>>          c.output(data);
>>                     }
>>                 }))
>> 
>> Thanks,
>> Andrew
>> 
>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov
>>> <[email protected]> wrote:>>>> It seems that KafkaAvroDeserializer 
>>> implements
>>>> Deserializer<Object>, though I suppose with proper configuration
>>>> that Object will at run-time be your desired type. Have you tried
>>>> adding some Java type casts to make it compile?>>> 
>>> +1, cast might be the simplest fix. Alternately you can wrap or
>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the
>>> Object returned by KafkaAvroDeserializer::deserializer() to Envolope
>>> at runtime.>>> 
>>>> 
>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson
>>>> <[email protected]> wrote:>>>>> I just tried quickly and see the 
>>>> same as you Andrew.  
>>>>> We're missing something obvious or else extending
>>>>> KafkaAvroDeserializer seems necessary right?>>>>> 
>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+beam@andrew-
>>>>> jones.com> wrote:>>>>>> Hi,
>>>>>> 
>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I
>>>>>> think>>>>>> it should be as simple as:
>>>>>> 
>>>>>> p.apply(KafkaIO.<String, Envelope>*read*()
>>>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>   AvroCoder.of(Envelope.class))
>>>>>> 
>>>>>> Where Envelope is the name of the Avro class. However, that does
>>>>>> not>>>>>> compile and I get the following error:
>>>>>> 
>>>>>> incompatible types:
>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserial-
>>>>>> izer>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inv-
>>>>>> entory.customers.Envelope>>>>>>>> 
>>>>>> I've tried a number of variations on this theme but haven't yet
>>>>>> worked>>>>>> it out and am starting to run out of ideas...
>>>>>> 
>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>> 
>>>>>> The code I'm using can be found at
>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a
>>>>>> full>>>>>> environment can be created with Docker.
>>>>>> 
>>>>>> Thanks,
>>>>>> Andrew
>> 

Reply via email to