Hi Michael,
I really appreciate the clear explanation.
I modified my code as you suggested and wrote a custom serde, serializer,
and deserializer.
But now the problem I see is that the topics are not merged: messages
in 'sourcetopic' are not passed to 'targetTopic' ('targetTopic' has 0
messages).
I do not see any exceptions.
Here are my custom serde, serializer/deserializer, and the logic. I also have
a properties file where I defined the following parameters:
bootstrap.servers=xx.com\:9092,xx.com\:9092,xx.com\:9092
key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde
application.id=stream-pipe
Do you see any issue here? Why are messages not written to 'targetTopic'?
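For reference, a minimal sketch of how java.util.Properties parses these four entries, assuming the file is loaded with Properties.load(). The class name StreamPropertiesCheck and the inlined CONFIG string are only illustrative; the host names are the placeholders used above. It shows that the escaped "\:" is read back as a plain ":" and that the "$" in the nested serde class names is kept as-is.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative helper, not part of the application code in this mail.
class StreamPropertiesCheck {
    // Same keys and values as the properties file above (hosts are placeholders).
    static final String CONFIG =
        "bootstrap.servers=xx.com\\:9092,xx.com\\:9092,xx.com\\:9092\n"
      + "key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde\n"
      + "value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde\n"
      + "application.id=stream-pipe\n";

    // Parses properties-file text the same way Properties.load() would read the file.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        // Print every parsed entry so the effective values can be inspected.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " -> " + props.getProperty(name));
        }
    }
}
```

Printing the parsed values this way only confirms what the application is given; it does not check that the brokers are reachable or that the serde classes are on the classpath.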
*LOGIC*
/**
 * Create a stream from the source topics and write it to the target topic.
 *
 * @param sourceTopics
 * @param targetTopic
 */
public void write(String[] sourceTopics, String targetTopic) {
    KafkaStreams streams = null;
    KStreamBuilder builder = new KStreamBuilder();
    try {
        KStream<String, KafkaPayload> kafkaPayloadStream = builder
                .stream(stringSerde, kafkaPayloadSerde, sourceTopics);
        kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
        streams = new KafkaStreams(builder, properties);
        streams.start();
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        log.warn(e);
    } catch (Exception e) {
        log.error("Topic merge failed. ", e);
    } finally {
        if (streams != null) {
            streams.close();
        }
    }
}
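One thing that may be worth ruling out: write() closes the KafkaStreams instance about five seconds after start(), because Thread.sleep(5000L) is followed by close() in the finally block. Whether that window is too short here is only a guess. The sketch below shows the alternative of blocking on a latch instead of a fixed sleep. It uses only the JDK; the Lifecycle interface is a hypothetical stand-in for the start() and close() calls on KafkaStreams, and RunUntilStopped and runUntil are made-up names.

```java
import java.util.concurrent.CountDownLatch;

// Illustrative sketch; Lifecycle stands in for KafkaStreams (assumption).
class RunUntilStopped {
    /** Hypothetical stand-in for the two KafkaStreams calls used in write(). */
    interface Lifecycle {
        void start();
        void close();
    }

    /** Starts the app, blocks until the latch is released, then closes the app. */
    static void runUntil(Lifecycle app, CountDownLatch stop) {
        app.start();
        try {
            // Replaces the fixed Thread.sleep(5000L): wait for an explicit stop signal.
            stop.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            // Always release resources, whether stopped or interrupted.
            app.close();
        }
    }
}
```

The caller decides when to release the latch, for example once the target topic has been checked, so the run time is no longer tied to an arbitrary delay.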
*SERDE*
public class KafkaPayloadSerdes {

    static private class WrapperSerde<KafkaPayload> implements
            Serde<KafkaPayload> {
        final private Serializer<KafkaPayload> serializer;
        final private Deserializer<KafkaPayload> deserializer;

        public WrapperSerde(Serializer<KafkaPayload> serializer,
                Deserializer<KafkaPayload> deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            serializer.configure(configs, isKey);
            deserializer.configure(configs, isKey);
        }

        @Override
        public void close() {
            serializer.close();
            deserializer.close();
        }

        @Override
        public Serializer<KafkaPayload> serializer() {
            return serializer;
        }

        @Override
        public Deserializer<KafkaPayload> deserializer() {
            return deserializer;
        }
    }

    static public final class KafkaPayloadSerde extends
            WrapperSerde<KafkaPayload> {
        public KafkaPayloadSerde() {
            super(new KafkaPayloadSerializer(), new KafkaPayloadSerializer());
        }
    }

    /**
     * A serde for nullable <KafkaPayload> type.
     */
    static public Serde<KafkaPayload> KafkaPayload() {
        return new KafkaPayloadSerde();
    }
}
*SERIALIZER/DESERIALIZER*
public class KafkaPayloadSerializer implements Serializer<KafkaPayload>,
        Deserializer<KafkaPayload> {

    private static final Logger log = org.apache.logging.log4j.LogManager
            .getLogger(MethodHandles.lookup().lookupClass().getCanonicalName());

    @Override
    public KafkaPayload deserialize(String topic, byte[] arg1) {
        ByteArrayInputStream bis = new ByteArrayInputStream(arg1);
        ObjectInput in = null;
        Object obj = null;
        try {
            in = new ObjectInputStream(bis);
            obj = in.readObject();
        } catch (IOException e) {
            log.error(e);
        } catch (ClassNotFoundException e) {
            log.error(e);
        } finally {
            try {
                bis.close();
                if (in != null) {
                    in.close();
                }
            } catch (IOException ex) {
                log.error(ex);
            }
        }
        return (KafkaPayload) obj;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public byte[] serialize(String topic, KafkaPayload kpayload) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutput out = null;
        byte[] payload = null;
        try {
            out = new ObjectOutputStream(bos);
            out.writeObject(kpayload);
            payload = bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (out != null) {
                    out.close();
                    bos.close();
                }
            } catch (Exception ex) {
                log.error(ex);
            }
        }
        return payload;
    }

    @Override
    public void configure(Map configs, boolean isKey) {
        // TODO Auto-generated method stub
    }
}
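To separate serde problems from Streams problems, the Java-serialization round trip can be checked on its own, outside Kafka. The sketch below is a JDK-only approximation of what serialize() and deserialize() above do; Payload is a hypothetical stand-in for KafkaPayload, and SerializationRoundTrip is a made-up name. Unlike the code above, it throws on failure instead of logging and returning null, so a payload that is not Serializable shows up immediately.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Illustrative sketch; not the KafkaPayloadSerializer from this mail.
class SerializationRoundTrip {
    /** Hypothetical stand-in for KafkaPayload; the real class must also be Serializable. */
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String body;

        Payload(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    /** Java-serializes a value; null stays null, as Kafka may pass null values. */
    static byte[] serialize(Object value) {
        if (value == null) {
            return null;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        } catch (IOException e) {
            // NotSerializableException lands here instead of being swallowed.
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return bos.toByteArray();
    }

    /** Reverses serialize(); null bytes map back to null. */
    static Object deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Payload copy = (Payload) deserialize(serialize(new Payload("k1", "hello")));
        System.out.println(copy.id + " / " + copy.body);
    }
}
```

If the same round trip fails with the real KafkaPayload class, the serde is the place to look; if it succeeds, the cause is more likely in how the stream is run or read.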
On 11 October 2016 at 20:13, Michael Noll <[email protected]> wrote:
> When I wrote:
>
> "If you haven't changed to default key and value serdes, then `to()`
> will fail because [...]"
>
> it should have read:
>
> "If you haven't changed the default key and value serdes, then `to()`
> will fail because [...]"
>
>
>
> On Tue, Oct 11, 2016 at 11:12 AM, Michael Noll <[email protected]>
> wrote:
>
> > Ratha,
> >
> > if you based your problematic code on the PipeDemo example, then you
> > should have these two lines in your code (which most probably you haven't
> > changed):
> >
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> >
> > This configures your application to interpret (= encode/decode), by
> > default, the keys and values of any messages it reads from Kafka as
> > strings. This works for the PipeDemo example because the keys and values
> > are actually strings.
> >
> > In your application, however, you do:
> >
> > KStream<String, KafkaPayload> kafkaPayloadStream =
> > builder.stream(sourceTopics);
> >
> > This won't work, because `builder.stream()`, when calling it without
> > explicit serdes, will use the default serdes configured for your
> > application. So `builder.stream(sourceTopics)` will give you
> > `KStream<String, String>`, not `KStream<String, KafkaPayload>`. Also, you
> > can't just cast a String to KafkaPayload to "fix" the problem; if you
> > attempt to do so you run into the ClassCastException that you reported
> > below.
> >
> > What you need to do to fix your problem is:
> >
> > 1. Provide a proper serde for `KafkaPayload`. See
> > http://docs.confluent.io/current/streams/developer-guide.html#implementing-custom-serializers-deserializers-serdes.
> > There are also example implementations of such custom serdes at [1] and [2].
> >
> > Once you have that, you can e.g. write:
> >
> > final Serde<String> stringSerde = Serdes.String(); // provided by Kafka
> > final Serde<KafkaPayload> kafkaPayloadSerde = ...; // must be provided by you!
> >
> > 2. Call `builder.stream()` with explicit serdes to override the default
> > serdes. stringSerde is for the keys, kafkaPayloadSerde is for the values.
> >
> > KStream<String, KafkaPayload> kafkaPayloadStream =
> > builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
> >
> > That should do it.
> >
> > Lastly, you must think about serialization also when calling `to()` or
> > `through()`:
> >
> > kafkaPayloadStream.to(targetTopic);
> >
> > If you haven't changed to default key and value serdes, then `to()` will
> > fail because it will by default (in your app configuration) interpret
> > message values still as strings rather than KafkaPayload. To fix this you
> > should call:
> >
> > kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
> >
> > You need to override the default serdes whenever the data must be written
> > with, well, non-default serdes.
> >
> > I'd recommend reading
> > http://docs.confluent.io/current/streams/developer-guide.html#data-types-and-serialization
> > to better understand how this works.
> >
> >
> > Hope this helps,
> > Michael
> >
> >
> >
> > [1] http://docs.confluent.io/current/streams/developer-guide.html#available-serializers-deserializers-serdes
> > [2] https://github.com/confluentinc/examples/tree/kafka-0.10.0.1-cp-3.0.1/kafka-streams/src/main/java/io/confluent/examples/streams/utils
> >
> >
> >
> >
> > On Tue, Oct 11, 2016 at 7:38 AM, Ratha v <[email protected]> wrote:
> >
> >> I checked my target topic and I see fewer messages than in the source
> >> topic. (If the source topic has 5 messages, I see 2 messages in my target
> >> topic.) What settings do I need to change?
> >>
> >> And when I try to consume messages from the target topic, I get a
> >> ClassCastException.
> >>
> >> java.lang.ClassCastException: java.lang.String cannot be cast to
> >> xx.yy.core.kafkamodels.KafkaPayload;
> >>
> >> receivedPayload = (KafkaPayload) consumerRecord.value();
> >>
> >>
> >> I merge the two topics like this:
> >>
> >> KStreamBuilder builder = new KStreamBuilder();
> >>
> >> KStream<String, KafkaPayload> kafkaPayloadStream =
> >> builder.stream(sourceTopics);
> >>
> >> kafkaPayloadStream.to(targetTopic);
> >>
> >> streams = new KafkaStreams(builder, properties);
> >>
> >> streams.start();
> >>
> >>
> >> Why do I see a ClassCastException when consuming the message?
> >>
> >>
> >> On 11 October 2016 at 15:19, Ratha v <[email protected]> wrote:
> >>
> >> > Hi all;
> >> > I have a custom data type defined (a POJO class).
> >> > I copy messages from one topic to another topic.
> >> > I do not see any messages in my target topic.
> >> > This works for string messages, but not for my custom message.
> >> > What might be the cause?
> >> > I followed this sample [1].
> >> > [1]
> >> > https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
> >> >
> >> >
> >> > --
> >> > -Ratha
> >> > http://vvratha.blogspot.com/
> >> >
> >>
> >>
> >>
> >> --
> >> -Ratha
> >> http://vvratha.blogspot.com/
> >>
> >
> >
> >
>
--
-Ratha
http://vvratha.blogspot.com/