Thanks, Fabian. I tried using "rowtime", and Flink throws the exception below:

*Exception in thread "main" org.apache.flink.table.api.ValidationException:
SlidingGroupWindow('w2, 'end, 150.rows, 1.rows) is invalid: Event-time
grouping windows on row intervals in a stream environment are currently not
supported.*

Then I tried Over windows; luckily, they also meet my requirement.
Now my table query looks like this:

.window(Tumble.over("15.seconds").on("timeMill").as("w1"))
.groupBy("symbol, w1")
.select("(w1.rowtime) as end, symbol, price.max as p_max, price.min as p_min")
.window(Over.partitionBy("symbol").orderBy("end").preceding("149.rows").as("w2"))
.select("symbol as symbol_, end, p_max.max over w2 as max, p_min.min over w2 as min");
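
To make the intent of the second step concrete, here is a minimal plain-Java sketch (not Flink API) of what "p_max.max over w2" is meant to compute for one symbol: for each row, the maximum over that row and up to N preceding rows. The class name RollingMaxSketch and the method rollingMax are made up for illustration, and the input is assumed to be the per-window maxima of a single symbol, already in rowtime order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class RollingMaxSketch {
    /**
     * For each index i, returns the maximum of values[max(0, i - preceding) .. i],
     * i.e. the current row plus at most `preceding` rows before it.
     */
    static List<Double> rollingMax(List<Double> values, int preceding) {
        List<Double> out = new ArrayList<>();
        // Indices of candidate maxima; their values are strictly decreasing.
        Deque<Integer> candidates = new ArrayDeque<>();
        for (int i = 0; i < values.size(); i++) {
            // Drop indices that have fallen out of the window [i - preceding, i].
            while (!candidates.isEmpty() && candidates.peekFirst() < i - preceding) {
                candidates.pollFirst();
            }
            // Drop older candidates that the current value dominates.
            while (!candidates.isEmpty()
                    && values.get(candidates.peekLast()) <= values.get(i)) {
                candidates.pollLast();
            }
            candidates.addLast(i);
            // The front of the deque is the maximum of the current window.
            out.add(values.get(candidates.peekFirst()));
        }
        return out;
    }
}
```

With preceding = 149 this corresponds to the 150-row window in the query above; the rolling minimum is the same loop with the comparison reversed.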


It works and I get what I want. However, the result is not ordered by
the rowtime (aliased here as "end"). Is this the default behavior, and is
there any way to get it ordered?

Below is the entire requirement.

Basically, there is one raw stream (r1), which I group first by time (w1)
and then by row count (w2). I'd like to compare the "price" field in every
raw event with the same field in the closest preceding event in w2.
If the condition is met, I'd like to use the price value and timestamp in that
event to get one matching event from another raw stream (r2).
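
As a rough illustration of that requirement, here is a plain-Java sketch (again not Flink API). Everything specific in it is an assumption made only for the example: the class and method names, the condition (raw price greater than the w2 value), and the matching rule (the latest r2 event at or before the w2 row's timestamp, ignoring price). Each stream is modelled as a sorted map from timestamp to price.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ClosestPrecedingSketch {
    /**
     * Returns the timestamps of the r2 events selected for those r1 events
     * that satisfy the assumed condition.
     */
    static List<Long> matchTimestamps(
            TreeMap<Long, Double> r1Prices,   // raw events of r1: timestamp -> price
            TreeMap<Long, Double> w2ByTime,   // w2 result rows: rowtime -> aggregated price
            TreeMap<Long, Double> r2Prices) { // raw events of r2: timestamp -> price
        List<Long> out = new ArrayList<>();
        for (Map.Entry<Long, Double> raw : r1Prices.entrySet()) {
            // Closest w2 row strictly before the raw event.
            Map.Entry<Long, Double> w2 = w2ByTime.lowerEntry(raw.getKey());
            if (w2 == null) {
                continue; // no preceding w2 row yet
            }
            // Assumed condition: the raw price exceeds the w2 value.
            if (raw.getValue() > w2.getValue()) {
                // Assumed matching rule: latest r2 event at or before the w2 row's time.
                Map.Entry<Long, Double> hit = r2Prices.floorEntry(w2.getKey());
                if (hit != null) {
                    out.add(hit.getKey());
                }
            }
        }
        return out;
    }
}
```

The point of the sketch is that the lookup into r2 depends on state from r1 and w2, which is the part I am unsure how to express.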

CEP sounds like a good idea, but I need to refer to events in the other stream
(r2) from the pattern condition on the current stream (r1). Is it possible to
do this using CEP?

Thanks
Ivan



On Mon, Apr 16, 2018 at 4:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Sorry, I forgot to CC the user mailing list in my reply.
>
> 2018-04-12 17:27 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi,
>>
>> Assuming you are using event time, the right function to generate a row
>> time attribute from a window would be "w1.rowtime" instead of "w1.start".
>>
>> The reason why Flink is picky about this is that we must ensure that the
>> result rows of the windows are aligned with the watermarks of the stream.
>>
>> Best, Fabian
>>
>>
>> Ivan Wang <ivan.wang2...@gmail.com> schrieb am So., 8. Apr. 2018, 22:26:
>>
>>> Hi all,
>>>
>>>
>>>
>>> I'd like to chain two group windows in my program, as below.
>>>
>>>
>>>
>>> Table myTable = cTable
>>>         .window(Tumble.over("15.seconds").on("timeMill").as("w1"))
>>>         .groupBy("symbol, w1")
>>>         .select("w1.start as start, w1.end as end, symbol, price.max as p_max, price.min as p_min")
>>>         .window(Slide.over("150.rows").every("1.rows").on("start").as("w2"))
>>>         .groupBy("symbol, w2")
>>>         .select("w2.start, w2.end, symbol, p_max.max, p_min.min");
>>>
>>>
>>>
>>>
>>>
>>> However, it throws error: SlidingGroupWindow('w2, 'start, 150.rows,
>>> 1.rows) is invalid: Sliding window expects a time attribute for grouping in
>>> a stream environment.
>>>
>>>          at org.apache.flink.table.plan.logical.LogicalNode.failValidation(LogicalNode.scala:149)
>>>          at org.apache.flink.table.plan.logical.WindowAggregate.validate(operators.scala:658)
>>>          at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1159)
>>>          at org.apache.flink.table.api.WindowGroupedTable.select(table.scala:1179)
>>>          at minno.gundam.ReadPattern.main(ReadPattern.java:156)
>>>
>>>
>>>
>>> Is there any way to assign a time attribute after the first groupBy (w1)?
>>>
>>>
>>>
>>> Thanks
>>>
>>> Ivan
>>>
>>>
>>>
>>>
>
