Hi,

I can't see either the wrong or the expected output in your message; could
you re-attach them?
Could you also provide the code of your pipeline, including the view creation?
Are you using the Blink planner (it can be selected with useBlinkPlanner [1])?
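
In the meantime, to pin down what "expected" means here, below is a minimal
plain-Java sketch (no Flink involved) of LAG(sal, 1, 0) partitioned by id.
It contrasts ordering by a sequence field with ordering by arrival, which is
effectively what ORDER BY proctime() gives you. The class LagSketch, the
Event type, and the sample values are all hypothetical and not taken from
your job; if events reach Flink out of vSeq order, this difference may be one
reason the results look wrong.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LagSketch {

    /** Hypothetical event: group id, sequence number within the group, salary. */
    static final class Event {
        final String id;
        final long seq;
        final long sal;

        Event(String id, long seq, long sal) {
            this.id = id;
            this.seq = seq;
            this.sal = sal;
        }
    }

    /** Walks the events in the given order and emits "id,seq,sal,prevSal" lines. */
    private static List<String> lagInOrder(List<Event> ordered) {
        Map<String, Long> lastSalById = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : ordered) {
            // Default 0 mirrors the third argument of LAG(sal, 1, 0).
            long prev = lastSalById.getOrDefault(e.id, 0L);
            out.add(e.id + "," + e.seq + "," + e.sal + "," + prev);
            lastSalById.put(e.id, e.sal);
        }
        return out;
    }

    /** LAG per id with rows ordered by the sequence field. */
    static List<String> lagBySeq(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparing((Event e) -> e.id)
                .thenComparingLong(e -> e.seq));
        return lagInOrder(sorted);
    }

    /** LAG per id with rows taken in arrival order (no reordering). */
    static List<String> lagByArrival(List<Event> events) {
        return lagInOrder(events);
    }

    public static void main(String[] args) {
        // Events arrive out of sequence order: seq 2 shows up before seq 1.
        List<Event> arrived = Arrays.asList(
                new Event("a", 2, 200),
                new Event("a", 1, 100),
                new Event("a", 3, 300));
        System.out.println("by seq:     " + lagBySeq(arrived));
        System.out.println("by arrival: " + lagByArrival(arrived));
    }
}
```

For this made-up input, ordering by seq yields previous salaries 0, 100, 200,
while ordering by arrival yields 0, 200, 100 for the same three events.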


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#main-differences-between-the-two-planners

Regards,
Roman


On Sun, Feb 21, 2021 at 9:40 AM s_penakalap...@yahoo.com <
s_penakalap...@yahoo.com> wrote:

> Hi All,
>
> I am using Flink 1.12 and am trying to read real-time data from a Kafka
> topic. As per the requirement, I need to implement a windowed LAG function.
> The approach I followed is below:
>
> DataStream vData = env.addSource(...)
> vData.keyBy(Id)
> createTemporaryView
> then apply Flink SQL.
>
> My sample data is like below. The vTime field contains the timestamp when
> the event was generated, and vNumSeq is the unique number for a particular
> group Id.
>
> I tried the LAG function ordered by the vSeq field (long datatype); the job
> failed with "OVER windows' ordering in stream mode must be defined on a time
> attribute".
>
> I also tried using the vTime field (eventTS is also of long datatype). I
> tried converting this field to sql.Timestamp, but still no luck: the job
> failed with the above error.
>
> When I referred to a few documents, the solution provided was to use
> proctime/rowtime. So I modified the query to use proctime(). The job
> succeeded, but with wrong results.
>
> Kindly help with a simple example; I am badly stuck. I am OK with using even
> the DataStream API to implement the lag functionality.
>
> Lag Query:
> select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 as
> vSeq, vdata.f4 as currentSal, LAG(vdata.f4,1,0) OVER ( partition BY
> vdata.f0 ORDER BY proctime()) AS prevSal from VData vData
>
> Wrong output :
>
>
> Expected:
>
>
> Regards,
> Sunitha.
>
>
