I have data streaming into my Spark Scala application in this format:
id mark1 mark2 mark3 time
uuid1 100 200 300 Tue Aug 8 14:06:02 PDT 2017
uuid1 100 200 300 Tue Aug 8 14:06:22 PDT 2017
uuid2 150 250 350 Tue Aug 8 14:06:32 PDT 2017
uuid2 150 250 350 Tue Aug 8 14:06:52 PDT 2017
uuid2 150 250 350 Tue Aug 8 14:06:58 PDT 2017
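Concretely, I read and parse it roughly like this (a sketch: the JSON file source and path are stand-ins for my real source, and the pattern "EEE MMM d HH:mm:ss zzz yyyy" is what I use to match the java.util.Date-style strings above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder.appName("marks").getOrCreate()
import spark.implicits._

// streaming sources need an explicit schema; time arrives as a string
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("mark1", IntegerType),
  StructField("mark2", IntegerType),
  StructField("mark3", IntegerType),
  StructField("time", StringType)))

val markDF = spark.readStream
  .schema(schema)
  .json("/path/to/incoming")  // placeholder for my real source
  .withColumn("time", to_timestamp($"time", "EEE MMM d HH:mm:ss zzz yyyy"))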
So it ends up in columns id, mark1, mark2, mark3, and time, with time parsed
into a proper timestamp. I now want to compute, per id and ordered by time,
the lag of mark1, so that each row also carries the previous row's mark1
value for the same id. Something like this:
id mark1 mark2 mark3 prev_mark time
uuid1 100 200 300 null Tue Aug 8 14:06:02 PDT 2017
uuid1 100 200 300 100 Tue Aug 8 14:06:22 PDT 2017
uuid2 150 250 350 null Tue Aug 8 14:06:32 PDT 2017
uuid2 150 250 350 150 Tue Aug 8 14:06:52 PDT 2017
uuid2 150 250 350 150 Tue Aug 8 14:06:58 PDT 2017
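For reference, this is exactly what lag over a window gives me on a static DataFrame; the batch-only sketch below (hand-built rows standing in for the stream) produces the table above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import spark.implicits._  // assumes the `spark` session from the read sketch

// hand-built stand-in for the streaming data, with pre-parsed timestamps
val staticDF = Seq(
  ("uuid1", 100, 200, 300, java.sql.Timestamp.valueOf("2017-08-08 14:06:02")),
  ("uuid1", 100, 200, 300, java.sql.Timestamp.valueOf("2017-08-08 14:06:22")),
  ("uuid2", 150, 250, 350, java.sql.Timestamp.valueOf("2017-08-08 14:06:32")),
  ("uuid2", 150, 250, 350, java.sql.Timestamp.valueOf("2017-08-08 14:06:52")),
  ("uuid2", 150, 250, 350, java.sql.Timestamp.valueOf("2017-08-08 14:06:58"))
).toDF("id", "mark1", "mark2", "mark3", "time")

val w = Window.partitionBy("id").orderBy("time")
staticDF.withColumn("prev_mark", lag("mark1", 1).over(w)).show(false)
// prev_mark is null for the first row of each id, as in the expected output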
Consider the streaming dataframe to be markDF. I have tried:

val window = Window.partitionBy("id").orderBy("time")
val newerDF = markDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

which fails with "Non-time-based windows are not supported on streaming
DataFrames/Datasets".
I have also tried bounding the window to a few rows:

val window = Window.partitionBy("id").orderBy("time").rowsBetween(-10, 10)
val newerDF = markDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

but that fails the same way. And the time-based streaming window, i.e.
window("time", "10 minutes"), is a grouping expression for time-bucket
aggregations, so it cannot carry a lag either. I am super confused about how
to do this.
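The only alternative I can think of is keeping the last mark1 per id in arbitrary state myself; here is a sketch of what I mean (the Mark and MarkWithPrev case classes are placeholders I made up, and sorting within each trigger is my own assumption about ordering):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class Mark(id: String, mark1: Int, mark2: Int, mark3: Int,
                time: java.sql.Timestamp)
case class MarkWithPrev(id: String, mark1: Int, mark2: Int, mark3: Int,
                        prev_mark: Option[Int], time: java.sql.Timestamp)

val withPrev = markDF.as[Mark]
  .groupByKey(_.id)
  .flatMapGroupsWithState[Int, MarkWithPrev](
    OutputMode.Append, GroupStateTimeout.NoTimeout) { (id, rows, state) =>
      // sort this trigger's rows by time, then thread the last seen mark1 through
      rows.toSeq.sortBy(_.time.getTime).iterator.map { m =>
        val prev = state.getOption  // None until this id has been seen once
        state.update(m.mark1)
        MarkWithPrev(m.id, m.mark1, m.mark2, m.mark3, prev, m.time)
      }
    }

But I am not sure this is the intended approach. Any help would be awesome!!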