Hi all,
I'm struggling with an apparently simple problem.
I'm joining 2 different streams:
Stream1. User activity data, with key, value --> <String, JsonObject>
Stream2. User location data (such as the city name), with key, value --> <String, String>
The keys are of the same kind in both streams: each one is the id of the user's device.
The error thrown is:
Exception in thread "StreamThread-2" java.lang.ClassCastException: com.mytest.JsonObject cannot be cast to java.lang.String
    at com.mytest.serdes.JsonObjectSerde$1.serialize(JsonObjectSerde.java:49)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:176)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
This is the code I'm running:
// Other Stream: User Location, a string with the name of the city the
// user is in (like "San Francisco")
KStreamBuilder locationStreamBuilder = new KStreamBuilder();
KStream<String, String> userLocationStream =
    locationStreamBuilder.stream(stringSerde, stringSerde, "userLocationStreamData");
KStream<String, String> locationKstream =
    userLocationStream.map(MyStreamUtils::enhanceWithAreaDetails);
locationKstream.to("user_location");
KafkaStreams userLocationKafkaStream =
    new KafkaStreams(locationStreamBuilder, propsLocation);
userLocationKafkaStream.start();
// This Stream: User Activity
KStreamBuilder activityStreamBuilder = new KStreamBuilder();
KStream<String, JsonObject> activity =
    activityStreamBuilder.stream(stringSerde, jsonSerde, "activityStreamData");
activity.filter(MyStreamUtils::filterOutFakeUsers)
    .map(MyStreamUtils::enhanceWithScoreDetails)
    .join(
        locationKstream,
        MyStreamUtils::locationActivityJoiner,
        JoinWindows.of(1000).until(1000 * 60 * 5),
        stringSerde, jsonSerde, stringSerde)
    .to("usersWithLocation");
KafkaStreams userActivityStream = new KafkaStreams(activityStreamBuilder, propsActivity);
userActivityStream.start();
And MyStreamUtils::locationActivityJoiner does:
public static JsonObject locationActivityJoiner(JsonObject activity, String loc) {
    JsonObject join = activity.copy();
    join.put("city", loc);
    return join;
}
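To make the intended behaviour of the joiner concrete, here is a minimal stand-alone sketch of it outside Kafka. It is only an illustration: a plain Map<String, Object> stands in for com.mytest.JsonObject (an assumption, since that class is not shown here), and the class name JoinerSketch and the sample values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class JoinerSketch {

    // Stand-in for MyStreamUtils::locationActivityJoiner.
    // Assumption: a Map<String, Object> plays the role of JsonObject,
    // and copying the map plays the role of JsonObject.copy().
    public static Map<String, Object> locationActivityJoiner(
            Map<String, Object> activity, String loc) {
        Map<String, Object> join = new HashMap<>(activity); // copy, do not mutate the input
        join.put("city", loc);
        return join;
    }

    public static void main(String[] args) {
        // Hypothetical activity record for one device
        Map<String, Object> activity = new HashMap<>();
        activity.put("score", 42);

        // Left argument: an activity record; right argument: a city name
        Map<String, Object> joined = locationActivityJoiner(activity, "San Francisco");

        System.out.println(joined.get("city"));           // prints "San Francisco"
        System.out.println(activity.containsKey("city")); // prints "false"
    }
}
```

So the left argument should always be an activity record and the right argument a plain city string, and the input record itself should stay unchanged.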
Basically, it seems that locationActivityJoiner receives, as both its left
and right arguments, elements that come only from the activity KStream, while
I was expecting to receive an activity element (a JsonObject) and a
userLocation element (a String).
How is this possible? I can't see what I'm doing wrong.
Do you have any clue why this is happening?
Thanks a lot for your support and work.
Best
Marco