I'm trying to build a Beam pipeline to aggregate events from an
unbounded source.
These events work as follows:
1. Some event with a given ID occurs
* This ID may be re-used but will _never_ be re-used within *Y*
seconds
2. Any number of listeners report that event -- the *event* times will
all be within a couple ms of each other
3. These events need to be aggregated by `id` to form some output message
* The order of the messages is not important, just the fact that
they are included
For example, given input messages in the CSV format `id,message`, I
would expect the inputs below:
"1,this "
"1,is "
"1,event "
"2,two "
"1,one "
"2,chiming "
"2,in "
to give the outputs:
"1,this is event one"
"2,two chiming in"
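To make the expected aggregation concrete, here is a minimal plain-Python sketch (not Beam code) of the grouping step on its own, ignoring timing entirely. The function name `aggregate` is just something I made up for illustration, and stripping the trailing space is an assumption about the desired output format.

```python
from collections import defaultdict


def aggregate(lines):
    """Group `id,message` lines by id and concatenate the messages.

    Messages are joined in arrival order here, although the order is
    not actually important for my use case.
    """
    grouped = defaultdict(list)
    for line in lines:
        # Split only on the first comma so messages may contain commas.
        event_id, message = line.split(",", 1)
        grouped[event_id].append(message)
    # Assumption: drop the trailing space left by the last message.
    return [
        f"{event_id},{''.join(parts).rstrip()}"
        for event_id, parts in grouped.items()
    ]


inputs = ["1,this ", "1,is ", "1,event ", "2,two ", "1,one ", "2,chiming ", "2,in "]
outputs = aggregate(inputs)
```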
With network latency, messages will have different amounts of lag
getting to Beam. I'm more concerned with latency than completeness --
it's fine if we miss a couple messages from listeners -- and the
downstream processing is not currently equipped to re-process events if
late data comes in, so I'm happy to completely discard late data.
What I'd like to express in Beam is as follows:
* A message with event ID *1* arrives → start a new window, wait *X*
seconds to see if another message with event ID *1* arrives
o A message with event ID *1* arrives in _less than_ *X* seconds →
add it to the window and wait another *X* seconds
o A message with event ID *1* arrives in _more than_ *X* seconds
but _less than_ *Y* seconds → discard it
o A message with event ID *1* arrives in _more than_ *Y* seconds →
start a new window
* If the *X* second timer for a window expires, trigger a pane with
whatever events have happened to come in
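The rules above can be sketched as a small plain-Python simulation (again, not Beam code). It makes a few assumptions that I have not pinned down: input is already sorted by event time, both *X* and *Y* are measured from the most recent accepted message for that ID, and a gap of exactly *X* counts as "too late". The names `Window` and `aggregate_with_gap` are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Window:
    event_id: str
    last_seen: float  # event time of the most recent accepted message
    messages: list = field(default_factory=list)


def aggregate_with_gap(events, x, y):
    """Simulate the desired semantics on time-sorted input.

    events: iterable of (timestamp_seconds, event_id, message).
    Returns (panes, dropped). In a real pipeline a pane would fire when
    the X-second timer expires; here it is emitted lazily, either when
    the ID is re-used or at the end of the input.
    """
    open_windows = {}
    panes = []
    dropped = []
    for ts, event_id, message in events:
        window = open_windows.get(event_id)
        if window is None:
            # First message for this ID: start a new window.
            open_windows[event_id] = Window(event_id, ts, [message])
            continue
        gap = ts - window.last_seen
        if gap < x:
            # Within X seconds: add to the window and restart the wait.
            window.messages.append(message)
            window.last_seen = ts
        elif gap < y:
            # Between X and Y seconds: discard as late data.
            dropped.append((ts, event_id, message))
        else:
            # Y seconds or more: the ID has been re-used, start a new window.
            panes.append((event_id, window.messages))
            open_windows[event_id] = Window(event_id, ts, [message])
    # Flush whatever is still open at the end of the (finite) test input.
    panes.extend((w.event_id, w.messages) for w in open_windows.values())
    return panes, dropped


events = [
    (0.0, "1", "this "),
    (0.5, "1", "is "),
    (1.0, "2", "two "),
    (1.2, "1", "event "),
    (4.0, "1", "late "),    # 2.8 s after the last accepted "1": dropped
    (20.0, "1", "again "),  # 18.8 s after the last accepted "1": new window
]
panes, dropped = aggregate_with_gap(events, x=2.0, y=10.0)
```

With *X* = 2 and *Y* = 10, the message at 4.0 s is the only one discarded, and the message at 20.0 s opens a second window for ID *1*.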
At first, session windows seem like a great candidate: I can set the gap
duration to *X* and events within *X* seconds of each other will be
assigned to the same window -- great! However, I'm unable to drop events
that are more than *X* but less than *Y* seconds apart -- Beam will
instead consider this a new session.
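To illustrate the problem, here is a simplified plain-Python model of session-style grouping for a single ID. It is only my understanding of how gap-based sessions behave, not Beam's actual implementation, and `sessionize` is a made-up name.

```python
def sessionize(timestamps, gap):
    """Split event timestamps into sessions.

    A new session starts whenever an element is `gap` seconds or more
    after the previous element.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            # Close enough to the previous element: same session.
            sessions[-1].append(ts)
        else:
            # Too far from the previous element: a new session begins.
            sessions.append([ts])
    return sessions


X = 2.0
sessions = sessionize([0.0, 0.5, 1.2, 4.0], X)
```

Here the element at 4.0 s ends up in a session of its own, whereas I would like it to be discarded because it is less than *Y* seconds after the previous element.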
How could I configure Beam to trigger *X* seconds after the most recent
element and drop elements that arrive more than *X* but less than *Y*
seconds after it?
Cheers,
Tomas
_Note: it feels like I'm fighting the framework here, which is often a
sign that I should approach the problem differently -- if there are
better ways to address this use case that are more in line with Beam's
philosophy, I'd love to hear them too!_