Thanks Tim, that works!
Full code is:
public class EnvelopeKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Envelope> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public Envelope deserialize(String s, byte[] bytes) {
    return (Envelope) this.deserialize(bytes);
  }

  @Override
  public void close() {}
}

It's nicer than my solution, so I think that's the one I'm going to go
with for now.

Thanks,
Andrew
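
P.S. For anyone finding this thread later, here is a minimal stand-alone
sketch of why the subclass-with-cast approach satisfies the compiler. It
does not use Beam, Kafka or the Confluent classes: every type in it
(Deserializer, Envelope, AbstractObjectDecoder, ObjectDeserializer,
EnvelopeDeserializer, create) is a hypothetical stand-in I made up to mirror
the shapes discussed in this thread, so treat it as an illustration of the
generics involved rather than as the real API.

```java
import java.nio.charset.StandardCharsets;

public class CastingDeserializerSketch {

  // Stand-in for org.apache.kafka.common.serialization.Deserializer<T>.
  public interface Deserializer<T> {
    T deserialize(String topic, byte[] bytes);
  }

  // Stand-in for the generated Avro class.
  public static final class Envelope {
    public final String payload;

    public Envelope(String payload) {
      this.payload = payload;
    }
  }

  // Stand-in for AbstractKafkaAvroDeserializer: it decodes to Object and
  // does not itself implement Deserializer.
  public abstract static class AbstractObjectDecoder {
    protected Object deserialize(byte[] bytes) {
      return new Envelope(new String(bytes, StandardCharsets.UTF_8));
    }
  }

  // Stand-in for KafkaAvroDeserializer: typed as Deserializer<Object>.
  public static class ObjectDeserializer extends AbstractObjectDecoder
      implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
      return deserialize(bytes);
    }
  }

  // The approach from this thread: extend the abstract base, implement
  // Deserializer<Envelope>, and cast the decoded Object.
  public static class EnvelopeDeserializer extends AbstractObjectDecoder
      implements Deserializer<Envelope> {
    @Override
    public Envelope deserialize(String topic, byte[] bytes) {
      return (Envelope) this.deserialize(bytes);
    }
  }

  // Hypothetical method that, like the compile error in this thread,
  // requires a Class<? extends Deserializer<T>>.
  public static <T> Deserializer<T> create(Class<? extends Deserializer<T>> cls) {
    try {
      return cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // This line would not compile, because Class<ObjectDeserializer> only
    // fits Class<? extends Deserializer<Object>>:
    // Deserializer<Envelope> bad = create(ObjectDeserializer.class);

    Deserializer<Envelope> d = create(EnvelopeDeserializer.class);
    Envelope e = d.deserialize("topic", "hello".getBytes(StandardCharsets.UTF_8));
    System.out.println(e.payload); // prints "hello"
  }
}
```

Note that the subclass extends the abstract base rather than the
Deserializer<Object> class: Java does not let one class implement the same
generic interface with two different type arguments.
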
On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
> Hi Andrew,
>
> I also saw the same behaviour.
>
> It's not pretty but perhaps try this? It was my last idea, which I ran
> out of time to try...
>
> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
> public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer
>     implements Deserializer<Envelope> {
>
>   ...
>
>   public Envelope deserialize(String s, byte[] bytes) {
>     return (Envelope) this.deserialize(bytes);
>   }
>
>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>     return (Envelope) this.deserialize(bytes, readerSchema);
>   }
>
>   ...
>
> }
>
> Tim
>
>
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>> Using Object doesn't work unfortunately. I get an 'Unable to
>> automatically infer a Coder' error at runtime.
>>
>> This is the code:
>>
>> p.apply(KafkaIO.<String, Object>read()
>>     .withValueDeserializer(KafkaAvroDeserializer.class)
>>
>> It compiles, but at runtime:
>>
>> Caused by: java.lang.RuntimeException: Unable to automatically infer
>> a Coder for the Kafka Deserializer class
>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder
>> registered for type class java.lang.Object
>>     at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>
>> So far the only thing I've got working is this, where I use the
>> ByteArrayDeserializer and then parse Avro myself:
>>
>> private static KafkaAvroDecoder avroDecoder;
>> static {
>>   final Properties props = new Properties();
>>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>       "http://registry:8081");
>>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>   VerifiableProperties vProps = new VerifiableProperties(props);
>>   avroDecoder = new KafkaAvroDecoder(vProps);
>> }
>>
>> public static void main(String[] args) {
>>
>>   PipelineOptions options = PipelineOptionsFactory.create();
>>   Pipeline p = Pipeline.create(options);
>>
>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>       .withBootstrapServers("kafka:9092")
>>       .withTopic("dbserver1.inventory.customers")
>>       .withKeyDeserializer(ByteArrayDeserializer.class)
>>       .withValueDeserializer(ByteArrayDeserializer.class)
>>       .withoutMetadata()
>>   )
>>   .apply(Values.<byte[]>create())
>>   .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>     @ProcessElement
>>     public void processElement(ProcessContext c) {
>>       Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>       c.output(data);
>>     }
>>   }))
>>
>> Thanks,
>> Andrew
>>
>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov
>>> <[email protected]> wrote:
>>>> It seems that KafkaAvroDeserializer implements
>>>> Deserializer<Object>, though I suppose with proper configuration
>>>> that Object will at run-time be your desired type. Have you tried
>>>> adding some Java type casts to make it compile?
>>>
>>> +1, a cast might be the simplest fix. Alternatively, you can wrap or
>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the
>>> Object returned by KafkaAvroDeserializer::deserialize() to Envelope
>>> at runtime.
>>>
>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson
>>>> <[email protected]> wrote:
>>>>> I just tried quickly and see the same as you, Andrew.
>>>>> We're missing something obvious, or else extending
>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>
>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I
>>>>>> think it should be as simple as:
>>>>>>
>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>         AvroCoder.of(Envelope.class))
>>>>>>
>>>>>> Where Envelope is the name of the Avro class. However, that does
>>>>>> not compile and I get the following error:
>>>>>>
>>>>>> incompatible types:
>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>
>>>>>> I've tried a number of variations on this theme but haven't yet
>>>>>> worked it out and am starting to run out of ideas...
>>>>>>
>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>
>>>>>> The code I'm using can be found at
>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a
>>>>>> full environment can be created with Docker.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>