Re: Avro Deserialization Issue in Kafka Streams
Thanks for the info, Nagendra.

Thanks
C Suresh

On Wednesday, May 6, 2020, Nagendra Korrapati wrote:

> When specific.avro.reader is set to true, the deserializer tries to create
> an instance of the class. The class name is formed by reading the schema
> (the writer schema) from the schema registry and concatenating the
> namespace and the record name. It tries to create that instance, and the
> class is not found on the classpath. I am not sure how it formed the name
> XYZ-Table, though. (Checking the namespace and name of the record in the
> schema registry and making the class available on the classpath should
> solve it.) This is my understanding. I may be wrong!
>
> Nagendra
>
> > On May 5, 2020, at 11:12 AM, Suresh Chidambaram wrote:
> >
> > Hi All,
> >
> > Currently, I'm working on a use case wherein I have to deserialize an Avro
> > object and convert it to some other format of Avro. Below is the flow.
> >
> > DB -> Source Topic (Avro format) -> Stream Processor -> Target Topic (Avro
> > as nested object).
> >
> > When I deserialize the message from the Source Topic, the below exception
> > is thrown.
> >
> > Could someone help me resolve this issue?
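The lookup Nagendra describes can be sketched in plain Java. This is only an illustration under assumptions, not the Confluent deserializer's actual code: the class SpecificClassLookup and its methods are hypothetical, and the empty namespace is a guess, since the thread does not show the registered schema. It joins namespace and record name, then tries to load the result from the classpath. It also shows that a name such as XYZ-Table contains a character that is not allowed in a Java class name, so no generated class could ever match it.

```java
// Sketch only: mimics how a specific-record deserializer might resolve a class
// from the writer schema. Class and method names here are hypothetical.
final class SpecificClassLookup {

    // Join namespace and record name into the full name used for the class lookup.
    static String fullName(String namespace, String recordName) {
        return (namespace == null || namespace.isEmpty())
                ? recordName
                : namespace + "." + recordName;
    }

    // True if every dot-separated part uses only Java identifier characters.
    // (Reserved words are not checked; this is a character-level test.)
    static boolean isValidJavaClassName(String name) {
        for (String part : name.split("\\.", -1)) {
            if (part.isEmpty() || !Character.isJavaIdentifierStart(part.charAt(0))) {
                return false;
            }
            for (int i = 1; i < part.length(); i++) {
                if (!Character.isJavaIdentifierPart(part.charAt(i))) {
                    return false;
                }
            }
        }
        return true;
    }

    // True if the class can be loaded from the current classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the record was registered without a namespace.
        String name = fullName("", "XYZ-Table");
        System.out.println(name
                + " validName=" + isValidJavaClassName(name)
                + " onClasspath=" + isOnClasspath(name));
    }
}
```

If the record name in the schema registry really is XYZ-Table, adding a class to the classpath cannot help on its own; the name in the schema would have to change, or the record would have to be read without a generated class.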
Avro Deserialization Issue in Kafka Streams
Hi All,

Currently, I'm working on a use case wherein I have to deserialize an Avro object and convert it to some other format of Avro. Below is the flow.

DB -> Source Topic (Avro format) -> Stream Processor -> Target Topic (Avro as nested object).

When I deserialize the message from the Source Topic, the below exception is thrown.

Could someone help me resolve this issue?

2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2020-05-05 10:29:34.219 INFO 13804 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams: stream-client [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
2020-05-05 10:29:34.220 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Found no committed offset for partition DEMO-poc-0
2020-05-05 10:29:34.228 INFO 13804 --- [-StreamThread-1] o.a.k.c.c.internals.SubscriptionState: [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-consumer, groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to offset 0.
2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1] o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421

*Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.*
2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) ~[kafka-streams-5.3.0-ccs.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) ~[kafka-streams-5.3.0-ccs.jar!/:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1421
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class "XYZ-Table" specified in writer's schema whilst finding reader's schema for a SpecificRecord.
2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
2020-05-05 10:30:12.888 INFO 13804 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [confluent-kafka-poc-client-StreamThread-1] Shutting down
2020-05-05 10:30:12.891 INFO 13804 --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or
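The StreamsException in the log above points at default.deserialization.exception.handler. A minimal sketch of two settings that could be tried follows, using only java.util.Properties and string keys so it runs without Kafka on the classpath. The class StreamsConfigSketch and the method tolerantConfig are hypothetical names. The keys and the handler class name are, to my knowledge, the standard Kafka Streams and Confluent Avro ones, but where exactly they must be set (the Streams configuration or the serde configuration) depends on how the application builds its serdes, which this thread does not show.

```java
import java.util.Properties;

// Sketch only: class and method names are hypothetical; keys are plain strings.
final class StreamsConfigSketch {

    static Properties tolerantConfig() {
        Properties props = new Properties();
        // Log and skip records that fail deserialization instead of
        // shutting the stream thread down (the default handler fails).
        props.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        // false: Avro values are read as GenericRecord, so the deserializer
        // does not need a generated class named after the writer schema.
        props.put("specific.avro.reader", "false");
        return props;
    }

    public static void main(String[] args) {
        tolerantConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Skipping bad records only hides the missing class. Reading the value as a generic record, or fixing the record name in the schema, addresses the cause.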
Re: Apache Kafka Streams with Spring Boot
Sure, Khaja.

Thanks
C Suresh

On Tuesday, April 28, 2020, KhajaAsmath Mohammed wrote:

> Hello Suresh,
>
> I am also looking for the same. Let me know if you find anything.
>
> Sent from my iPhone
>
> > On Apr 28, 2020, at 8:25 PM, Suresh Chidambaram wrote:
> >
> > Hi Team,
> >
> > Greetings.
> >
> > I have been looking for an example application which uses Kafka Streams
> > with Spring Boot, but I'm unable to find one on the internet. Could
> > someone help me by providing the code?
> >
> > Thanks
> > C Suresh
Apache Kafka Streams with Spring Boot
Hi Team,

Greetings.

I have been looking for an example application which uses Kafka Streams with Spring Boot, but I'm unable to find one on the internet. Could someone help me by providing the code?

Thanks
C Suresh
Clarification regarding multi topics implementation
Hi Team,

Greetings.

I have a use case wherein I have to consume messages from multiple topics using Kafka, process them using Kafka Streams, and then publish the results to multiple target topics. The example is below.

Source topic A - process A - target topic A
Source topic B - process B - target topic B

Could someone help me achieve this? I have to use Spring Boot with Kafka Streams for this solution.

Thank you.
C Suresh