Hi,

I can't see either the wrong or the expected output in your message; could you re-attach them? Could you also provide the code of your pipeline, including the view creation? Are you using the Blink planner (it can be selected with useBlinkPlanner [1])?
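
Since the original message says the DataStream API would also be acceptable, here is a minimal plain-Java sketch of what a per-key LAG(value, 1, default) computes. It is not Flink code: the class name `LagByKey`, the method `previousAndUpdate`, and the `String` key / `long` value types are assumptions made for illustration. In a Flink job the same idea would presumably live in a `KeyedProcessFunction` that keeps the last value per key in `ValueState`, rather than in a `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: remembers the last value seen per key and returns it
 * as the "lag" for the next record with the same key.
 * Assumes records arrive in the order they should be lagged.
 */
class LagByKey {
    private final Map<String, Long> lastByKey = new HashMap<>();
    private final long defaultValue;

    LagByKey(long defaultValue) {
        this.defaultValue = defaultValue;
    }

    /** Returns the previous value for this key (or the default), then stores the current one. */
    long previousAndUpdate(String key, long current) {
        // Map.put returns the value previously associated with the key, or null.
        Long previous = lastByKey.put(key, current);
        return previous == null ? defaultValue : previous;
    }

    public static void main(String[] args) {
        LagByKey lag = new LagByKey(0L);
        System.out.println(lag.previousAndUpdate("id-1", 100L)); // prints 0
        System.out.println(lag.previousAndUpdate("id-1", 120L)); // prints 100
        System.out.println(lag.previousAndUpdate("id-2", 50L));  // prints 0
    }
}
```

Note that this follows arrival order, which is also what ordering by processing time does. If the lag has to follow vSeq or the event timestamp, out-of-order records would need to be buffered and sorted first, which this sketch does not attempt.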
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#main-differences-between-the-two-planners

Regards,
Roman

On Sun, Feb 21, 2021 at 9:40 AM s_penakalap...@yahoo.com <s_penakalap...@yahoo.com> wrote:

> Hi All,
>
> I am using Flink 1.12. I am trying to read real-time data from a Kafka topic,
> and as per the requirement I need to implement a windowing LAG function.
> The approach I followed is below:
>
> DataStream vData = env.addSource(...)
> vData.keyBy(Id)
> createTemporaryView
> then apply Flink SQL.
>
> My sample data is like below. The vTime field contains the timestamp when the
> event was generated, and vNumSeq is the unique number for a particular group Id.
>
> I tried the LAG function, ordering by the vSeq field (long datatype). The job
> failed with "OVER windows' ordering in stream mode must be defined on a time
> attribute".
>
> I even tried using the vTime field (eventTS is also long datatype). I tried
> converting this field to sql.Timestamp; still no luck, the job failed with the
> above error.
>
> When I referred to a few documents, the solution provided was to use
> proctime/rowtime. So I modified the query to use proctime(). The job succeeded
> but with wrong results.
>
> Kindly help with a simple example; I am badly stuck. I am OK to use even the
> DataStream API to implement the lag functionality.
>
> Lag Query:
> select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 as
> vSeq, vdata.f4 as currentSal, LAG(vdata.f4,1,0) OVER ( partition BY
> vdata.f0 ORDER BY proctime()) AS prevSal from VData vData
>
> Wrong output :
>
>
> Expected:
>
>
> Regards,
> Sunitha.