Re: Avro Deserialization Issue in Kafka Streams

2020-05-06 Thread Suresh Chidambaram
Thanks for the info Nagendra.

Thanks
C Suresh

On Wednesday, May 6, 2020, Nagendra Korrapati 
wrote:

>  When specific.avro.reader is set to true, the deserializer tries to create
> an instance of the generated class. The class name is formed by reading the
> writer's schema from the Schema Registry and concatenating the namespace
> and record name. It is trying to create that instance, but the class is not
> found on the classpath. I am not sure how it formed the name XYZ-Table,
> though. Checking the namespace and record name in the Schema Registry and
> making the class available on the classpath should solve it. This is my
> understanding; I may be wrong!!
>
> Nagendra
>
> > On May 5, 2020, at 11:12 AM, Suresh Chidambaram 
> wrote:
> >
> > Hi All,
> >
> > Currently, I'm working on a use case wherein I have to deserialize an
> > Avro object and convert it to another Avro format. Below is the flow.
> >
> > DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro
> > as nested object).
> >
> > When I deserialize the message from the Source Topic, the below exception
> > is thrown.
> >
> > Could someone help me resolve this issue?
> >
> > [log output snipped; identical to the log in the original message below]
> >
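Nagendra's point can be illustrated with plain Java: a specific-record deserializer resolves the reader class by concatenating the writer schema's namespace and record name, then looking that name up on the classpath. A minimal sketch of that lookup (the helper names here are hypothetical, not from any Kafka library); note that "XYZ-Table" contains a hyphen, which is not even a legal Java class name, so no generated class could ever match it:

```java
// Simplified illustration of how a specific-record deserializer resolves
// the reader class from the writer's schema (hypothetical helper names).
public class ReaderClassLookup {

    // Avro full names are "namespace.name", or just "name" with no namespace.
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty())
                ? name
                : namespace + "." + name;
    }

    // The deserializer effectively does a Class.forName on the full name;
    // if the generated class is not on the classpath, resolution fails.
    static boolean classExists(String fullName) {
        try {
            Class.forName(fullName);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A record named "XYZ-Table" can never resolve: it is not on the
        // classpath, and a hyphen is illegal in a Java class name anyway.
        System.out.println(classExists(fullName(null, "XYZ-Table")));
        // A class that is on the classpath resolves fine.
        System.out.println(classExists(fullName("java.lang", "String")));
    }
}
```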

Avro Deserialization Issue in Kafka Streams

2020-05-05 Thread Suresh Chidambaram
Hi All,

Currently, I'm working on a use case wherein I have to deserialize an Avro
object and convert it to another Avro format. Below is the flow.

DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro
as nested object).

When I deserialize the message from the Source Topic, the below exception
is thrown.

Could someone help me resolve this issue?

2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] State transition from
PARTITIONS_ASSIGNED to RUNNING
2020-05-05 10:29:34.219  INFO 13804 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams: stream-client
[confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
2020-05-05 10:29:34.220  INFO 13804 --- [-StreamThread-1]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
groupId=confluent-kafka-poc] Found no committed offset for partition
DEMO-poc-0
2020-05-05 10:29:34.228  INFO 13804 --- [-StreamThread-1]
o.a.k.c.c.internals.SubscriptionState: [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to
offset 0.
2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1]
o.a.k.s.e.LogAndFailExceptionHandler : Exception caught during
Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0

org.apache.kafka.common.errors.SerializationException: Error deserializing
Avro message for id 1421

*Caused by: org.apache.kafka.common.errors.SerializationException: Could
not find class "XYZ-Table" specified in writer's schema whilst finding
reader's schema for a SpecificRecord.*
2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] Encountered the following
unexpected Kafka exception during processing, this usually indicate Streams
internal errors:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception
handler is set to fail upon a deserialization error. If you would rather
have the streaming pipeline continue after a deserialization error, please
set the default.deserialization.exception.handler appropriately.
at
org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
~[kafka-streams-5.3.0-ccs.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
~[kafka-streams-5.3.0-ccs.jar!/:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 1421
Caused by: org.apache.kafka.common.errors.SerializationException: Could not
find class "XYZ-Table" specified in writer's schema whilst finding reader's
schema for a SpecificRecord.

2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING
to PENDING_SHUTDOWN
2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread : stream-thread
[confluent-kafka-poc-client-StreamThread-1] Shutting down
2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or 
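For reference, errors like the one above usually clear once the Streams serde configuration points specific records at a generated class that actually exists on the classpath. A hedged sketch of the relevant properties (hostnames and the handler choice are assumptions, not taken from this thread; the exception-handler line matches the suggestion in the StreamsException message):

```properties
# Sketch of Kafka Streams + Confluent Avro serde configuration (values assumed)
application.id=confluent-kafka-poc
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081

# Deserialize into generated SpecificRecord classes; the classes generated
# from the registered schema must be on the application classpath.
default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
specific.avro.reader=true

# Optional: skip bad records instead of shutting down the stream thread.
default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
```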

Re: Apache Kafka Streams with Spring Boot

2020-04-28 Thread Suresh Chidambaram
Sure Khaja.

Thanks
C Suresh

On Tuesday, April 28, 2020, KhajaAsmath Mohammed 
wrote:

> Hello Suresh,
>
> I am also looking for the same. Let me know if you find anything
>
> Sent from my iPhone
>
> > On Apr 28, 2020, at 8:25 PM, Suresh Chidambaram 
> wrote:
> >
> > Hi Team,
> >
> > Greetings.
> >
> > > I have been looking for an example application which uses Kafka Streams
> > > with Spring Boot, but I'm unable to find one on the internet. Could
> > > someone help me by providing the code?
> >
> > Thanks
> > C Suresh
>


Apache Kafka Streams with Spring Boot

2020-04-28 Thread Suresh Chidambaram
Hi Team,

Greetings.

I have been looking for an example application which uses Kafka Streams
with Spring Boot, but I'm unable to find one on the internet. Could
someone help me by providing the code?

Thanks
C Suresh
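A minimal sketch of what such an application can look like, assuming the spring-kafka and kafka-streams dependencies are on the classpath; the topic names and transformation are hypothetical, and broker settings (spring.kafka.streams.application-id, spring.kafka.bootstrap-servers) are expected in application.properties:

```java
// Sketch only: Kafka Streams topology wired into Spring Boot via spring-kafka.
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class StreamsTopology {

    // With @EnableKafkaStreams and Spring Boot auto-configuration, a
    // StreamsBuilder is injected and the KafkaStreams lifecycle is managed
    // by the container; configuration comes from spring.kafka.streams.* keys.
    @Bean
    public KStream<String, String> process(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("source-topic");
        stream.mapValues(String::toUpperCase)  // placeholder transformation
              .to("target-topic");
        return stream;
    }
}
```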


Clarification regarding multi topics implementation

2020-04-22 Thread Suresh Chidambaram
Hi Team,

Greetings.

I have a use case wherein I have to consume messages from multiple topics
using Kafka, process them using Kafka Streams, and then publish the messages
to multiple target topics.

The example is below.

Source topic A -> process A -> target topic A
Source topic B -> process B -> target topic B

Could someone help me achieve this?

I have to use Spring Boot with Kafka Streams for this solution.

Thank you.
C Suresh
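The fan-out described above can be expressed as two independent sub-topologies registered on one StreamsBuilder. A hedged sketch, assuming the kafka-streams dependency is available (the topic names follow the example above; the processing steps are placeholders):

```java
// Sketch only: two independent source -> process -> target pipelines
// built on a single StreamsBuilder.
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MultiTopicTopology {

    public static void build(StreamsBuilder builder) {
        // Source topic A -> process A -> target topic A
        builder.<String, String>stream("source-topic-A")
               .mapValues(MultiTopicTopology::processA)
               .to("target-topic-A");

        // Source topic B -> process B -> target topic B
        builder.<String, String>stream("source-topic-B")
               .mapValues(MultiTopicTopology::processB)
               .to("target-topic-B");
    }

    // Placeholder transformations; replace with the real per-topic logic.
    private static String processA(String value) { return value; }
    private static String processB(String value) { return value; }
}
```

Because the two pipelines share no topics or state, Kafka Streams treats them as separate sub-topologies within one application; in a Spring Boot app this build method would be called from the @Bean that receives the injected StreamsBuilder.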