Hello,
I'm playing around with the brand new SessionWindows. I have a simple
topology such as:
    KStream<String, JsonObject> sess =
        builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);

    sess
        .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
        .groupByKey(stringSerde, jsonSerde)
        .aggregate(
            MySession::new,
            MySession::aggregateSessions,
            MySession::mergeSessions,
            SessionWindows
                .with(WINDOW_INACTIVITY_GAPS_MS)
                .until(WINDOW_MAINTAIN_DURATION_MS),
            // ... remaining aggregate() arguments omitted
        )
        .filter(MySession::filterOutZeroLenghtSessions)
        .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
These are the most important settings I'm using; all the other configs
are the usual serdes and host properties:
    // Durations in milliseconds (needs java.util.concurrent.TimeUnit)
    private static final long WINDOW_INACTIVITY_GAPS_MS = TimeUnit.MINUTES.toMillis(5);
    private static final long WINDOW_MAINTAIN_DURATION_MS = TimeUnit.MINUTES.toMillis(5 + 2);

    private static final Properties props = new Properties();
    props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
        ONE_DAY);
The source stream has data arriving at around 100 messages/second.
I'm seeing the following behaviour:
1) MySession::new is called thousands of times, far more often than the
number of messages ingested (roughly 100 to 1000 times more). Most of these
sessions never reach the end of the pipeline (even if I remove
.filter(MySession::filterOutZeroLenghtSessions)), and neither
MySession::aggregateSessions nor MySession::mergeSessions is invoked for
them.
Is this expected? I don't understand it; maybe I've set something up wrong...
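In case it helps to put exact numbers on this, here is a minimal sketch of how the calls could be counted. CallCounter is a hypothetical helper I made up for illustration, not part of Kafka Streams; it only wraps a factory such as MySession::new and counts how often it runs.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka Streams class): counts how often a
// wrapped factory is invoked, so the number can be compared with the
// number of messages ingested.
final class CallCounter {
    private final AtomicLong calls = new AtomicLong();

    // Wraps a factory such as MySession::new; every get() is counted
    // before being delegated to the original factory.
    <T> Supplier<T> counting(Supplier<T> delegate) {
        return () -> {
            calls.incrementAndGet();
            return delegate.get();
        };
    }

    // Number of invocations seen so far.
    long count() {
        return calls.get();
    }
}
```

The wrapped supplier could then be passed to aggregate() in place of MySession::new, for example as `counter.counting(MySession::new)::get`, assuming the initializer argument accepts any no-argument function that returns the aggregate.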
2) The stream pipeline ingests the first 15 minutes of data fine, and the
sessions that reach SINK_TOPIC_KTABLE look good. However:
- with every second that passes, the pipeline gets slower and slower;
- I still see updates to old sessions after the
  .until(WINDOW_MAINTAIN_DURATION_MS) period has elapsed;
- the stream consumer ingests new data at lower and lower rates as time
  passes, eventually reaching almost 0 msg/sec.
I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new
sessions, and that sessions that had already fired would simply be removed
from the session store and never touched again.
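To make that expectation concrete, here is a minimal sketch in plain Java, with no Kafka involved. SessionSketch, sessionize and expectedToBeClosed are names I made up for illustration; the code describes what I assumed would happen, which is not necessarily what Kafka Streams actually does.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the behaviour I expected (an assumption,
// not a description of the Kafka Streams internals).
final class SessionSketch {

    // A session covering the event timestamps [start, end], in milliseconds.
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Groups the ascending timestamps of ONE key into sessions: a new
    // session starts whenever the gap to the previous event exceeds gapMs.
    static List<Session> sessionize(long[] sortedTimestamps, long gapMs) {
        List<Session> sessions = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return sessions;
        }
        long start = sortedTimestamps[0];
        long end = start;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - end > gapMs) {
                // Inactivity gap exceeded: close the current session.
                sessions.add(new Session(start, end));
                start = ts;
            }
            end = ts;
        }
        sessions.add(new Session(start, end));
        return sessions;
    }

    // My expectation: once "now" is more than maintainMs past the end of
    // a session, that session is never updated again.
    static boolean expectedToBeClosed(Session s, long now, long maintainMs) {
        return now - s.end > maintainMs;
    }
}
```

With a 5 minute gap, events at 0, 1 and 2 minutes would form one session and an event at 10 minutes would start a second one. With a 7 minute maintain duration, I would expect the first session to be final once more than 9 minutes have passed.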
At first I thought my pipeline was not set up correctly, but I have
followed the docs closely and could not find where things could go wrong.
Do you have any hints about this?
Please let me know if you need more info.
Thanks a lot,
Marco