If you based your problematic code on the PipeDemo example, then you should
have these two lines in your code (which you most probably haven't changed):


This configures your application to interpret (= encode/decode), by
default, the keys and values of any messages it reads from Kafka as
strings.  This works for the PipeDemo example because the keys and values
are actually strings.

In your application, however, you do:

    KStream<String, KafkaPayload> kafkaPayloadStream =
        builder.stream(sourceTopics);

This won't work because `builder.stream()`, when called without explicit
serdes, uses the default serdes configured for your application.  So
`builder.stream(sourceTopics)` gives you a `KStream<String, String>`, not a
`KStream<String, KafkaPayload>`.  You also can't just cast a String to
KafkaPayload to "fix" the problem; if you attempt to do so, you run into the
ClassCastException that you reported.
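
To see why the cast cannot work, here is a small sketch in plain Java with no
Kafka dependency. It mimics decoding values with a string deserializer while
the stream is declared with a different value type. The `KafkaPayload` body and
the helpers `decodeWithStringSerde`, `decodeAs` and `castFails` are made up for
illustration; they are not part of Kafka or of your code.

```java
import java.nio.charset.StandardCharsets;

public class CastDemo {
    // Hypothetical stand-in for your POJO; the real fields are unknown to me.
    static final class KafkaPayload {
        final String body;
        KafkaPayload(String body) { this.body = body; }
    }

    // Mimics a default String deserializer: bytes in, String out.
    static Object decodeWithStringSerde(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Mimics declaring the value type as V without a matching serde.
    // The unchecked cast compiles because generics are erased at runtime.
    @SuppressWarnings("unchecked")
    static <V> V decodeAs(byte[] raw) {
        return (V) decodeWithStringSerde(raw);
    }

    // Returns true if using the decoded value as a KafkaPayload fails.
    static boolean castFails(byte[] raw) {
        try {
            // The compiler inserts the real cast here, at the point of use.
            KafkaPayload payload = CastDemo.<KafkaPayload>decodeAs(raw);
            return payload == null; // not reached for non-null input
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println("cast fails: " + castFails(raw)); // cast fails: true
    }
}
```

The declared type only tells the compiler what you expect. The object that
arrives at runtime is whatever the deserializer produced, which here is a
String.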

What you need to do to fix your problem is:

1. Provide a proper serde for `KafkaPayload`.  See
There are also example implementations of such custom serdes at [1] and [2].

Once you have that, you can e.g. write:

    final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
    final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided by you!

2.  Call `builder.stream()` with explicit serdes to override the default
serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.

    KStream<String, KafkaPayload> kafkaPayloadStream =
        builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);

That should do it.
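
For step 1, the heart of a custom serde is a pair of functions that turn a
`KafkaPayload` into bytes and back. Below is a minimal sketch of that pair in
plain Java, kept free of Kafka imports so that it runs on its own. The two
fields (`id`, `body`) and the class name `KafkaPayloadCodec` are assumptions,
since I don't know what your POJO looks like. In a real application you would
call this logic from your implementations of Kafka's `Serializer<KafkaPayload>`
and `Deserializer<KafkaPayload>` interfaces and combine the two with
`Serdes.serdeFrom(serializer, deserializer)`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KafkaPayloadCodec {
    // Hypothetical POJO; replace the fields with those of your real class.
    static final class KafkaPayload {
        final long id;
        final String body;
        KafkaPayload(long id, String body) { this.id = id; this.body = body; }
    }

    // Encoding logic you would call from your serializer's serialize() method.
    static byte[] serialize(KafkaPayload payload) {
        if (payload == null) return null; // pass null values through unchanged
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(payload.id);
            out.writeUTF(payload.body); // writeUTF is limited to 65535 encoded bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decoding logic; it must read the fields in the order they were written.
    static KafkaPayload deserialize(byte[] raw) {
        if (raw == null) return null;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            long id = in.readLong();
            String body = in.readUTF();
            return new KafkaPayload(id, body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        KafkaPayload copy = deserialize(serialize(new KafkaPayload(42L, "hello")));
        System.out.println(copy.id + " " + copy.body); // 42 hello
    }
}
```

A hand-rolled binary layout like this is only one option. JSON or Avro are
common alternatives, and the only hard requirement is that the reading side and
the writing side agree on the format.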

Lastly, you must think about serialization also when calling `to()` or


If you haven't changed the default key and value serdes, then `to()` will
fail because by default (per your app configuration) it still interprets
message values as strings rather than as KafkaPayload.  To fix this you
should call:

    kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);

You need to override the default serdes whenever the data must be written
with, well, non-default serdes.

I'd recommend reading
to better understand how this works.

Hope this helps,


On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com> wrote:

> I checked my target topic and I see fewer messages than in the source topic.
> (If the source topic has 5 messages, I see 2 messages in my target topic.)
> What settings do I need to change?
> And, when I try to consume message from the target topic, I get ClassCast
> Exception.
> java.lang.ClassCastException: java.lang.String cannot be cast to
> xx.yy.core.kafkamodels.KafkaPayload;
>     receivedPayload = (KafkaPayload) consumerRecord.value();
> I merge two topics like this:
>     KStreamBuilder builder = new KStreamBuilder();
>     KStream<String, KafkaPayload> kafkaPayloadStream =
>         builder.stream(sourceTopics);
>     kafkaPayloadStream.to(targetTopic);
>     streams = new KafkaStreams(builder, properties);
>     streams.start();
> Why do I see a ClassCastException when consuming the message?
> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote:
> > Hi all;
> > I have a custom datatype defined (a POJO class).
> > I copy messages from one topic to another topic.
> > I do not see any messages in my target topic.
> > This works for string messages, but not for my custom message.
> > What might be the cause?
> > I followed this sample [1]
> > [1]
> > https://github.com/apache/kafka/blob/trunk/streams/
> > examples/src/main/java/org/apache/kafka/streams/examples/
> > pipe/PipeDemo.java
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
> --
> -Ratha
> http://vvratha.blogspot.com/
