Support for Confluent Schema Registry was merged into KafkaIO today. You can test it with tomorrow's snapshots (version 2.20.0-SNAPSHOT) or wait until 2.20.0 is released. Note that this was already possible before, but Alexey made it more user-friendly, since this was a frequently requested feature among Kafka/Avro users.
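For reference, a minimal sketch of what the new usage looks like, assuming the ConfluentSchemaRegistryDeserializerProvider helper that ships with this change; the broker address, topic, subject name, and registry URL below are placeholders:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SchemaRegistryRead {
      public static KafkaIO.Read<String, GenericRecord> buildRead() {
        return KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers("broker:9092")               // placeholder
            .withTopic("my_topic")                             // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            // Looks up the writer schema for subject "my_topic-value" in the
            // registry and infers the value coder, so no manual AvroCoder or
            // "schema.registry.url" consumer property is needed.
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    "http://localhost:8081", "my_topic-value"));
      }
    }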
On Fri, Sep 28, 2018 at 6:58 PM Raghu Angadi <[email protected]> wrote:

> It looks like your producer is writing Avro specific records.
>
> Can you read the records using the bundled console consumer? I think it
> will be simpler for you to get that returning valid records first and then
> use the same deserializer config with your KafkaIO reader.
>
> On Fri, Sep 28, 2018 at 9:33 AM Vishwas Bm <[email protected]> wrote:
>
>> Hi Raghu,
>>
>> Thanks for the response. We are now trying with GenericAvroDeserializer
>> but are still seeing issues.
>> We have a producer which sends messages to Kafka in the format
>> <String, GenericRecord>.
>>
>> Below is the code snippet we have used with Beam KafkaIO:
>>
>> org.apache.avro.Schema schema = null;
>> try {
>>   schema = new org.apache.avro.Schema.Parser().parse(
>>       new File("Schema path"));
>> } catch (Exception e) {
>>   e.printStackTrace();
>> }
>> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>     KafkaIO.<String, GenericRecord>read()
>>         .withBootstrapServers(bootstrapServerUrl)
>>         .withTopic(topicName)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>>             AvroCoder.of(schema))
>>         .updateConsumerProperties(
>>             ImmutableMap.of("schema.registry.url", schemaUrl))
>>         .withTimestampPolicyFactory((tp, prevWatermark) ->
>>             new KafkaCustomTimestampPolicy(maxDelay, timestampInfo,
>>                 prevWatermark));
>>
>> Below is the error seen:
>>
>> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
>> org.apache.avro.AvroRuntimeException: Not a Specific class: interface
>> org.apache.avro.generic.GenericRecord
>>   at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
>>   at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
>>   at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
>>   at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
>>   at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
>>   ... 8 more
>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
>> interface org.apache.avro.generic.GenericRecord
>>   at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
>>   at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>   at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>>   at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>>   at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>>   at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>>   at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>>   at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
>>
>> Can you provide some pointers on this?
>>
>> Thanks & Regards,
>> Vishwas
>>
>> On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi <[email protected]> wrote:
>>
>>> It is a compilation error due to a type mismatch for the value type.
>>>
>>> Please match the key and value types of your KafkaIO reader. I.e. if you
>>> have KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()' needs
>>> a class object which extends 'Deserializer<ValueType>'. Since
>>> KafkaAvroDeserializer extends 'Deserializer<Object>', your ValueType
>>> needs to be Object instead of String.
>>>
>>> Btw, it might be better to use GenericAvroDeserializer or
>>> SpecificAvroDeserializer from the same package.
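To illustrate the type alignment Raghu describes, a minimal sketch (broker and topic are placeholders; note that with a value type of Object you would still have to supply a coder yourself, e.g. via withValueDeserializerAndCoder, which is why the thread moves on to other options):

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class TypeAlignment {
      public static KafkaIO.Read<String, Object> buildRead() {
        // KafkaAvroDeserializer implements Deserializer<Object>, so the value
        // type of the read must be Object, not String, for this to compile.
        return KafkaIO.<String, Object>read()
            .withBootstrapServers("broker:9092")   // placeholder
            .withTopic("my_topic")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(KafkaAvroDeserializer.class);
      }
    }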
>>> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm <[email protected]> wrote:
>>>
>>>> Hi Raghu,
>>>>
>>>> The deserializer is provided by the confluent
>>>> io.confluent.kafka.serializers package.
>>>>
>>>> When we set the value deserializer to KafkaAvroDeserializer, we get the
>>>> error below:
>>>>
>>>> The method withValueDeserializer(Class<? extends Deserializer<String>>)
>>>> in the type KafkaIO.Read<String,String> is not applicable for the
>>>> arguments (Class<KafkaAvroDeserializer>)
>>>>
>>>> From the error, it looks like Beam does not support this deserializer.
>>>> Also, we wanted to use the schema registry from confluent; is this
>>>> supported in Beam?
>>>>
>>>> Thanks & Regards,
>>>> Vishwas
>>>>
>>>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> You can set key/value deserializers:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
>>>>> What are the errors you see?
>>>>>
>>>>> Also note that Beam includes AvroCoder for handling Avro records in
>>>>> Beam.
>>>>>
>>>>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a use case to read data from Kafka that is serialized with
>>>>>> KafkaAvroSerializer, with the schema present in the Schema Registry.
>>>>>>
>>>>>> When we try to use io.confluent.kafka.serializers.KafkaAvroDeserializer
>>>>>> as the value deserializer to get a GenericRecord, we see errors.
>>>>>>
>>>>>> Does KafkaIO.read() support reading from the schema registry and using
>>>>>> the confluent KafkaAvroSerDe?
>>>>>>
>>>>>> Regards,
>>>>>> Rahul
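As a side note on Raghu's AvroCoder pointer above, a minimal sketch of attaching Beam's bundled AvroCoder to a collection of GenericRecords; the class and method names here are illustrative, not from the thread:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.values.PCollection;

    public class AvroCoders {
      // Attach AvroCoder explicitly so Beam can encode GenericRecords between
      // transforms; 'schema' is the Avro schema of the records.
      static PCollection<GenericRecord> withAvroCoder(
          PCollection<GenericRecord> records, Schema schema) {
        return records.setCoder(AvroCoder.of(schema));
      }
    }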
