When I wrote:

    "If you haven't changed to default key and value serdes, then `to()`
will fail because [...]"

it should have read:

    "If you haven't changed the default key and value serdes, then `to()`
will fail because [...]"



On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <mich...@confluent.io> wrote:

> Ratha,
>
> if you based your problematic code on the PipeDemo example, then you
> should have these two lines in your code (which most probably you haven't
> changed):
>
>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>
> This configures your application to interpret (= encode/decode), by
> default, the keys and values of any messages it reads from Kafka as
> strings.  This works for the PipeDemo example because the keys and values
> are actually strings.
>
> In your application, however, you do:
>
>     KStream<String, KafkaPayload> kafkaPayloadStream = builder.stream(sourceTopics);
>
> This won't work, because `builder.stream()`, when called without
> explicit serdes, will use the default serdes configured for your
> application.  So `builder.stream(sourceTopics)` will give you
> `KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also, you
> can't just cast a String to KafkaPayload to "fix" the problem;  if you
> attempt to do so you run into the ClassCastException that you reported
> below.
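
To make the failure mode above concrete, here is a minimal, stdlib-only sketch. It does not use Kafka at all. `DefaultSerdeCastDemo`, `decodeWithDefaultSerde`, and the single-field `KafkaPayload` are hypothetical stand-ins (the real `KafkaPayload` class is not shown in this thread). The sketch assumes the read path is generic and that the configured default always produces a `String`, which is presumably why the `KStream<String, KafkaPayload>` assignment compiles but fails once a value is actually used as a `KafkaPayload`.

```java
import java.nio.charset.StandardCharsets;

public class DefaultSerdeCastDemo {

    // Hypothetical stand-in for the poster's POJO; its real fields are not shown in the thread.
    public static final class KafkaPayload {
        public final String body;

        public KafkaPayload(String body) {
            this.body = body;
        }
    }

    // Mimics a generic read path whose configured default deserializer always yields a String.
    // The unchecked cast compiles for any V; nothing is verified until the caller uses the value.
    @SuppressWarnings("unchecked")
    public static <V> V decodeWithDefaultSerde(byte[] raw) {
        Object decoded = new String(raw, StandardCharsets.UTF_8);
        return (V) decoded;
    }

    // Returns true if using the decoded value as a KafkaPayload fails at runtime.
    public static boolean castFails(byte[] raw) {
        try {
            // The compiler inserts a cast to KafkaPayload here; the value is really a String.
            KafkaPayload payload = decodeWithDefaultSerde(raw);
            return payload == null; // not reached for non-null input
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] raw = "some payload bytes".getBytes(StandardCharsets.UTF_8);
        String asString = decodeWithDefaultSerde(raw);
        System.out.println("decoded as String: " + asString);
        System.out.println("cast to KafkaPayload fails: " + castFails(raw));
    }
}
```

The point of the sketch is only that the declared type parameter does not change how the bytes are decoded; the decoding is decided by the serde that is actually in effect.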
>
> What you need to do to fix your problem is:
>
> 1. Provide a proper serde for `KafkaPayload`.  See
> http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
> There are also example implementations of such custom serdes at [1] and [2].
>
> Once you have that, you can e.g. write:
>
>     final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
>     final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided by you!
>
> 2.  Call `builder.stream()` with explicit serdes to override the default
> serdes.  stringSerde is for the keys, kafkaPayloadSerde is for the values.
>
>     KStream<String, KafkaPayload> kafkaPayloadStream = builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>
> That should do it.
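
As a rough illustration of what the "must be provided by you" serde in step 1 has to do, here is a stdlib-only sketch of the byte-level encoding and decoding. Everything in it is an assumption made for illustration: the class name `KafkaPayloadCodecSketch`, the `id` and `timestamp` fields (the real `KafkaPayload` is not shown in this thread), and the choice of `DataOutputStream` as the wire format. It is not the implementation from the linked examples.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KafkaPayloadCodecSketch {

    // Hypothetical payload type; both fields are invented for this example.
    public static final class KafkaPayload {
        public final String id;
        public final long timestamp;

        public KafkaPayload(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    // Object -> bytes: the job of the serializer half of a serde.
    public static byte[] serialize(KafkaPayload payload) {
        if (payload == null) {
            return null; // pass null values through unchanged
        }
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(payload.id);
            out.writeLong(payload.timestamp);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bytes -> object: the job of the deserializer half. Fields are read in the order written.
    public static KafkaPayload deserialize(byte[] raw) {
        if (raw == null) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            String id = in.readUTF();
            long timestamp = in.readLong();
            return new KafkaPayload(id, timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        KafkaPayload copy = deserialize(serialize(new KafkaPayload("example-id", 1476180720000L)));
        System.out.println(copy.id + " @ " + copy.timestamp);
    }
}
```

In a real application these two methods would sit behind implementations of Kafka's `Serializer<KafkaPayload>` and `Deserializer<KafkaPayload>` interfaces, which can then be combined into a serde with `Serdes.serdeFrom(serializer, deserializer)`.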
>
> Lastly, you must think about serialization also when calling `to()` or
> `through()`:
>
>     kafkaPayloadStream.to(targetTopic);
>
> If you haven't changed to default key and value serdes, then `to()` will
> fail because it will by default (in your app configuration) interpret
> message values still as strings rather than KafkaPayload.  To fix this you
> should call:
>
>     kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>
> You need to override the default serdes whenever the data must be written
> with, well, non-default serdes.
>
> I'd recommend reading
> http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
> to better understand how this works.
>
>
> Hope this helps,
> Michael
>
>
>
> [1] http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
> [2] https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils
>
>
>
>
> On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com> wrote:
>
>> I checked my target topic and I see fewer messages than in the source
>> topic. (If the source topic has 5 messages, I see 2 messages in my target
>> topic.) Which settings do I need to change?
>>
>> And when I try to consume messages from the target topic, I get a
>> ClassCastException:
>>
>> java.lang.ClassCastException: java.lang.String cannot be cast to
>> xx.yy.core.kafkamodels.KafkaPayload;
>>
>>     receivedPayload = (KafkaPayload) consumerRecord.value();
>>
>>
>> I merge two topics like this:
>>
>>     KStreamBuilder builder = new KStreamBuilder();
>>     KStream<String, KafkaPayload> kafkaPayloadStream = builder.stream(sourceTopics);
>>     kafkaPayloadStream.to(targetTopic);
>>     streams = new KafkaStreams(builder, properties);
>>     streams.start();
>>
>>
>> Why do I see a ClassCastException when consuming the message?
>>
>>
>> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote:
>>
>> > Hi all;
>> > I have a custom data type defined (a POJO class).
>> > I copy messages from one topic to another topic.
>> > I do not see any messages in my target topic.
>> > This works for string messages, but not for my custom message.
>> > What might be the cause?
>> > I followed this sample [1]
>> > [1]
>> > https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>>
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>>
>
>
>
