Try something like this:

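stream
  // Re-key by user so that all of a user's events reach the same aggregate.
  .groupBy((key, value) -> value.userId)
  .aggregate(
    () -> new Session(),
    (aggKey, newValue, aggValue) -> {
      // Assumes Event.start and Event.end are nullable timestamps (e.g. Long).
      aggValue.userId = newValue.userId;
      if (newValue.start != null) {
        // start event: begin a fresh session
        aggValue.start = newValue.start;
        aggValue.duration = 0;
        aggValue.count = 0;
        aggValue.open = true;
      } else if (newValue.end != null) {
        // end event: fix the final duration and mark the session closed
        aggValue.duration = newValue.end - aggValue.start;
        aggValue.open = false;
      } else {
        // action event
        aggValue.count++;
        // now() is a placeholder; with event time, use the event's own
        // timestamp here rather than the wall clock.
        aggValue.duration = now() - aggValue.start;
      }
      return aggValue;
    }
  );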

Note that you need well-defined Session and Event classes, each with an
appropriate serde.
The result of the aggregation (a KTable keyed by user id) then holds one
Session per user, with attributes such as start time, duration, action count,
and whether the session is open or closed.
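
As a rough illustration of what those two classes might look like, here is a
minimal sketch in plain Java with no Kafka dependency. The field names
(timestamp, type) and the SessionSketch.apply helper are assumptions made for
this example, modelled on the "<timestamp>,type:...,<user_id>" lines in your
mail; serdes are left out and would still have to be provided separately.

```java
import java.util.List;

// Hypothetical event shape, based on "<timestamp>,type:start,<user_id>".
class Event {
    final long timestamp;  // event time in milliseconds (assumed unit)
    final String type;     // "start", "action" or "end"
    final String userId;

    Event(long timestamp, String type, String userId) {
        this.timestamp = timestamp;
        this.type = type;
        this.userId = userId;
    }
}

// Hypothetical aggregate; public mutable fields keep the sketch short.
class Session {
    String userId;
    long start;
    long duration;
    int count;
    boolean open;
}

class SessionSketch {
    // Same shape as an aggregator: (key, new value, current aggregate) -> aggregate.
    static Session apply(String userId, Event event, Session session) {
        session.userId = userId;
        switch (event.type) {
            case "start":
                session.start = event.timestamp;
                session.duration = 0;
                session.count = 0;
                session.open = true;
                break;
            case "end":
                session.duration = event.timestamp - session.start;
                session.open = false;
                break;
            default: // treated as an "action" event
                session.count++;
                session.duration = event.timestamp - session.start;
                break;
        }
        return session;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(1000, "start", "u1"),
            new Event(2000, "action", "u1"),
            new Event(3000, "action", "u1"),
            new Event(5000, "end", "u1"));
        Session s = new Session();
        for (Event e : events) {
            s = apply(e.userId, e, s);
        }
        // start, duration, user id, action count
        System.out.println(s.start + "," + s.duration + "," + s.userId + "," + s.count);
        // prints 1000,4000,u1,2
    }
}
```

Because the duration is always computed from event timestamps, this version
never consults the wall clock, which fits an event-time setup.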

One thing to watch out for: after a session is closed, a new session for the
same user can start.
So you may need to break sessions apart with some form of windowing, or, when
a session is closed, write it to a store (either internal to Kafka or an
external database) and reset the session object.
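
To make the second option concrete, here is a small plain-Java sketch of the
"store the closed session, then reset" idea. SessionTracker and ClosedSession
are hypothetical names, and the in-memory map and list only stand in for a
real state store and output topic. It assumes that a session which never
receives an end event is simply dropped when the next start arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record of one finished session (the four requested output fields).
class ClosedSession {
    final String userId;
    final long start;
    final long duration;
    final int count;

    ClosedSession(String userId, long start, long duration, int count) {
        this.userId = userId;
        this.start = start;
        this.duration = duration;
        this.count = count;
    }
}

// Hypothetical stand-in for a state store plus an output sink.
class SessionTracker {
    // Per user: {start timestamp, action count} of the currently open session.
    private final Map<String, long[]> open = new HashMap<>();
    final List<ClosedSession> closed = new ArrayList<>();

    void onEvent(long timestamp, String type, String userId) {
        switch (type) {
            case "start":
                // Replaces any session that was never closed (it is thrown out).
                open.put(userId, new long[] {timestamp, 0});
                break;
            case "action":
                long[] current = open.get(userId);
                if (current != null) {
                    current[1]++;  // actions outside a session are ignored
                }
                break;
            case "end":
                // Removing the entry is the reset: the next start begins from scratch.
                long[] finished = open.remove(userId);
                if (finished != null) {
                    closed.add(new ClosedSession(
                        userId, finished[0], timestamp - finished[0], (int) finished[1]));
                }
                break;
            default:
                break;  // unknown event types are ignored
        }
    }

    public static void main(String[] args) {
        SessionTracker tracker = new SessionTracker();
        tracker.onEvent(1000, "start", "u1");
        tracker.onEvent(2000, "action", "u1");
        tracker.onEvent(4000, "end", "u1");
        tracker.onEvent(6000, "start", "u1");  // second session, same user
        tracker.onEvent(9000, "end", "u1");
        for (ClosedSession c : tracker.closed) {
            System.out.println(c.start + "," + c.duration + "," + c.userId + "," + c.count);
        }
    }
}
```

For the windowing option, Kafka Streams has session windows (SessionWindows)
that group events by an inactivity gap; whether they fit depends on how
reliably the end events arrive.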

Hope this helps.




On Sat, Jan 4, 2020 at 10:02 PM Chris Madge <ch...@madg.es> wrote:

> Hi there,
>
> It’s my first voyage into stream processing - I’ve tried a few things but
> I think I’m struggling to think in the streams way. I wondered if I could
> be cheeky and ask if someone could give me some clues as to the correct
> design for my first task to get me started?
>
> I have application events coming in like:
>
> <timestamp>,type:start,<user_id>
> <timestamp>,type:action,<user_id>
> <timestamp>,type:action,<user_id>
> <timestamp>,type:action,<user_id>
> <timestamp>,type:end,<user_id>
>
> each one represents a single user session.
>
> I need to output:
> <timestamp of start event>,<duration between start and end
> event>,<user_id>,<count_of_action_events>
>
> I’m working with event time (specified by the application) and I can’t
> trust the application to close sessions/notify gracefully (I’m happy for
> those to be thrown out, but cool ideas for alternatives are very welcome!).
>
> Any advice would be much appreciated.
>
> Chris Madge
>
