Hi Andrew,
I also saw the same behaviour.
It's not pretty but perhaps try this? It was my last idea I ran out of time
to try...
// Basically a copy KafkaAvroDeserializer with the casts in deserialize
public class EnvelopeAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
...
public Envelope deserialize(String s, byte[] bytes) {
return (Envelope) this.deserialize(bytes);
}
public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
return (Envelope) this.deserialize(bytes, readerSchema);
}
...
}
Tim
On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <[email protected]
> wrote:
> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.<String, Object>read()
> .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
> private static KafkaAvroDecoder avroDecoder;
> static {
> final Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> "http://registry:8081");
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> true);
> VerifiableProperties vProps = new VerifiableProperties(props);
> avroDecoder = new KafkaAvroDecoder(vProps);
> }
>
> public static void main(String[] args) {
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> p.apply(KafkaIO.<byte[], byte[]>read()
> .withBootstrapServers("kafka:9092")
> .withTopic("dbserver1.inventory.customers")
> .withKeyDeserializer(ByteArrayDeserializer.class)
> .withValueDeserializer(ByteArrayDeserializer.class)
> .withoutMetadata(
> )
> .apply(Values.<byte[]>create())
> .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
> @ProcessElement
> public void processElement(ProcessContext c) {
> Envelope data = (Envelope) avroDecoder.fromBytes(c.
> element());
> c.output(data);
> }
> }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <[email protected]>
> wrote:
>
> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
> returned by KafkaAvroDeserializer::deserializer() to Envolope at runtime.
>
>
> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <[email protected]>
> wrote:
>
> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> [email protected]> wrote:
>
> Hi,
>
> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
> it should be as simple as:
>
> p.apply(KafkaIO.<String, Envelope>*read*()
> .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
> AvroCoder.of(Envelope.class))
>
> Where Envelope is the name of the Avro class. However, that does not
> compile and I get the following error:
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to java.lang.Class<? extends
> org.apache.kafka.common.serialization.Deserializer<dbserver1
> .inventory.customers.Envelope>>
>
> I've tried a number of variations on this theme but haven't yet worked
> it out and am starting to run out of ideas...
>
> Has anyone successfully read Avro data from Kafka?
>
> The code I'm using can be found at
> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
> environment can be created with Docker.
>
> Thanks,
> Andrew
>
>
>