Hi All,
I am using Flink 1.12 and trying to read real-time data from a Kafka topic. As 
per the requirement, I need to implement a windowed LAG function. The approach I 
followed is below:
DataStream vData = env.addSource(...)
vData.keyBy(Id)
createTemporaryView, then apply Flink SQL.
In my sample data, the vTime field contains the timestamp when the event was 
generated, and vNumSeq is the unique number within a particular group Id.
I tried the LAG function ordering by the vSeq field (long datatype); the job 
failed with "OVER windows' ordering in stream mode must be defined on a time 
attribute". I also tried ordering by the vTime field (eventTS, also long 
datatype) and tried converting this field to sql.Timestamp, but still no luck: 
the job failed with the same error.
The documents I referred to suggested using proctime/rowtime, so I modified the 
query to use proctime(). The job succeeded, but with wrong results.
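One possible reason for the wrong results (an assumption, since the actual output is not shown here): proctime() orders rows by when they reach the operator, not by vSeq or vTime, so rows that arrive out of order get a different "previous" row. Below is a minimal plain-Java sketch of that effect, with no Flink involved; the Event class and the values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LagOrderDemo {
    // Hypothetical record shape: group id, sequence number, salary.
    static class Event {
        final String id;
        final long seq;
        final double sal;

        Event(String id, long seq, double sal) {
            this.id = id;
            this.seq = seq;
            this.sal = sal;
        }
    }

    // Computes LAG(sal, 1, 0) per id, walking the list in the order given.
    static List<Double> lag(List<Event> events) {
        Map<String, Double> previous = new HashMap<>();
        List<Double> out = new ArrayList<>();
        for (Event e : events) {
            out.add(previous.getOrDefault(e.id, 0.0));
            previous.put(e.id, e.sal);
        }
        return out;
    }

    // Returns a copy sorted by id, then by sequence number.
    static List<Event> bySeq(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparing((Event e) -> e.id)
                              .thenComparingLong(e -> e.seq));
        return sorted;
    }

    public static void main(String[] args) {
        // Made-up data: seq 3 arrives before seq 2.
        List<Event> arrival = Arrays.asList(
            new Event("A", 1, 100.0),
            new Event("A", 3, 300.0),
            new Event("A", 2, 200.0));
        System.out.println(lag(arrival));        // arrival order: [0.0, 100.0, 300.0]
        System.out.println(lag(bySeq(arrival))); // sequence order: [0.0, 100.0, 200.0]
    }
}
```

In arrival order the row with seq 2 sees 300.0 as its previous salary; ordered by sequence it sees 100.0.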
Kindly help with a simple example, as I am badly stuck. I am OK with using the 
DataStream API instead to implement the lag functionality.
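For a DataStream-style implementation, one option is to keep per-key state: buffer incoming rows sorted by vSeq and emit them only once a watermark has passed. The sketch below shows just that logic in plain Java. The class BufferedLag, its method names, and the watermark argument are illustrative assumptions, not Flink API; in a real job this logic would presumably live in a KeyedProcessFunction with keyed state and event-time timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class BufferedLag {
    // Per id: rows not yet emitted, sorted by sequence number.
    private final Map<String, TreeMap<Long, Double>> buffer = new TreeMap<>();
    // Per id: salary of the last row that was emitted.
    private final Map<String, Double> lastSal = new TreeMap<>();

    // Buffers one incoming row; arrival order does not matter.
    void add(String id, long seq, double sal) {
        buffer.computeIfAbsent(id, k -> new TreeMap<>()).put(seq, sal);
    }

    // Emits "id,seq,currentSal,prevSal" for every buffered row with
    // seq <= watermark, in sequence order, and drops those rows from the buffer.
    List<String> flushUpTo(long watermark) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, Double>> perKey : buffer.entrySet()) {
            String id = perKey.getKey();
            NavigableMap<Long, Double> ready = perKey.getValue().headMap(watermark, true);
            for (Map.Entry<Long, Double> row : ready.entrySet()) {
                double prev = lastSal.getOrDefault(id, 0.0); // default 0, as in LAG(x,1,0)
                out.add(id + "," + row.getKey() + "," + row.getValue() + "," + prev);
                lastSal.put(id, row.getValue());
            }
            ready.clear(); // headMap is a view, so this removes the emitted rows
        }
        return out;
    }

    public static void main(String[] args) {
        BufferedLag lag = new BufferedLag();
        lag.add("A", 1, 100.0);
        lag.add("A", 3, 300.0); // arrives before seq 2
        lag.add("A", 2, 200.0);
        for (String line : lag.flushUpTo(3)) {
            System.out.println(line);
        }
    }
}
```

The trade-off is latency: rows are held back until the watermark passes them, and a row arriving after its sequence number has been flushed would still be emitted out of order.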
Lag Query:
select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 as vSeq,
       vdata.f4 as currentSal,
       LAG(vdata.f4,1,0) OVER (partition BY vdata.f0 ORDER BY proctime()) AS prevSal
from VData vData

Wrong output:

Expected:

Regards,
Sunitha.
