Hi Juliana,

I'm not too familiar with the Python SDK, so I can only give generic advice. The problem you describe seems to be handled well by a stateful DoFn [1][2], where you would hold the last event timestamp per session and, on each incoming event, set a timer to the expiration time (if the timestamp of that event is greater than the greatest seen so far). Once you receive LOGOUT, you reset this timer and expire the session (probably by unsetting the last received event timestamp). Note that, in general, events will arrive out of order (not sorted by timestamp), so you must keep the maximal timestamp and update it only on events with a higher timestamp.

> In plain Python I would keep a dict with each session as key and the last timestamp as value. For each new entry of a given key I would check the timedelta: if bigger than the window, the session is expired; otherwise, update the last timestamp. But I don't know how to handle this in Beam.

This is essentially what you should do, just use the stateful API.
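For reference, on bounded, timestamp-sorted data the dict approach you describe might look like this in plain Python (just a sketch; the function name and tuple layout are my invention):

```python
from datetime import timedelta

TIMEOUT = timedelta(minutes=5)


def expired_sessions(entries):
    """entries: (timestamp, session, operation) tuples, sorted by timestamp."""
    last_seen = {}   # session -> timestamp of its latest interaction
    expired = set()
    for ts, session, op in entries:
        if op == 'LOGOUT':
            # Clean logout: this session can no longer expire.
            last_seen.pop(session, None)
            continue
        prev = last_seen.get(session)
        if prev is not None and ts - prev > TIMEOUT:
            expired.add(session)
        last_seen[session] = ts
    return expired
```

Note this only detects gaps between consecutive events; a session whose last event is followed by silence until the end of the log would need a final sweep over `last_seen` -- which is exactly the job the timer does for you in the stateful Beam version.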

Hope this helps,

Jan

[1] https://beam.apache.org/blog/2017/02/13/stateful-processing.html

[2] https://beam.apache.org/releases/pydoc/2.18.0/apache_beam.transforms.userstate.html

On 2/5/20 12:58 AM, Juliana Pereira wrote:
I have a web log file that contains session ids and interactions; there are three interactions: `GET`, `LOGIN`, `LOGOUT`. Something like:

```
00:00:01;session1;GET
00:00:03;session2;LOGIN
00:01:01;session1;LOGOUT
00:03:01;session2;GET
00:08:15;session2;GET
```

and goes on.

I want to be able to identify (right now, I'm dealing with bounded data) which sessions have expired. By expired I mean any session that does not have any interaction within a 5-minute interval.

Of course, if the user sends `LOGOUT`, expiration does not apply. In the data above, session 2 should be considered expired.

I have the following dataflow:
```
( p
      | 'Read Files' >> ReadFromText(known_args.input, coder=LogCoder())
      | ParDo(LogToSession())
      | beam.Map(lambda entry: (entry.session, entry))
      | beam.GroupByKey()
)
```

The `LogCoder()` is responsible for correctly reading the input files. `LogToSession` converts a log line to a Python class that correctly handles the data structure, making it possible to access properties directly.

For example, I can fetch `entry.session`, `entry.timestamp`, or `entry.operation`.

Once processed by `LogToSession`, `entry.timestamp` is a Python `datetime`, `entry.session` is a `str`, and `entry.operation` is also a `str`.

In plain Python I would keep a dict with each session as key and the last timestamp as value. For each new entry of a given key I would check the timedelta: if bigger than the window, the session is expired; otherwise, update the last timestamp. But I don't know how to handle this in Beam.

How to handle the next steps?
