Hi Raghu,

Thanks for the response.  We are now trying with GenericAvroDeserializer
but still seeing issues.
We have a producer that sends messages to Kafka in the format
<String, GenericRecord>.

Below is the code snippet we used with Beam KafkaIO.

        org.apache.avro.Schema schema = null;
        try {
            schema = new org.apache.avro.Schema.Parser().parse(new File("Schema path"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers(bootstrapServerUrl)
                .withTopic(topicName)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializerAndCoder(GenericAvroDeserializer.class, AvroCoder.of(schema))
                .updateConsumerProperties(ImmutableMap.of("schema.registry.url", schemaUrl))
                .withTimestampPolicyFactory((tp, prevWatermark) ->
                        new KafkaCustomTimestampPolicy(maxDelay, timestampInfo, prevWatermark));

Below is the error we see:

Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
        at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
        at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
        at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
        at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
        at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
        ... 8 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
        at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
        at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
        at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
        at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
        at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
        at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)


Can you provide some pointers on this?


*Thanks & Regards,*

*Vishwas *



On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi <[email protected]> wrote:

> It is a compilation error due to a type mismatch for the value type.
>
> Please match the key and value types for the KafkaIO reader. I.e., if you have
> KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()' needs a
> class object which extends 'Deserializer<ValueType>'. Since
> KafkaAvroDeserializer extends 'Deserializer<Object>', your ValueType
> needs to be Object instead of String.
>
> Btw, it might be better to use GenericAvroDeserializer or
> SpecificAvroDeserializer from the same package.
>
>
> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm <[email protected]> wrote:
>
>>
>> Hi Raghu,
>>
>> The deserializer is provided by the Confluent
>> *io.confluent.kafka.serializers* package.
>>
>> When we set the value deserializer to KafkaAvroDeserializer, we get the
>> error below:
>>    The method withValueDeserializer(Class<? extends Deserializer<String>>)
>>    in the type KafkaIO.Read<String,String> is not applicable for the
>>    arguments (Class<KafkaAvroDeserializer>)
>>
>> From the error, it looks like Beam does not support this deserializer.
>> We also want to use the Confluent Schema Registry. Is this supported in
>> Beam?
>>
>>
>> *Thanks & Regards,*
>> *Vishwas *
>>
>>
>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi <[email protected]> wrote:
>>
>>> You can set key/value deserializers:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
>>> What are the errors you see?
>>>
>>> Also note that Beam includes AvroCoder for handling Avro records in Beam.
>>>
>>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <
>>> [email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> We have a use case where we read data from Kafka that was serialized with
>>>> KafkaAvroSerializer, and the schema is present in Schema Registry.
>>>>
>>>> When we try to use
>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer as the value
>>>> deserializer to get a GenericRecord, we see errors.
>>>>
>>>> Does KafkaIO.read() support reading from Schema Registry and using
>>>> the Confluent KafkaAvroSerDe?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>
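
The type-matching rule Raghu describes in the thread (the class passed to
withValueDeserializer() must extend Deserializer<ValueType>) can be sketched
with plain Java generics. This is only an illustration under assumptions: the
Deserializer, Read, ObjectDeserializer and TextDeserializer types below are
hypothetical stand-ins written for this sketch, not the real Kafka, Confluent
or Beam classes, and only the shape of the method signature is taken from the
compiler error quoted in the thread.

```java
import java.nio.charset.StandardCharsets;

public class DeserializerTypeDemo {
    // Hypothetical stand-in for Kafka's Deserializer<T> interface.
    interface Deserializer<T> {
        T deserialize(byte[] data);
    }

    // Stand-in for a deserializer declared against Object, which is how the
    // thread describes KafkaAvroDeserializer.
    static class ObjectDeserializer implements Deserializer<Object> {
        public Object deserialize(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a deserializer declared against String.
    static class TextDeserializer implements Deserializer<String> {
        public String deserialize(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a reader with a value type V. The parameter type mirrors
    // the signature shown in the compiler error from the thread.
    static class Read<V> {
        private Class<? extends Deserializer<V>> valueDeserializer;

        Read<V> withValueDeserializer(Class<? extends Deserializer<V>> deserializer) {
            this.valueDeserializer = deserializer;
            return this;
        }

        String valueDeserializerName() {
            return valueDeserializer.getSimpleName();
        }
    }

    // Compiles: the value type is Object and the class extends Deserializer<Object>.
    public static String objectReaderDeserializer() {
        Read<Object> read = new Read<Object>().withValueDeserializer(ObjectDeserializer.class);
        return read.valueDeserializerName();
    }

    // Compiles: the value type is String and the class extends Deserializer<String>.
    public static String stringReaderDeserializer() {
        Read<String> read = new Read<String>().withValueDeserializer(TextDeserializer.class);
        return read.valueDeserializerName();
    }

    public static void main(String[] args) {
        System.out.println(objectReaderDeserializer());
        System.out.println(stringReaderDeserializer());
        // The next line is rejected by the compiler, for the same reason as in
        // the thread: Class<ObjectDeserializer> is not a
        // Class<? extends Deserializer<String>>.
        // new Read<String>().withValueDeserializer(ObjectDeserializer.class);
    }
}
```

Uncommenting the last line in main() reproduces the kind of "not applicable
for the arguments" compile error reported earlier in the thread.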
