Re: Avro DeSerializeation Issue in Kafka Streams

2020-05-06 Thread Suresh Chidambaram
Thanks for the info Nagendra.

Thanks
C Suresh

On Wednesday, May 6, 2020, Nagendra Korrapati 
wrote:

>  When specific.avro.reader is set to true Deserializer tries to create the
> instance of the Class. The class name is formed by reading the schema
> (writer schema) from schema registry and concatenating the namespace and
> record name. It is trying to create that instance and it is not found in
> the class path. But I am not sure how it formed the name XYZ-Table (Check
> the namespace and name of the record in the schema registry and making the
> class available in the class path should solve it )This is my
> understanding. I may be wrong!!
>
> Nagendra
>
> > On May 5, 2020, at 11:12 AM, Suresh Chidambaram 
> wrote:
> >
> > Hi All,
> >
> > Currently, I'm working on a usecase wherein I have to deserialie an Avro
> > object and convert to some other format of Avro. Below is the  flow.
> >
> > DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro
> > as nested object).
> >
> > When I deserialize the message from the Source Topic, the below exception
> > is thrown.
> >
> > Could someone help me resolving this issue?
> >
> > 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> > o.a.k.clients.consumer.KafkaConsumer : [Consumer
> > clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
> > groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> > o.a.k.clients.consumer.KafkaConsumer : [Consumer
> > clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
> > groupId=null] Unsubscribed all topics or patterns and assigned partitions
> > 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> > o.a.k.s.p.internals.StreamThread : stream-thread
> > [confluent-kafka-poc-client-StreamThread-1] State transition from
> > PARTITIONS_ASSIGNED to RUNNING
> > 2020-05-05 10:29:34.219  INFO 13804 --- [-StreamThread-1]
> > org.apache.kafka.streams.KafkaStreams: stream-client
> > [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
> > 2020-05-05 10:29:34.220  INFO 13804 --- [-StreamThread-1]
> > o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
> > clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
> > groupId=confluent-kafka-poc] Found no committed offset for partition
> > DEMO-poc-0
> > 2020-05-05 10:29:34.228  INFO 13804 --- [-StreamThread-1]
> > o.a.k.c.c.internals.SubscriptionState: [Consumer
> > clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
> > groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to
> > offset 0.
> > 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1]
> > o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during
> > Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0
> >
> > org.apache.kafka.common.errors.SerializationException: Error
> deserializing
> > Avro message for id 1421
> >
> > *Caused by: org.apache.kafka.common.errors.SerializationException: Could
> > not find class "XYZ-Table" specified in writer's schema whilst finding
> > reader's schema for a SpecificRecord.*
> > 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1]
> > o.a.k.s.p.internals.StreamThread : stream-thread
> > [confluent-kafka-poc-client-StreamThread-1] Encountered the following
> > unexpected Kafka exception during processing, this usually indicate
> Streams
> > internal errors:
> >
> > org.apache.kafka.streams.errors.StreamsException: Deserialization
> exception
> > handler is set to fail upon a deserialization error. If you would rather
> > have the streaming pipeline continue after a deserialization error,
> please
> > set the default.deserialization.exception.handler appropriately.
> >at
> > org.apache.kafka.streams.processor.internals.RecordDeserializer.
> deserialize(RecordDeserializer.java:80)
> > ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >at
> > org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(
> RecordQueue.java:158)
> > ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >at
> > org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(
> RecordQueue.java:100)
> > ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >at
> > org.apache.kafka.streams.processor.internals.
> PartitionGroup.addRawRecords(PartitionGroup.java:136)
> > ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >at
> > org.apache.kafka.streams.processor.internals.StreamTask.addRecords(
> StreamTask.java:746)
> > ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread.
> addRecordsToTasks(StreamThread.java:1023)
> > ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> StreamThread.java:861)
> > ~[kafka-streams-5.3.0-ccs.jar!/:na]
> >at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> 

Re: Avro DeSerializeation Issue in Kafka Streams

2020-05-05 Thread Nagendra Korrapati
 When specific.avro.reader is set to true Deserializer tries to create the 
