Happy to hear it works now for you, Ratha.

-Michael


On Wed, Oct 12, 2016 at 6:06 AM, Ratha v <vijayara...@gmail.com> wrote:

> Sorry, my fault: in the KafkaConsumer I had misconfigured the
> 'value.deserializer' property.
> Now things are working fine.
> Thanks a lot.
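For readers finding this thread later: the fix described above amounts to making the plain consumer's properties agree with the custom value type. A sketch of the relevant entries, with the deserializer class path assumed from the code posted earlier in the thread (not confirmed by the poster):

```properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Must name the custom deserializer; using StringDeserializer here yields
# String values and the downstream ClassCastException.
value.deserializer=xx.kafkamodels.KafkaPayloadSerializer
```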
>
> On 12 October 2016 at 14:10, Ratha v <vijayara...@gmail.com> wrote:
>
> > HI Michael;
> > Sorry, after setting "auto.offset.reset" to 'earliest' I now see messages
> > in my 'targetTopic'.
> > But I still get my ClassCastException issue when I consume messages from
> > the 'targetTopic'. (To consume messages I use the KafkaConsumer high-level API.)
> >
> > ConsumerRecords<?, ?> records = consumer.poll(Long.MAX_VALUE);
> >
> >
> >
> > Exception:
> >
> > java.lang.ClassCastException: java.lang.String cannot be cast to
> > xxx.core.kafkamodels.KafkaPayload
> >     at xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108) ~[classes/:?]
> >     at xx.core.listener.KafkaMessageListenerThread.process(KafkaMessageListenerThread.java:68) ~[classes/:?]
> >     at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(KafkaMessageListenerThread.java:50) ~[classes/:?]
> >     at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]
> >     at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:50) [classes/:?]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
> >     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
> >
> >
> >
> > On 12 October 2016 at 13:19, Ratha v <vijayara...@gmail.com> wrote:
> >
> >> HI Michael;
> >>
> >> Really appreciate the clear explanation.
> >> I modified my code as you suggested and wrote a custom Serde,
> >> serializer, and deserializer.
> >> But now the problem I see is that the two topics are not merged: messages
> >> in the 'sourceTopic' are not passed to the 'targetTopic' (the
> >> 'targetTopic' has 0 messages).
> >> I do not see any exceptions.
> >>
> >> Here are my custom serde, serializer/deserializer, and the logic. I also
> >> have a properties file where I define the following parameters:
> >>
> >> bootstrap.servers=xx.com\:9092,xx.com\:9092,xx.com\:9092
> >>
> >> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> >>
> >> value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde
> >>
> >> application.id=stream-pipe
> >>
> >> Do you see any issue here? Why are messages not written to the
> >> 'targetTopic'?
> >>
> >>
> >>
> >> *LOGIC*
> >>
> >> /**
> >>  * Create a stream from the source topics and write it to the target topic.
> >>  *
> >>  * @param sourceTopics
> >>  * @param targetTopic
> >>  */
> >> public void write(String[] sourceTopics, String targetTopic) {
> >>     KafkaStreams streams = null;
> >>     KStreamBuilder builder = new KStreamBuilder();
> >>     try {
> >>         KStream<String, KafkaPayload> kafkaPayloadStream =
> >>                 builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
> >>         kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
> >>         streams = new KafkaStreams(builder, properties);
> >>         streams.start();
> >>         Thread.sleep(5000L);
> >>     } catch (InterruptedException e) {
> >>         log.warn(e);
> >>     } catch (Exception e) {
> >>         log.error("Topic merge failed. ", e);
> >>     } finally {
> >>         if (streams != null) {
> >>             streams.close();
> >>         }
> >>     }
> >> }
> >>
> >>
> >>
> >>
> >> *SERDE*
> >>
> >>
> >> public class KafkaPayloadSerdes {
> >>
> >>     static private class WrapperSerde<T> implements Serde<T> {
> >>         final private Serializer<T> serializer;
> >>         final private Deserializer<T> deserializer;
> >>
> >>         public WrapperSerde(Serializer<T> serializer,
> >>                 Deserializer<T> deserializer) {
> >>             this.serializer = serializer;
> >>             this.deserializer = deserializer;
> >>         }
> >>
> >>         @Override
> >>         public void configure(Map<String, ?> configs, boolean isKey) {
> >>             serializer.configure(configs, isKey);
> >>             deserializer.configure(configs, isKey);
> >>         }
> >>
> >>         @Override
> >>         public void close() {
> >>             serializer.close();
> >>             deserializer.close();
> >>         }
> >>
> >>         @Override
> >>         public Serializer<T> serializer() {
> >>             return serializer;
> >>         }
> >>
> >>         @Override
> >>         public Deserializer<T> deserializer() {
> >>             return deserializer;
> >>         }
> >>     }
> >>
> >>     static public final class KafkaPayloadSerde extends
> >>             WrapperSerde<KafkaPayload> {
> >>         public KafkaPayloadSerde() {
> >>             // KafkaPayloadSerializer implements both Serializer and Deserializer
> >>             super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
> >>         }
> >>     }
> >>
> >>     /**
> >>      * A serde for the nullable <KafkaPayload> type.
> >>      */
> >>     static public Serde<KafkaPayload> KafkaPayload() {
> >>         return new KafkaPayloadSerde();
> >>     }
> >> }
> >>
> >>
> >> *Serializer/Deserializer*
> >>
> >>
> >>
> >> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
> >>         Deserializer<KafkaPayload> {
> >>
> >>     private static final Logger log = org.apache.logging.log4j.LogManager
> >>             .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
> >>
> >>     @Override
> >>     public KafkaPayload deserialize(String topic, byte[] data) {
> >>         ByteArrayInputStream bis = new ByteArrayInputStream(data);
> >>         ObjectInput in = null;
> >>         Object obj = null;
> >>         try {
> >>             in = new ObjectInputStream(bis);
> >>             obj = in.readObject();
> >>         } catch (IOException e) {
> >>             log.error(e);
> >>         } catch (ClassNotFoundException e) {
> >>             log.error(e);
> >>         } finally {
> >>             try {
> >>                 bis.close();
> >>                 if (in != null) {
> >>                     in.close();
> >>                 }
> >>             } catch (IOException ex) {
> >>                 log.error(ex);
> >>             }
> >>         }
> >>         return (KafkaPayload) obj;
> >>     }
> >>
> >>     @Override
> >>     public byte[] serialize(String topic, KafkaPayload kpayload) {
> >>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
> >>         ObjectOutput out = null;
> >>         byte[] payload = null;
> >>         try {
> >>             out = new ObjectOutputStream(bos);
> >>             out.writeObject(kpayload);
> >>             payload = bos.toByteArray();
> >>         } catch (IOException e) {
> >>             log.error(e);
> >>         } finally {
> >>             try {
> >>                 if (out != null) {
> >>                     out.close();
> >>                     bos.close();
> >>                 }
> >>             } catch (Exception ex) {
> >>                 log.error(ex);
> >>             }
> >>         }
> >>         return payload;
> >>     }
> >>
> >>     @Override
> >>     public void configure(Map<String, ?> configs, boolean isKey) {
> >>         // nothing to configure
> >>     }
> >>
> >>     @Override
> >>     public void close() {
> >>         // nothing to close
> >>     }
> >> }
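Separately from Kafka, the plain Java-serialization round trip that this serializer relies on can be sanity-checked on its own. A minimal sketch, using a hypothetical stand-in Payload class rather than the real KafkaPayload:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripSketch {
    // Stand-in for KafkaPayload; any Serializable POJO behaves the same way.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String body;
        Payload(String body) { this.body = body; }
    }

    // Same approach as the serialize() method above: ObjectOutputStream to bytes.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Same approach as the deserialize() method above: ObjectInputStream from bytes.
    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Payload original = new Payload("hello");
        Payload copy = (Payload) deserialize(serialize(original));
        System.out.println(copy.body); // prints "hello"
    }
}
```

If the round trip works here but the consumer still sees Strings, the problem is in the consumer's value.deserializer configuration rather than in the serializer itself.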
> >>
> >>
> >>
> >> On 11 October 2016 at 20:13, Michael Noll <mich...@confluent.io> wrote:
> >>
> >>> When I wrote:
> >>>
> >>>     "If you haven't changed to default key and value serdes, then
> `to()`
> >>> will fail because [...]"
> >>>
> >>> it should have read:
> >>>
> >>>     "If you haven't changed the default key and value serdes, then
> `to()`
> >>> will fail because [...]"
> >>>
> >>>
> >>>
> >>> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <mich...@confluent.io>
> >>> wrote:
> >>>
> >>> > Ratha,
> >>> >
> >>> > if you based your problematic code on the PipeDemo example, then you
> >>> > should have these two lines in your code (which most probably you
> >>> haven't
> >>> > changed):
> >>> >
> >>> >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >>> > Serdes.String().getClass());
> >>> >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >>> > Serdes.String().getClass());
> >>> >
> >>> > This configures your application to interpret (= encode/decode), by
> >>> > default, the keys and values of any messages it reads from Kafka as
> >>> > strings.  This works for the PipeDemo example because the keys and
> >>> values
> >>> > are actually strings.
> >>> >
> >>> > In your application, however, you do:
> >>> >
> >>> >    KStream<String, KafkaPayload> kafkaPayloadStream =
> >>> > builder.stream(sourceTopics);
> >>> >
> >>> > This won't work, because `builder.stream()`, when calling it without
> >>> > explicit serdes, will use the default serdes configured for your
> >>> > application.  So `builder.stream(sourceTopics)` will give you
> >>> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`.
> Also,
> >>> you
> >>> > can't just cast a String to KafkaPayload to "fix" the problem;  if
> you
> >>> > attempt to do so you run into the ClassCastException that you
> reported
> >>> > below.
> >>> >
> >>> > What you need to do to fix your problem is:
> >>> >
> >>> > 1. Provide a proper serde for `KafkaPayload`.  See
> >>> > http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
> >>> > There are also example implementations of such custom serdes at [1] and [2].
> >>> >
> >>> > Once you have that, you can e.g. write:
> >>> >
> >>> >     final Serde<String> stringSerde = Serdes.String(); // provided by
> >>> Kafka
> >>> >     final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be
> >>> provided
> >>> > by you!
> >>> >
> >>> > 2. Call `builder.stream()` with explicit serdes to override the default
> >>> > serdes: stringSerde is for the keys, kafkaPayloadSerde is for the values.
> >>> >
> >>> >     KStream<String, KafkaPayload> kafkaPayloadStream =
> >>> > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
> >>> >
> >>> > That should do it.
> >>> >
> >>> > Lastly, you must think about serialization also when calling `to()`
> or
> >>> > `through()`:
> >>> >
> >>> >     kafkaPayloadStream.to(targetTopic);
> >>> >
> >>> > If you haven't changed to default key and value serdes, then `to()`
> >>> will
> >>> > fail because it will by default (in your app configuration) interpret
> >>> > message values still as strings rather than KafkaPayload.  To fix
> this
> >>> you
> >>> > should call:
> >>> >
> >>> >     kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde,
> >>> targetTopic);
> >>> >
> >>> > You need to override the default serdes whenever the data must be
> >>> written
> >>> > with, well, non-default serdes.
> >>> >
> >>> > I'd recommend reading
> >>> > http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
> >>> > to better understand how this works.
> >>> >
> >>> >
> >>> > Hope this helps,
> >>> > Michael
> >>> >
> >>> >
> >>> >
> >>> > [1] http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
> >>> > [2] https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils
> >>> >
> >>> >
> >>> >
> >>> >
> >>> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <vijayara...@gmail.com>
> >>> wrote:
> >>> >
> >>> >> I checked my target topic and I see fewer messages than in the source
> >>> >> topic. (If the source topic has 5 messages, I see 2 messages in my
> >>> >> target topic.) What settings do I need to change?
> >>> >>
> >>> >> And, when I try to consume messages from the target topic, I get a
> >>> >> ClassCastException.
> >>> >>
> >>> >> java.lang.ClassCastException: java.lang.String cannot be cast to
> >>> >> xx.yy.core.kafkamodels.KafkaPayload
> >>> >>
> >>> >> receivedPayload = (KafkaPayload) consumerRecord.value();
> >>> >>
> >>> >>
> >>> >> I merge the two topics like this:
> >>> >>
> >>> >> KStreamBuilder builder = new KStreamBuilder();
> >>> >> KStream<String, KafkaPayload> kafkaPayloadStream =
> >>> >>     builder.stream(sourceTopics);
> >>> >> kafkaPayloadStream.to(targetTopic);
> >>> >> streams = new KafkaStreams(builder, properties);
> >>> >> streams.start();
> >>> >>
> >>> >>
> >>> >> Why do I see classcast exception when consuming the message?
> >>> >>
> >>> >>
> >>> >> On 11 October 2016 at 15:19, Ratha v <vijayara...@gmail.com> wrote:
> >>> >>
> >>> >> > Hi all;
> >>> >> > I have a custom datatype defined (a POJO class).
> >>> >> > I copy messages from one topic to another topic.
> >>> >> > I do not see any messages in my target topic.
> >>> >> > This works for string messages, but not for my custom message.
> >>> >> > What might be the cause?
> >>> >> > I followed this sample [1]
> >>> >> > [1] https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
> >>> >> >
> >>> >> >
> >>> >> > --
> >>> >> > -Ratha
> >>> >> > http://vvratha.blogspot.com/
> >>> >> >
> >>> >>
> >>> >>
> >>> >>
> >>> >> --
> >>> >> -Ratha
> >>> >> http://vvratha.blogspot.com/
> >>> >>
> >>> >
> >>> >
> >>> >
> >>>
> >>
> >>
> >>
> >> --
> >> -Ratha
> >> http://vvratha.blogspot.com/
> >>
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>
