Hi, I have a DataFrame of user events from a log file, and I'm trying to group them into sessions per user. A session is a list of events from the same user in which the time difference between any two consecutive events (when sorted by time) is no greater than 30 minutes.
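To make the session rule concrete, here is a minimal sketch on plain Scala collections (not Spark). The names Event, Sessions and sessionize are hypothetical and exist only for this illustration; it assumes time is in epoch seconds. It flags each event that opens a new session and takes a running sum of those flags per user.

```scala
// Hypothetical types and names, for illustration only (plain Scala, no Spark).
final case class Event(id: Int, userId: String, time: Long)

object Sessions {
  // Assumption: timestamps are epoch seconds, so 30 minutes = 1800.
  val MaxGapSeconds: Long = 30 * 60

  /** Pairs each event with a session number that restarts at 1 for every user. */
  def sessionize(events: Seq[Event]): Seq[(Event, Int)] =
    events
      .groupBy(_.userId)
      .toSeq
      .flatMap { case (_, userEvents) =>
        val sorted = userEvents.sortBy(_.time).toVector
        // 1 where an event opens a new session, 0 otherwise.
        // The first event of a user always opens a session.
        val starts = sorted.indices.map { i =>
          if (i == 0 || sorted(i).time - sorted(i - 1).time > MaxGapSeconds) 1 else 0
        }
        // The running sum of the flags is the per-user session number.
        val sessionIds = starts.scanLeft(0)(_ + _).tail
        sorted.zip(sessionIds)
      }
      .sortBy { case (event, _) => event.id }
}
```

Applied to the nine sample events in this post, this should give session numbers 1, 1, 1, 2, 2 for u1 and 1, 2, 2, 2 for u2.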
Currently I'm using the DataFrame lag function to compare each event with its predecessor:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lag, when}

    // Per-user window, ordered by event time
    val win = Window.partitionBy("user_id").orderBy("time")
    val prevTime = lag("time", 1).over(win)
    // 1 if the gap to the previous event exceeds 30 minutes, otherwise 0
    val journey = when(col("time") - prevTime > 30 * 60, 1).otherwise(0)
    // metadata is defined elsewhere in my code
    dataset.select(col("*"), journey.as("session_id", metadata))

And I'm getting this DataFrame as a result:

    +---+-------+----------+----------+
    | id|user_id|      time|session_id|
    +---+-------+----------+----------+
    |  1|     u1|1454372971|         0|
    |  2|     u1|1454373571|         0|
    |  3|     u1|1454374171|         0|
    |  4|     u1|1454376571|         1|
    |  5|     u1|1454377171|         0|
    |  6|     u2|1454377771|         0|
    |  7|     u2|1454380771|         1|
    |  8|     u2|1454381371|         0|
    |  9|     u2|1454381971|         0|
    +---+-------+----------+----------+

In this case session_id = 1 only on the row where a new session starts. Instead, I want a unique identifier for each session per user, like this:

    +---+-------+----------+----------+
    | id|user_id|      time|session_id|
    +---+-------+----------+----------+
    |  1|     u1|1454372971|         1|
    |  2|     u1|1454373571|         1|
    |  3|     u1|1454374171|         1|
    |  4|     u1|1454376571|         2|
    |  5|     u1|1454377171|         2|
    |  6|     u2|1454377771|         1|
    |  7|     u2|1454380771|         2|
    |  8|     u2|1454381371|         2|
    |  9|     u2|1454381971|         2|
    +---+-------+----------+----------+

How can I do this efficiently with Spark's Window functions?

Thanks,
Giorgi