instance of the Class. The class name is formed by reading the schema (writer 
schema) from schema registry and concatenating the namespace and record name. 
It is trying to create that instance and it is not found in the class path. But 
I am not sure how it formed the name XYZ-Table (Check the namespace and name of 
the record in the schema registry and making the class available in the class 
path should solve it )This is my understanding. I may be wrong!!

Nagendra

> On May 5, 2020, at 11:12 AM, Suresh Chidambaram  
> wrote:
> 
> Hi All,
> 
> Currently, I'm working on a usecase wherein I have to deserialie an Avro
> object and convert to some other format of Avro. Below is the  flow.
> 
> DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro
> as nested object).
> 
> When I deserialize the message from the Source Topic, the below exception
> is thrown.
> 
> Could someone help me resolving this issue?
> 
> 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> o.a.k.clients.consumer.KafkaConsumer : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
> groupId=null] Unsubscribed all topics or patterns and assigned partitions
> 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> o.a.k.clients.consumer.KafkaConsumer : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
> groupId=null] Unsubscribed all topics or patterns and assigned partitions
> 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] State transition from
> PARTITIONS_ASSIGNED to RUNNING
> 2020-05-05 10:29:34.219  INFO 13804 --- [-StreamThread-1]
> org.apache.kafka.streams.KafkaStreams: stream-client
> [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
> 2020-05-05 10:29:34.220  INFO 13804 --- [-StreamThread-1]
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
> groupId=confluent-kafka-poc] Found no committed offset for partition
> DEMO-poc-0
> 2020-05-05 10:29:34.228  INFO 13804 --- [-StreamThread-1]
> o.a.k.c.c.internals.SubscriptionState: [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
> groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to
> offset 0.
> 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1]
> o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during
> Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0
> 
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> Avro message for id 1421
> 
> *Caused by: org.apache.kafka.common.errors.SerializationException: Could
> not find class "XYZ-Table" specified in writer's schema whilst finding
> reader's schema for a SpecificRecord.*
> 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] Encountered the following
> unexpected Kafka exception during processing, this usually indicate Streams
> internal errors:
> 
> org.apache.kafka.streams.errors.StreamsException: Deserialization exception
> handler is set to fail upon a deserialization error. If you would rather
> have the streaming pipeline continue after a deserialization error, please
> set the default.deserialization.exception.handler appropriately.
>at
> org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
> Caused by: org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id 1421
> 

Avro DeSerializeation Issue in Kafka Streams

2020-05-05 Thread Suresh Chidambaram
Hi All,

Currently, I'm working on a usecase wherein I have to deserialie an Avro
object and convert to some other format of Avro. Below is the  flow.

DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro
as nested object).

When I deserialize the message from the Source Topic, the below exception
is thrown.

Could someone help me resolving this issue?

2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] State transition from
PARTITIONS_ASSIGNED to RUNNING
2020-05-05 10:29:34.219  INFO 13804 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams: stream-client
[confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
2020-05-05 10:29:34.220  INFO 13804 --- [-StreamThread-1]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
groupId=confluent-kafka-poc] Found no committed offset for partition
DEMO-poc-0
2020-05-05 10:29:34.228  INFO 13804 --- [-StreamThread-1]
o.a.k.c.c.internals.SubscriptionState: [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to
offset 0.
2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1]
o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during
Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0

org.apache.kafka.common.errors.SerializationException: Error deserializing
Avro message for id 1421

*Caused by: org.apache.kafka.common.errors.SerializationException: Could
not find class "XYZ-Table" specified in writer's schema whilst finding
reader's schema for a SpecificRecord.*
2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] Encountered the following
unexpected Kafka exception during processing, this usually indicate Streams
internal errors:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception
handler is set to fail upon a deserialization error. If you would rather
have the streaming pipeline continue after a deserialization error, please
set the default.deserialization.exception.handler appropriately.
at
org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
~[kafka-streams-5.3.0-ccs.jar!/:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 1421
Caused by: org.apache.kafka.common.errors.SerializationException: Could not
find class "XYZ-Table" specified in writer's schema whilst finding reader's
schema for a SpecificRecord.

2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING
to PENDING_SHUTDOWN
2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] Shutting down
2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or