Re: Avro DeSerializeation Issue in Kafka Streams
Thanks for the info Nagendra. Thanks C Suresh On Wednesday, May 6, 2020, Nagendra Korrapati wrote: > When specific.avro.reader is set to true Deserializer tries to create the > instance of the Class. The class name is formed by reading the schema > (writer schema) from schema registry and concatenating the namespace and > record name. It is trying to create that instance and it is not found in > the class path. But I am not sure how it formed the name XYZ-Table (Check > the namespace and name of the record in the schema registry and making the > class available in the class path should solve it )This is my > understanding. I may be wrong!! > > Nagendra > > > On May 5, 2020, at 11:12 AM, Suresh Chidambaram > wrote: > > > > Hi All, > > > > Currently, I'm working on a usecase wherein I have to deserialie an Avro > > object and convert to some other format of Avro. Below is the flow. > > > > DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro > > as nested object). > > > > When I deserialize the message from the Source Topic, the below exception > > is thrown. > > > > Could someone help me resolving this issue? > > > > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] > > o.a.k.clients.consumer.KafkaConsumer : [Consumer > > clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, > > groupId=null] Unsubscribed all topics or patterns and assigned partitions > > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] > > o.a.k.clients.consumer.KafkaConsumer : [Consumer > > clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, > > groupId=null] Unsubscribed all topics or patterns and assigned partitions > > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] > > o.a.k.s.p.internals.StreamThread : stream-thread > > [confluent-kafka-poc-client-StreamThread-1] State transition from > > PARTITIONS_ASSIGNED to RUNNING > > 2020-05-05 10:29:34.219 INFO 13804 --- [-StreamThread-1] > > org.apache.kafka.streams.KafkaStreams: stream-client > > [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING > > 2020-05-05 10:29:34.220 INFO 13804 --- [-StreamThread-1] > > o.a.k.c.c.internals.ConsumerCoordinator : [Consumer > > clientId=confluent-kafka-poc-client-StreamThread-1-consumer, > > groupId=confluent-kafka-poc] Found no committed offset for partition > > DEMO-poc-0 > > 2020-05-05 10:29:34.228 INFO 13804 --- [-StreamThread-1] > > o.a.k.c.c.internals.SubscriptionState: [Consumer > > clientId=confluent-kafka-poc-client-StreamThread-1-consumer, > > groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to > > offset 0. > > 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1] > > o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during > > Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0 > > > > org.apache.kafka.common.errors.SerializationException: Error > deserializing > > Avro message for id 1421 > > > > *Caused by: org.apache.kafka.common.errors.SerializationException: Could > > not find class "XYZ-Table" specified in writer's schema whilst finding > > reader's schema for a SpecificRecord.* > > 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1] > > o.a.k.s.p.internals.StreamThread : stream-thread > > [confluent-kafka-poc-client-StreamThread-1] Encountered the following > > unexpected Kafka exception during processing, this usually indicate > Streams > > internal errors: > > > > org.apache.kafka.streams.errors.StreamsException: Deserialization > exception > > handler is set to fail upon a deserialization error. If you would rather > > have the streaming pipeline continue after a deserialization error, > please > > set the default.deserialization.exception.handler appropriately. > >at > > org.apache.kafka.streams.processor.internals.RecordDeserializer. > deserialize(RecordDeserializer.java:80) > > ~[kafka-streams-5.3.0-ccs.jar!/:na] > >at > > org.apache.kafka.streams.processor.internals.RecordQueue.updateHead( > RecordQueue.java:158) > > ~[kafka-streams-5.3.0-ccs.jar!/:na] > >at > > org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords( > RecordQueue.java:100) > > ~[kafka-streams-5.3.0-ccs.jar!/:na] > >at > > org.apache.kafka.streams.processor.internals. > PartitionGroup.addRawRecords(PartitionGroup.java:136) > > ~[kafka-streams-5.3.0-ccs.jar!/:na] > >at > > org.apache.kafka.streams.processor.internals.StreamTask.addRecords( > StreamTask.java:746) > > ~[kafka-streams-5.3.0-ccs.jar!/:na] > >at > > org.apache.kafka.streams.processor.internals.StreamThread. > addRecordsToTasks(StreamThread.java:1023) > > ~[kafka-streams-5.3.0-ccs.jar!/:na] > >at > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce( > StreamThread.java:861) > > ~[kafka-streams-5.3.0-ccs.jar!/:na] > >at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >
Re: Avro DeSerializeation Issue in Kafka Streams
When specific.avro.reader is set to true Deserializer tries to create the instance of the Class. The class name is formed by reading the schema (writer schema) from schema registry and concatenating the namespace and record name. It is trying to create that instance and it is not found in the class path. But I am not sure how it formed the name XYZ-Table (Check the namespace and name of the record in the schema registry and making the class available in the class path should solve it )This is my understanding. I may be wrong!! Nagendra > On May 5, 2020, at 11:12 AM, Suresh Chidambaram > wrote: > > Hi All, > > Currently, I'm working on a usecase wherein I have to deserialie an Avro > object and convert to some other format of Avro. Below is the flow. > > DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro > as nested object). > > When I deserialize the message from the Source Topic, the below exception > is thrown. > > Could someone help me resolving this issue? > > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] > o.a.k.clients.consumer.KafkaConsumer : [Consumer > clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, > groupId=null] Unsubscribed all topics or patterns and assigned partitions > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] > o.a.k.clients.consumer.KafkaConsumer : [Consumer > clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, > groupId=null] Unsubscribed all topics or patterns and assigned partitions > 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [confluent-kafka-poc-client-StreamThread-1] State transition from > PARTITIONS_ASSIGNED to RUNNING > 2020-05-05 10:29:34.219 INFO 13804 --- [-StreamThread-1] > org.apache.kafka.streams.KafkaStreams: stream-client > [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING > 2020-05-05 10:29:34.220 INFO 13804 --- [-StreamThread-1] > o.a.k.c.c.internals.ConsumerCoordinator : [Consumer > clientId=confluent-kafka-poc-client-StreamThread-1-consumer, > groupId=confluent-kafka-poc] Found no committed offset for partition > DEMO-poc-0 > 2020-05-05 10:29:34.228 INFO 13804 --- [-StreamThread-1] > o.a.k.c.c.internals.SubscriptionState: [Consumer > clientId=confluent-kafka-poc-client-StreamThread-1-consumer, > groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to > offset 0. > 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1] > o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during > Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0 > > org.apache.kafka.common.errors.SerializationException: Error deserializing > Avro message for id 1421 > > *Caused by: org.apache.kafka.common.errors.SerializationException: Could > not find class "XYZ-Table" specified in writer's schema whilst finding > reader's schema for a SpecificRecord.* > 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [confluent-kafka-poc-client-StreamThread-1] Encountered the following > unexpected Kafka exception during processing, this usually indicate Streams > internal errors: > > org.apache.kafka.streams.errors.StreamsException: Deserialization exception > handler is set to fail upon a deserialization error. If you would rather > have the streaming pipeline continue after a deserialization error, please > set the default.deserialization.exception.handler appropriately. >at > org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > ~[kafka-streams-5.3.0-ccs.jar!/:na] >at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > ~[kafka-streams-5.3.0-ccs.jar!/:na] > Caused by: org.apache.kafka.common.errors.SerializationException: Error > deserializing Avro message for id 1421 >
Avro DeSerializeation Issue in Kafka Streams
Hi All, Currently, I'm working on a usecase wherein I have to deserialie an Avro object and convert to some other format of Avro. Below is the flow. DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro as nested object). When I deserialize the message from the Source Topic, the below exception is thrown. Could someone help me resolving this issue? 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions 2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING 2020-05-05 10:29:34.219 INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams: stream-client [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING 2020-05-05 10:29:34.220 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Found no committed offset for partition DEMO-poc-0 2020-05-05 10:29:34.228 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.SubscriptionState: [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to offset 0. 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1] o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0 org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421 *Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.* 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately. at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) ~[kafka-streams-5.3.0-ccs.jar!/:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) ~[kafka-streams-5.3.0-ccs.jar!/:na] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421 Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord. 2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN 2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutting down 2020-05-05 10:30:12.891 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or