Hi,

I have a DataFrame of user events from a log file, and I'm trying to group
each user's events into sessions. A session is a list of events from the same
user in which the time difference between any two consecutive events (sorted
by time) is no greater than 30 minutes.
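To pin the rule down, here is a minimal plain-Scala sketch of it with no Spark
involved. It assumes `time` is in epoch seconds, which the `30 * 60` threshold
in the window code suggests. `Event`, `Sessionize` and `assignSessions` are
hypothetical names used only for illustration.

```scala
// Hypothetical event record; the fields mirror the id, user_id and time columns.
// Assumption: time is in epoch seconds.
final case class Event(id: Int, userId: String, time: Long)

object Sessionize {
  // Largest allowed gap between consecutive events of one session: 30 minutes, in seconds.
  val MaxGapSeconds: Long = 30L * 60

  // Numbers each user's sessions 1, 2, 3, ... and returns (event, session) pairs,
  // ordered by user id and then by time.
  def assignSessions(events: Seq[Event]): Seq[(Event, Int)] =
    events.groupBy(_.userId).toSeq.sortBy(_._1).flatMap { case (_, userEvents) =>
      val byTime = userEvents.sortBy(_.time)
      // 1 when the gap to the previous event exceeds the limit, 0 otherwise;
      // the user's first event gets 0 because it has no predecessor.
      val newSession: Seq[Int] = 0 +: byTime.zip(byTime.tail).map { case (prev, cur) =>
        if (cur.time - prev.time > MaxGapSeconds) 1 else 0
      }
      // Running sum of the flags, shifted by 1 so the first session is numbered 1.
      val sessionIds = newSession.scanLeft(0)(_ + _).tail.map(_ + 1)
      byTime.zip(sessionIds)
    }
}
```

A gap of exactly 30 minutes keeps the event in the current session; only a
strictly larger gap starts a new one.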

Currently I'm using the DataFrame `lag` window function to compare each event
with its predecessor:


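import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, when}

val win = Window.partitionBy("user_id").orderBy("time")
// time of the same user's previous event (null for the user's first event)
val prevTime = lag("time", 1).over(win)
// 1 if the event starts a new session (gap > 30 minutes), 0 otherwise
val journey = when(col("time") - prevTime > 30 * 60, 1).otherwise(0)

dataset.select(col("*"), journey.as("session_id"))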

This gives me the following DataFrame:

+---+-------+----------+----------+
| id|user_id|      time|session_id|
+---+-------+----------+----------+
|  1|     u1|1454372971|         0|
|  2|     u1|1454373571|         0|
|  3|     u1|1454374171|         0|
|  4|     u1|1454376571|         1|
|  5|     u1|1454377171|         0|
|  6|     u2|1454377771|         0|
|  7|     u2|1454380771|         1|
|  8|     u2|1454381371|         0|
|  9|     u2|1454381971|         0|
+---+-------+----------+----------+

Here session_id is 1 only on the event that starts a new session. Instead, I
want an identifier that is unique for each session of a given user, like this:

+---+-------+----------+----------+
| id|user_id|      time|session_id|
+---+-------+----------+----------+
|  1|     u1|1454372971|         1|
|  2|     u1|1454373571|         1|
|  3|     u1|1454374171|         1|
|  4|     u1|1454376571|         2|
|  5|     u1|1454377171|         2|
|  6|     u2|1454377771|         1|
|  7|     u2|1454380771|         2|
|  8|     u2|1454381371|         2|
|  9|     u2|1454381971|         2|
+---+-------+----------+----------+


How can I do this efficiently with Spark's window functions?
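A commonly used pattern, sketched here under assumptions rather than measured
for efficiency: keep the 0/1 "new session" flag from the `lag` step and take
its running sum over each user's events ordered by time, adding 1 so numbering
starts at 1. The flag and the sum are computed in separate `withColumn` steps
because Spark does not allow one window function nested inside another.
`SessionIds`, `withSessionIds` and the temporary `new_session` column are
hypothetical names; `time` is assumed to be epoch seconds. The sum uses an
explicit ROWS frame so it counts rows one by one instead of the default RANGE
frame, which would group events with equal timestamps.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, sum, when}

object SessionIds {
  // Adds a per-user session number (1, 2, 3, ...). Assumes columns user_id and
  // time, with time in epoch seconds and a 30-minute session gap.
  def withSessionIds(df: DataFrame): DataFrame = {
    val win = Window.partitionBy("user_id").orderBy("time")
    // Same ordering, but with an explicit "all rows up to this one" frame for the running sum.
    val runningWin = win.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    val prevTime = lag("time", 1).over(win)
    // 1 on the first event of a new session, 0 otherwise (a null prevTime yields 0).
    val newSession = when(col("time") - prevTime > 30 * 60, 1).otherwise(0)
    df.withColumn("new_session", newSession)
      // Running total of session starts within each user, plus 1.
      .withColumn("session_id", sum("new_session").over(runningWin) + 1)
      .drop("new_session")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sessions").getOrCreate()
    import spark.implicits._
    // Sample rows copied from the tables above.
    val events = Seq(
      (1, "u1", 1454372971L), (2, "u1", 1454373571L), (3, "u1", 1454374171L),
      (4, "u1", 1454376571L), (5, "u1", 1454377171L), (6, "u2", 1454377771L),
      (7, "u2", 1454380771L), (8, "u2", 1454381371L), (9, "u2", 1454381971L)
    ).toDF("id", "user_id", "time")
    withSessionIds(events).orderBy("id").show()
    spark.stop()
  }
}
```

On the sample rows, this should give the session_id values of the desired
table (1, 1, 1, 2, 2 for u1 and 1, 2, 2, 2 for u2). Both windows share the same
partitioning and ordering, so Spark can typically evaluate them after a single
shuffle and sort by user_id.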

Thanks,
Giorgi
