Sorry, my fault. In the KafkaConsumer I had messed up the 'value.deserializer' property. Things are working fine now. Thanks a lot.
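
For anyone who hits the same exception, here is a minimal sketch of why the 'value.deserializer' setting matters. It uses only the JDK, so nothing here touches a real broker. `Payload` is a hypothetical stand-in for KafkaPayload, and the class name `xx.kafkamodels.KafkaPayloadSerializer` is an assumed fully qualified name (the package is a guess based on the value.serde setting quoted below). The property keys themselves are the standard consumer config names. The sketch shows that bytes written with Java serialization come back as the payload when read with a matching deserializer, but come back as a plain String when decoded the way a string deserializer would, which presumably is what produced the ClassCastException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class ValueDeserializerSketch {

    /** Hypothetical stand-in for the KafkaPayload POJO from the thread. */
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String body;
        Payload(String body) { this.body = body; }
    }

    /** Consumer settings as plain properties; the deserializer entries are the part that matters. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumed class name: it must be the class implementing Deserializer<KafkaPayload>,
        // not StringDeserializer.
        props.setProperty("value.deserializer", "xx.kafkamodels.KafkaPayloadSerializer");
        return props;
    }

    /** Java serialization, as in the custom serializer quoted below. */
    static byte[] serialize(Payload payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(payload);
        }
        return bos.toByteArray();
    }

    /** The matching read path: the bytes become the original object again. */
    static Object deserializeAsObject(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    /** Roughly what a string deserializer does: decode the raw bytes as UTF-8 text. */
    static Object deserializeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new Payload("hello"));

        Payload ok = (Payload) deserializeAsObject(bytes);
        System.out.println("payload body: " + ok.body);

        try {
            // The value is a String here, so the cast fails at runtime.
            Payload bad = (Payload) deserializeAsString(bytes);
            System.out.println(bad.body);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: String cannot be cast to Payload");
        }
    }
}
```

With a real consumer, the properties from `consumerProps` would be passed to the KafkaConsumer constructor, and `consumerRecord.value()` could then be cast to the payload type safely.
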

On 12 October 2016 at 14:10, Ratha v <[email protected]> wrote:

> Hi Michael;
> Sorry, after setting "auto.offset.reset" to 'earliest', I see messages
> in my 'targetTopic'.
> But I still get my class cast exception when I consume messages from
> the 'targetTopic'. (To consume messages I use the KafkaConsumer high-level API.)
>
> ConsumerRecords<?, ?> records = consumer.poll(Long.MAX_VALUE);
>
> Exception
>
> java.lang.ClassCastException: java.lang.String cannot be cast to xxx.core.kafkamodels.KafkaPayload
>     at xx.core.listener.KafkaMessageListener.receiveData(KafkaMessageListener.java:108) ~[classes/:?]
>     at xx.core.listener.KafkaMessageListenerThread.process(KafkaMessageListenerThread.java:68) ~[classes/:?]
>     at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(KafkaMessageListenerThread.java:50) ~[classes/:?]
>     at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]
>     at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:50) [classes/:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
>
> On 12 October 2016 at 13:19, Ratha v <[email protected]> wrote:
>
>> Hi Michael;
>>
>> I really appreciate the clear explanation.
>> I modified my code as you mentioned. I have written a custom serde,
>> serializer and deserializer.
>> But now the problem I see is that the topics are not merged: messages
>> in the 'sourcetopic' are not passed to 'targetTopic' ('targetTopic' has 0
>> messages).
>> I do not see any exceptions.
>>
>> Here is my custom serde, serializer/deserializer and the logic. I also
>> have a properties file where I defined the following parameters:
>>
>> bootstrap.servers=xx.com\:9092,xx.com\:9092,xx.com\:9092
>> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
>> value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde
>> application.id=stream-pipe
>>
>> Do you see any issue here? Why are messages not written to 'targetTopic'?
>>
>> LOGIC
>>
>> /**
>>  * create stream from source topics and write it to the target topic
>>  * @param sourceTopics
>>  * @param targetTopic
>>  */
>> public void write(String[] sourceTopics, String targetTopic) {
>>     KafkaStreams streams = null;
>>     KStreamBuilder builder = new KStreamBuilder();
>>     try {
>>         KStream<String, KafkaPayload> kafkaPayloadStream = builder
>>                 .stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>>         kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>>         streams = new KafkaStreams(builder, properties);
>>         streams.start();
>>         Thread.sleep(5000L);
>>     } catch (InterruptedException e) {
>>         log.warn(e);
>>     } catch (Exception e) {
>>         log.error("Topic merge failed. ", e);
>>     } finally {
>>         if (streams != null) {
>>             streams.close();
>>         }
>>     }
>> }
>>
>> SERDE
>>
>> public class KafkaPayloadSerdes {
>>
>>     static private class WrapperSerde<KafkaPayload> implements Serde<KafkaPayload> {
>>         final private Serializer<KafkaPayload> serializer;
>>         final private Deserializer<KafkaPayload> deserializer;
>>
>>         public WrapperSerde(Serializer<KafkaPayload> serializer,
>>                 Deserializer<KafkaPayload> deserializer) {
>>             this.serializer = serializer;
>>             this.deserializer = deserializer;
>>         }
>>
>>         @Override
>>         public void configure(Map<String, ?> configs, boolean isKey) {
>>             serializer.configure(configs, isKey);
>>             deserializer.configure(configs, isKey);
>>         }
>>
>>         @Override
>>         public void close() {
>>             serializer.close();
>>             deserializer.close();
>>         }
>>
>>         @Override
>>         public Serializer<KafkaPayload> serializer() {
>>             return serializer;
>>         }
>>
>>         @Override
>>         public Deserializer<KafkaPayload> deserializer() {
>>             return deserializer;
>>         }
>>     }
>>
>>     static public final class KafkaPayloadSerde extends WrapperSerde<KafkaPayload> {
>>         public KafkaPayloadSerde() {
>>             super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
>>         }
>>     }
>>
>>     /**
>>      * A serde for nullable <KafkaPayload> type.
>>      */
>>     static public Serde<KafkaPayload> KafkaPayload() {
>>         return new KafkaPayloadSerde();
>>     }
>>
>> }
>>
>> Serializer/Deserializer
>>
>> public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
>>         Deserializer<KafkaPayload> {
>>
>>     private static final Logger log = org.apache.logging.log4j.LogManager
>>             .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());
>>
>>     @Override
>>     public KafkaPayload deserialize(String topic, byte[] arg1) {
>>         ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
>>         ObjectInput in = null;
>>         Object obj = null;
>>         try {
>>             in = new ObjectInputStream(bis);
>>             obj = in.readObject();
>>         } catch (IOException e) {
>>             log.error(e);
>>         } catch (ClassNotFoundException e) {
>>             log.error(e);
>>         } finally {
>>             try {
>>                 bis.close();
>>                 if (in != null) {
>>                     in.close();
>>                 }
>>             } catch (IOException ex) {
>>                 log.error(ex);
>>             }
>>         }
>>         return (KafkaPayload) obj;
>>     }
>>
>>     @Override
>>     public void close() {
>>         // TODO Auto-generated method stub
>>     }
>>
>>     @Override
>>     public byte[] serialize(String topic, KafkaPayload kpayload) {
>>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         ObjectOutput out = null;
>>         byte[] payload = null;
>>         try {
>>             out = new ObjectOutputStream(bos);
>>             out.writeObject(kpayload);
>>             payload = bos.toByteArray();
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         } finally {
>>             try {
>>                 if (out != null) {
>>                     out.close();
>>                     bos.close();
>>                 }
>>             } catch (Exception ex) {
>>                 log.error(ex);
>>             }
>>         }
>>         return payload;
>>     }
>>
>>     @Override
>>     public void configure(Map configs, boolean isKey) {
>>         // TODO Auto-generated method stub
>>     }
>>
>> }
>>
>> On 11 October 2016 at 20:13, Michael Noll <[email protected]> wrote:
>>
>>> When I wrote:
>>>
>>>     "If you haven't changed to default key and value serdes, then `to()`
>>>     will fail because [...]"
>>>
>>> it should have read:
>>>
>>>     "If you haven't changed the default key and value serdes, then `to()`
>>>     will fail because [...]"
>>>
>>> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <[email protected]> wrote:
>>>
>>> > Ratha,
>>> >
>>> > if you based your problematic code on the PipeDemo example, then you
>>> > should have these two lines in your code (which most probably you haven't
>>> > changed):
>>> >
>>> >     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>>> >     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>>> >
>>> > This configures your application to interpret (= encode/decode), by
>>> > default, the keys and values of any messages it reads from Kafka as
>>> > strings. This works for the PipeDemo example because the keys and values
>>> > are actually strings.
>>> >
>>> > In your application, however, you do:
>>> >
>>> >     KStream<String, KafkaPayload> kafkaPayloadStream =
>>> >         builder.stream(sourceTopics);
>>> >
>>> > This won't work, because `builder.stream()`, when called without
>>> > explicit serdes, will use the default serdes configured for your
>>> > application. So `builder.stream(sourceTopics)` will give you
>>> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`. Also, you
>>> > can't just cast a String to KafkaPayload to "fix" the problem; if you
>>> > attempt to do so you run into the ClassCastException that you reported
>>> > below.
>>> >
>>> > What you need to do to fix your problem is:
>>> >
>>> > 1. Provide a proper serde for `KafkaPayload`. See
>>> > http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
>>> > There are also example implementations of such custom serdes at [1] and [2].
>>> >
>>> > Once you have that, you can e.g. write:
>>> >
>>> >     final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
>>> >     final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided by you!
>>> >
>>> > 2. Call `builder.stream()` with explicit serdes to override the default
>>> > serdes. stringSerde is for the keys, kafkaPayloadSerde is for the values.
>>> >
>>> >     KStream<String, KafkaPayload> kafkaPayloadStream =
>>> >         builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
>>> >
>>> > That should do it.
>>> >
>>> > Lastly, you must think about serialization also when calling `to()` or
>>> > `through()`:
>>> >
>>> >     kafkaPayloadStream.to(targetTopic);
>>> >
>>> > If you haven't changed to default key and value serdes, then `to()` will
>>> > fail because it will by default (in your app configuration) interpret
>>> > message values still as strings rather than KafkaPayload. To fix this you
>>> > should call:
>>> >
>>> >     kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
>>> >
>>> > You need to override the default serdes whenever the data must be written
>>> > with, well, non-default serdes.
>>> >
>>> > I'd recommend reading
>>> > http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
>>> > to better understand how this works.
>>> >
>>> > Hope this helps,
>>> > Michael
>>> >
>>> > [1] http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
>>> > [2] https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils
>>> >
>>> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <[email protected]> wrote:
>>> >
>>> >> I checked my target topic and I see fewer messages than in the source
>>> >> topic. (If the source topic has 5 messages, I see 2 messages in my
>>> >> target topic.) What settings do I need to change?
>>> >>
>>> >> And, when I try to consume messages from the target topic, I get a
>>> >> ClassCastException.
>>> >>
>>> >> java.lang.ClassCastException: java.lang.String cannot be cast to
>>> >> xx.yy.core.kafkamodels.KafkaPayload;
>>> >>
>>> >>     receivedPayload = (KafkaPayload) consumerRecord.value();
>>> >>
>>> >> I merge two topics like this:
>>> >>
>>> >>     KStreamBuilder builder = new KStreamBuilder();
>>> >>     KStream<String, KafkaPayload> kafkaPayloadStream = builder.stream(sourceTopics);
>>> >>     kafkaPayloadStream.to(targetTopic);
>>> >>     streams = new KafkaStreams(builder, properties);
>>> >>     streams.start();
>>> >>
>>> >> Why do I see a ClassCastException when consuming the message?
>>> >>
>>> >> On 11 October 2016 at 15:19, Ratha v <[email protected]> wrote:
>>> >>
>>> >> > Hi all;
>>> >> > I have a custom datatype defined (a POJO class).
>>> >> > I copy messages from one topic to another topic.
>>> >> > I do not see any messages in my target topic.
>>> >> > This works for string messages, but not for my custom message.
>>> >> > What might be the cause?
>>> >> > I followed this sample [1]
>>> >> > [1]
>>> >> > https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
>>> >> >
>>> >> > --
>>> >> > -Ratha
>>> >> > http://vvratha.blogspot.com/
>>> >>
>>> >> --
>>> >> -Ratha
>>> >> http://vvratha.blogspot.com/
>>
>> --
>> -Ratha
>> http://vvratha.blogspot.com/
>
> --
> -Ratha
> http://vvratha.blogspot.com/

--
-Ratha
http://vvratha.blogspot.com/
