Sorry, my fault. In the KafkaConsumer I had misconfigured the
'value.deserializer' property.
Things are working fine now.
Thanks a lot.
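
For anyone who lands on this thread later, here is a minimal sketch of the consumer settings involved. It only builds a java.util.Properties object, so it runs without a broker. The keys are the standard KafkaConsumer configuration names; the broker address, the group id, and the package of KafkaPayloadSerializer are assumptions for illustration, not values taken from this thread. The exception suggests the value deserializer was a String deserializer, so the point is that 'value.deserializer' has to match the serializer that wrote the records.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds consumer settings whose value deserializer matches the custom
    // serializer used when the records were written to the target topic.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "payload-consumer");        // hypothetical group id
        props.put("auto.offset.reset", "earliest");       // read the topic from the beginning
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumed package name: the thread only shows the class name.
        props.put("value.deserializer", "xx.kafkamodels.KafkaPayloadSerializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the KafkaConsumer constructor in place of the ones that named a String deserializer for the values.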

On 12 October 2016 at 14:10, Ratha v <[email protected]> wrote:

> Hi Michael,
> Sorry, after setting "auto.offset.reset" to 'earliest', I see messages
> in my 'targetTopic'.
> But I still get the ClassCastException when I consume messages from
> the 'targetTopic'. (To consume messages I use the high-level KafkaConsumer API.)
>
>     ConsumerRecords<?, ?> records = consumer.poll(Long.MAX_VALUE);
>
> Exception:
>
> java.lang.ClassCastException: java.lang.String cannot be cast to xxx.core.kafkamodels.KafkaPayload
>     at xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108) ~[classes/:?]
>     at xx.core.listener.KafkaMessageListenerThread.process(KafkaMessageListenerThread.java:68) ~[classes/:?]
>     at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(KafkaMessageListenerThread.java:50) ~[classes/:?]
>     at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]
>     at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:50) [classes/:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
>
>
>
> On 12 October 2016 at 13:19, Ratha v <[email protected]> wrote:
>
>> Hi Michael,
>>
>> I really appreciate the clear explanation.
>> I modified my code as you suggested and wrote a custom serde,
>> serializer, and deserializer.
>> But the problem I see now is that the topics are not merged: messages
>> in the 'sourceTopic' are not passed to the 'targetTopic' ('targetTopic' has 0
>> messages).
>> I do not see any exceptions.
>>
>> Here are my custom serde, serializer/deserializer, and the logic. I also
>> have a properties file where I defined the following parameters:
>>
>>     bootstrap.servers=xx.com\:9092,xx.com\:9092,xx.com\:9092
>>     key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
>>     value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde
>>     application.id=stream-pipe
>>
>>
>> Do you see any issue here? Why are messages not written to 'targetTopic'?
>>
>>
>>
>> *LOGIC*
>>
>> /**
>>  * Create a stream from the source topics and write it to the target topic.
>>  *
>>  * @param sourceTopics
>>  * @param targetTopic
>>  */
>> public void write(String[] sourceTopics, String targetTopic) {
>>     KafkaStreams streams = null;
>>     KStreamBuilder builder = new KStreamBuilder();
>>     try {
>>         KStream<String, KafkaPayload> kafkaPayloadStream =
>>                 builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>>         kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>>         streams = new KafkaStreams(builder, properties);
>>         streams.start();
>>         Thread.sleep(5000L);
>>     } catch (InterruptedException e) {
>>         log.warn(e);
>>     } catch (Exception e) {
>>         log.error("Topic merge failed. ", e);
>>     } finally {
>>         if (streams != null) {
>>             streams.close();
>>         }
>>     }
>> }
>>
>>
>>
>>
>> *SERDE*
>>
>>
>> public class KafkaPayloadSerdes {
>>
>>     static private class WrapperSerde<KafkaPayload> implements Serde<KafkaPayload> {
>>         final private Serializer<KafkaPayload> serializer;
>>         final private Deserializer<KafkaPayload> deserializer;
>>
>>         public WrapperSerde(Serializer<KafkaPayload> serializer,
>>                 Deserializer<KafkaPayload> deserializer) {
>>             this.serializer = serializer;
>>             this.deserializer = deserializer;
>>         }
>>
>>         @Override
>>         public void configure(Map<String, ?> configs, boolean isKey) {
>>             serializer.configure(configs, isKey);
>>             deserializer.configure(configs, isKey);
>>         }
>>
>>         @Override
>>         public void close() {
>>             serializer.close();
>>             deserializer.close();
>>         }
>>
>>         @Override
>>         public Serializer<KafkaPayload> serializer() {
>>             return serializer;
>>         }
>>
>>         @Override
>>         public Deserializer<KafkaPayload> deserializer() {
>>             return deserializer;
>>         }
>>     }
>>
>>     static public final class KafkaPayloadSerde extends WrapperSerde<KafkaPayload> {
>>         public KafkaPayloadSerde() {
>>             super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
>>         }
>>     }
>>
>>     /**
>>      * A serde for the nullable KafkaPayload type.
>>      */
>>     static public Serde<KafkaPayload> KafkaPayload() {
>>         return new KafkaPayloadSerde();
>>     }
>>
>> }
>>
>>
>> *Serializer/Deserializer*
>>
>>
>>
>> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
>>         Deserializer<KafkaPayload> {
>>
>>     private static final Logger log = org.apache.logging.log4j.LogManager
>>             .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
>>
>>     @Override
>>     public KafkaPayload deserialize(String topic, byte[] arg1) {
>>         ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
>>         ObjectInput in = null;
>>         Object obj = null;
>>         try {
>>             in = new ObjectInputStream(bis);
>>             obj = in.readObject();
>>         } catch (IOException e) {
>>             log.error(e);
>>         } catch (ClassNotFoundException e) {
>>             log.error(e);
>>         } finally {
>>             try {
>>                 bis.close();
>>                 if (in != null) {
>>                     in.close();
>>                 }
>>             } catch (IOException ex) {
>>                 log.error(ex);
>>             }
>>         }
>>         return (KafkaPayload) obj;
>>     }
>>
>>     @Override
>>     public void close() {
>>         // TODO Auto-generated method stub
>>     }
>>
>>     @Override
>>     public byte[] serialize(String topic, KafkaPayload kpayload) {
>>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         ObjectOutput out = null;
>>         byte[] payload = null;
>>         try {
>>             out = new ObjectOutputStream(bos);
>>             out.writeObject(kpayload);
>>             payload = bos.toByteArray();
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         } finally {
>>             try {
>>                 if (out != null) {
>>                     out.close();
>>                     bos.close();
>>                 }
>>             } catch (Exception ex) {
>>                 log.error(ex);
>>             }
>>         }
>>         return payload;
>>     }
>>
>>     @Override
>>     public void configure(Map configs, boolean isKey) {
>>         // TODO Auto-generated method stub
>>     }
>>
>> }
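
The serializer above relies on plain Java object serialization, so it can be checked outside Kafka with a simple round trip. The sketch below is only an illustration of that idea: `Payload` is a hypothetical stand-in for KafkaPayload (the real class is not shown in this thread), and try-with-resources replaces the manual close() calls. Note that the payload class has to implement java.io.Serializable; otherwise writeObject() throws a NotSerializableException, which the serialize() method above would catch and then return null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PayloadRoundTrip {

    // Hypothetical stand-in for KafkaPayload; field names are made up.
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String body;

        Payload(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    // Same idea as serialize(): object -> bytes via Java serialization.
    static byte[] toBytes(Payload payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(payload);
        }
        // Closing the ObjectOutputStream flushes it, so all bytes are in bos.
        return bos.toByteArray();
    }

    // Same idea as deserialize(): bytes -> object.
    static Payload fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Payload) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Payload copy = fromBytes(toBytes(new Payload("42", "hello")));
        System.out.println(copy.id + " " + copy.body); // prints "42 hello"
    }
}
```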
>>
>>
>>
>> On 11 October 2016 at 20:13, Michael Noll <[email protected]> wrote:
>>
>>> When I wrote:
>>>
>>>     "If you haven't changed to default key and value serdes, then `to()`
>>> will fail because [...]"
>>>
>>> it should have read:
>>>
>>>     "If you haven't changed the default key and value serdes, then `to()`
>>> will fail because [...]"
>>>
>>>
>>>
>>> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <[email protected]>
>>> wrote:
>>>
>>> > Ratha,
>>> >
>>> > if you based your problematic code on the PipeDemo example, then you
>>> > should have these two lines in your code (which most probably you
>>> > haven't changed):
>>> >
>>> >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>> > Serdes.String().getClass());
>>> >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>> > Serdes.String().getClass());
>>> >
>>> > This configures your application to interpret (= encode/decode), by
>>> > default, the keys and values of any messages it reads from Kafka as
>>> > strings.  This works for the PipeDemo example because the keys and
>>> > values are actually strings.
>>> >
>>> > In your application, however, you do:
>>> >
>>> >    KStream<String, KafkaPayload> kafkaPayloadStream =
>>> > builder.stream(sourceTopics);
>>> >
>>> > This won't work, because `builder.stream()`, when called without
>>> > explicit serdes, will use the default serdes configured for your
>>> > application.  So `builder.stream(sourceTopics)` will give you
>>> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`.  Also,
>>> > you can't just cast a String to KafkaPayload to "fix" the problem;  if
>>> > you attempt to do so you run into the ClassCastException that you
>>> > reported below.
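
The paragraph above can be illustrated without Kafka. In the sketch below, `readValue` is a hypothetical stand-in for a read path that is configured with a String deserializer: the caller may declare any value type, and the code compiles because generic types are erased, but the object that comes back is still a String. `Payload` is likewise a made-up stand-in for KafkaPayload. This is only meant to show why the failure surfaces as a ClassCastException at the point where the value is used, not how Kafka Streams is implemented internally.

```java
import java.nio.charset.StandardCharsets;

public class ErasureCastSketch {

    // Hypothetical stand-in for KafkaPayload.
    static final class Payload {
    }

    // The caller chooses V, but this method always produces a String.
    // The unchecked cast compiles because generics are erased at runtime.
    @SuppressWarnings("unchecked")
    static <V> V readValue(byte[] bytes) {
        return (V) new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns true if treating the decoded value as a Payload fails at runtime.
    static boolean failsAsPayload(byte[] bytes) {
        try {
            // Compiles fine; the compiler inserts a cast to Payload here.
            Payload p = readValue(bytes);
            return false;
        } catch (ClassCastException e) {
            // java.lang.String cannot be cast to Payload
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        String ok = readValue(bytes);              // declared type matches the real type
        System.out.println(ok);                    // prints "hello"
        System.out.println(failsAsPayload(bytes)); // prints "true"
    }
}
```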
>>> >
>>> > What you need to do to fix your problem is:
>>> >
>>> > 1. Provide a proper serde for `KafkaPayload`.  See
>>> > http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
>>> > There are also example implementations of such custom serdes at [1] and [2].
>>> >
>>> > Once you have that, you can e.g. write:
>>> >
>>> >     final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
>>> >     final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided by you!
>>> >
>>> > 2.  Call `builder.stream()` with explicit serdes to override the
>>> > default serdes.  stringSerde is for the keys, kafkaPayloadSerde is for
>>> > the values.
>>> >
>>> >     KStream<String, KafkaPayload> kafkaPayloadStream =
>>> > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>>> >
>>> > That should do it.
>>> >
>>> > Lastly, you must think about serialization also when calling `to()` or
>>> > `through()`:
>>> >
>>> >     kafkaPayloadStream.to(targetTopic);
>>> >
>>> > If you haven't changed to default key and value serdes, then `to()`
>>> > will fail because it will by default (in your app configuration)
>>> > interpret message values still as strings rather than KafkaPayload.
>>> > To fix this you should call:
>>> >
>>> >     kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>>> >
>>> > You need to override the default serdes whenever the data must be
>>> > written with, well, non-default serdes.
>>> >
>>> > I'd recommend reading
>>> > http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
>>> > to better understand how this works.
>>> >
>>> >
>>> > Hope this helps,
>>> > Michael
>>> >
>>> >
>>> >
>>> > [1] http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
>>> > [2] https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils
>>> >
>>> >
>>> >
>>> >
>>> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <[email protected]> wrote:
>>> >
>>> >> I checked my target topic and I see fewer messages than in the source
>>> >> topic. (If the source topic has 5 messages, I see 2 messages in my
>>> >> target topic.) What settings do I need to change?
>>> >>
>>> >> And when I try to consume messages from the target topic, I get a
>>> >> ClassCastException.
>>> >>
>>> >> java.lang.ClassCastException: java.lang.String cannot be cast to
>>> >> xx.yy.core.kafkamodels.KafkaPayload;
>>> >>
>>> >>     receivedPayload = (KafkaPayload) consumerRecord.value();
>>> >>
>>> >>
>>> >> I merge the two topics like this:
>>> >>
>>> >>     KStreamBuilder builder = new KStreamBuilder();
>>> >>     KStream<String, KafkaPayload> kafkaPayloadStream = builder.stream(sourceTopics);
>>> >>     kafkaPayloadStream.to(targetTopic);
>>> >>     streams = new KafkaStreams(builder, properties);
>>> >>     streams.start();
>>> >>
>>> >>
>>> >> Why do I see a ClassCastException when consuming the messages?
>>> >>
>>> >>
>>> >> On 11 October 2016 at 15:19, Ratha v <[email protected]> wrote:
>>> >>
>>> >> > Hi all,
>>> >> > I have a custom data type defined (a POJO class).
>>> >> > I copy messages from one topic to another topic.
>>> >> > I do not see any messages in my target topic.
>>> >> > This works for string messages, but not for my custom message.
>>> >> > What might be the cause?
>>> >> > I followed this sample [1].
>>> >> > [1]
>>> >> > https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
>>> >> >
>>> >> >
>>> >> > --
>>> >> > -Ratha
>>> >> > http://vvratha.blogspot.com/



-- 
-Ratha
http://vvratha.blogspot.com/
