Hi Juliana,
I'm not very familiar with the Python SDK, so I can give only generic
advice. The problem you describe seems to be handled well by a stateful
DoFn [1], where you would hold the last event timestamp per session and,
on each incoming event, set a timer for the expiration time (if the
timestamp of that event is greater than the greatest seen so far). Once
you receive LOGOUT, you clear this timer and expire the session
(probably by unsetting the last received event timestamp). Note that in
general the events will arrive out of order (not sorted by timestamp),
so you must keep the maximal timestamp and update it only for events
with a higher timestamp.
> In normal Python I would keep a dict with each session as key and the
> last timestamp as value. For each new entry of a given key I would
> check the timedelta: if bigger than the window, expired; otherwise,
> update the last timestamp. But I don't know how to handle this in Beam.
This is essentially what you should do, just using the stateful API [2].
Hope this helps,
Jan
[1] https://beam.apache.org/blog/2017/02/13/stateful-processing.html
[2] https://beam.apache.org/releases/pydoc/2.18.0/apache_beam.transforms.userstate.html
On 2/5/20 12:58 AM, Juliana Pereira wrote:
I have a web log file that contains session IDs and interactions;
there are three interactions: `GET`, `LOGIN`, `LOGOUT`.
Something like:
```
00:00:01;session1;GET
00:00:03;session2;LOGIN
00:01:01;session1;LOGOUT
00:03:01;session2;GET
00:08:15;session2;GET
```
and so on.
I want to be able to identify (right now, I'm dealing with bounded
data) which sessions have expired. By expired I mean any session that
does not have any interaction within a 5-minute interval.
Of course, if the user sends `LOGOUT`, expiration does not apply. In
the data above, session2 should be considered expired.
I have the following dataflow:
```
( p
  | 'Read Files' >> ReadFromText(known_args.input, coder=LogCoder())
  | beam.ParDo(LogToSession())
  | beam.Map(lambda entry: (entry.session, entry))
  | beam.GroupByKey()
)
```
The `LogCoder()` is responsible for correctly reading the input files.
`LogToSession` converts a log line into a Python class that correctly
handles the data structure, so I can access properties such as
`entry.session`, `entry.timestamp`, or `entry.operation`.
Once processed by `LogToSession`, `entry.timestamp` is a Python
`datetime`, and `entry.session` and `entry.operation` are both `str`.
In normal Python I would keep a dict with each session as key and the
last timestamp as value. For each new entry of a given key I would
check the timedelta: if bigger than the window, expired; otherwise,
update the last timestamp. But I don't know how to handle this in Beam.
How do I handle the next steps?