Hi Gagan,

The time attribute is materialized by the unbounded GROUP BY, so orderTime
is no longer a time attribute when it reaches the window; this is why you
get the error. Also, windows currently cannot handle retraction messages.
I see two ways to solve the problem.

- Use multiple windows. The first window performs lastValue, the second
performs count.
- Use two non-window aggregates. In this case, you don't have to change
anything in the first aggregate. In the second one, you can group by an
hour field and perform count(). The code looks like:

SELECT userId,
       count(orderId)
FROM
    (SELECT orderId,
            getHour(orderTime) AS myHour,
            lastValue(userId) AS userId,
            lastValue(status) AS status
     FROM orders
     GROUP BY orderId, orderTime)
WHERE status = 'PENDING'
GROUP BY myHour, userId
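
To make the semantics of the second option concrete, here is a minimal
plain-Python sketch (not Flink code) of what the nested query computes. The
function name pending_counts_per_hour, the tuple layout, and the use of epoch
seconds for orderTime are assumptions for illustration only; getHour is
modeled as the hour bucket orderTime // 3600. Note that, as in the query, an
update only replaces an earlier row if it carries the same orderId and
orderTime.

```python
from collections import defaultdict


def pending_counts_per_hour(rows):
    """Simulate the two non-window aggregates on rows in arrival order.

    rows: iterable of (order_id, order_time, user_id, status), where
    order_time is epoch seconds (an assumption for this sketch).
    """
    # Inner aggregate: GROUP BY orderId, orderTime, keeping the last
    # userId and status seen for each group (lastValue).
    last = {}
    for order_id, order_time, user_id, status in rows:
        last[(order_id, order_time)] = (user_id, status)

    # Outer aggregate: WHERE status = 'PENDING'
    # GROUP BY myHour, userId with count(orderId).
    counts = defaultdict(int)
    for (order_id, order_time), (user_id, status) in last.items():
        if status == "PENDING":
            counts[(order_time // 3600, user_id)] += 1
    return dict(counts)


rows = [
    ("o1", 100, "u1", "PENDING"),
    ("o3", 200, "u1", "PENDING"),
    ("o3", 200, "u1", "SUCCESS"),   # update to o3 replaces the earlier row
    ("o4", 4000, "u2", "PENDING"),
    ("o4", 4000, "u2", "PENDING"),  # repeated update, count is unchanged
]
print(pending_counts_per_hour(rows))  # {(0, 'u1'): 1, (1, 'u2'): 1}
```

In a real job the counts are emitted incrementally as a retract stream rather
than computed once over a finished list; the sketch only shows the final
result per (hour, userId).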

Best,
Hequn




On Sat, Jan 26, 2019 at 12:29 PM Gagan Agrawal <agrawalga...@gmail.com>
wrote:

> Based on the suggestions in this mail thread, I tried out a few experiments
> on upsert streams with Flink 1.7.1, and here is the issue I am facing with
> the window stream.
>
> *1. Global Pending order count. *
> Following query works fine and it's able to handle updates as per original
> requirement.
>
> select userId, count(orderId) from
> (select orderId, lastValue(userId) as userId, lastValue(status) as status
> from orders group by orderId)
> where status='PENDING' group by userId
>
> *2. Last 1 Hour tumbling window count (Append stream)*
> Though following query doesn't handle upsert stream, I just tried to make
> sure time column is working fine. This is working, but as expected, it
> doesn't handle updates on orderId.
>
> select userId, count(orderId) from orders
> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
> userId
>
> 3. *Last 1 Hour tumbling window count (With upsert stream)*
> Now I tried combination of above two where input stream is converted to
> upsert stream (via lastValue aggregate function) and then Pending count
> needs to be calculated in last 1 hour window.
>
> select userId, count(orderId) from
> (select orderId, orderTime, lastValue(userId) as userId, lastValue(status)
> as status from orders group by orderId, orderTime)
> where status='PENDING' group by TUMBLE(orderTime, INTERVAL '1' HOUR),
> userId
>
> This one gives me the following error. Is this because I have added orderTime
> to the group by/select clause and hence its time characteristics have changed?
> What is the workaround here? Without adding orderTime, I cannot perform
> window aggregation on the upsert stream.
>
> [error] Exception in thread "main"
> org.apache.flink.table.api.ValidationException:* Window can only be
> defined over a time attribute column.*
> [error]         at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:84)
> [error]         at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:89)
> [error]         at
> org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:65)
> [error]         at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
> [error]         at
> org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
> [error]         at
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
> [error]         at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:252)
> [error]         at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> [error]         at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
> [error]         at
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
> [error]         at
> org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
> [error]         at
> org.apache.flink.table.api.TableEnvironment.runHepPlannerSequentially(TableEnvironment.scala:326)
> [error]         at
> org.apache.flink.table.api.TableEnvironment.optimizeNormalizeLogicalPlan(TableEnvironment.scala:282)
> [error]         at
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:811)
> [error]         at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> [error]         at
> org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
> [error]         at
> org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
>
> Gagan
>
> On Tue, Jan 22, 2019 at 7:01 PM Gagan Agrawal <agrawalga...@gmail.com>
> wrote:
>
>> Thanks Hequn for your response. I initially thought of trying out the "over
>> window" clause. However, per the documentation, there seems to be a
>> limitation in the "orderBy" clause: it allows only a single event-time or
>> processing-time attribute. In my case, events are generated from the MySQL
>> bin log, where I have seen multiple updates generated with the same
>> timestamp (maybe because they are part of the same transaction), so I need
>> the bin log offset along with the timestamp to sort them correctly. So it
>> looks like I can't use "over window" until it allows multiple columns in
>> "orderBy". I am exploring the option of creating my own window, as you
>> suggested, to be more flexible.
>>
>> Gagan
>>
>> On Tue, Jan 22, 2019 at 7:23 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Gagan,
>>>
>>> > But I also have a requirement for event time based sliding window
>>> aggregation
>>>
>>> Yes, you can achieve this with Flink TableAPI/SQL. However, currently,
>>> sliding windows don't support early fire, i.e., only output results when
>>> event time reaches the end of the window. Once window fires, the window
>>> state will be cleared and late data belonging to this window will be
>>> ignored. In order to wait for the late event, you can extract
>>> watermark with an offset from the timestamp. For example, make watermark =
>>> timestamp - 5min.
>>>
>>> If event time and early firing are strong requirements in your scenario,
>>> you can probably use an over window[1] to solve your problem, say an over
>>> window with 1h preceding. An over window outputs a result for each input.
>>>
>>> If the above solutions can't meet your requirements, you can write a
>>> DataStream job in which you define your own window logic[2].
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/tableApi.html#over-windows
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html
>>>
>>>
>>>
>>> On Tue, Jan 22, 2019 at 12:58 AM Gagan Agrawal <agrawalga...@gmail.com>
>>> wrote:
>>>
>>>> Thank you guys. It's great to hear multiple solutions to achieve this.
>>>> I understand that records once emitted to Kafka can not be deleted and
>>>> that's acceptable for our use case as last updated value should always be
>>>> correct. However as I understand most of these solutions will work for
>>>> global aggregation which was asked in original question. But I also have
>>>> requirement for event time based sliding window aggregation where same
>>>> order count needs to be maintained for past x hours window (sliding at say
>>>> every 5 minutes). Is it possible to achieve with Table Api / SQL at the
>>>> moment or will require some custom implementation?
>>>>
>>>> For window based upsert stream, there can be few scenarios.
>>>>
>>>> 1. An update to record key comes in same window. E.g Pending (t1) ->
>>>> Success (t2) happens in same window w1. In this case once window
>>>> aggregation is triggered/emitted, such records will be counted as 0
>>>> 2. An update to record key belongs to same window but arrives late. In
>>>> this case old(and already emitted)  window (w1) needs to be re-emitted with
>>>> decreased value.
>>>> 3. An update to record key comes in different window. E.g Pending (t1)
>>>> in window w1 and Success (t2) in w2. I think in this case it may not
>>>> require to re-emit old window w1 as it represents pending count till that
>>>> window time (w1) which is still valid as record moved to Success in next
>>>> window w2 (based on event time).
>>>>
>>>> Gagan
>>>>
>>>>
>>>> On Mon, Jan 21, 2019 at 8:31 PM Piotr Nowojski <pi...@da-platform.com>
>>>> wrote:
>>>>
>>>>> @Jeff: It depends if user can define a time window for his condition.
>>>>> As Gagan described his problem it was about “global” threshold of pending
>>>>> orders.
>>>>>
>>>>>
>>>>>
>>>>> I have just thought about another solution that should work without
>>>>> any custom code. Convert the "status" field to an integer status_value:
>>>>> - "+1" for pending
>>>>> - "-1" for success/failure
>>>>> - "0" otherwise
>>>>>
>>>>> Then run the following query on top of such a stream:
>>>>>
>>>>> SELECT uid, SUM(status_value) FROM … GROUP BY uid;
>>>>>
>>>>> The conversion to integers could be done with a `CASE` expression.
>>>>>
>>>>> One thing to note here is that probably all of the proposed solutions
>>>>> would work based on the order of the records, not based on the event_time.
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 21 Jan 2019, at 15:10, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>
>>>>> I am thinking of another approach instead of retract stream. Is it
>>>>> possible to define a custom window to do this ? This window is defined for
>>>>> each order. And then you just need to analyze the events in this window.
>>>>>
>>>>> On Mon, Jan 21, 2019 at 8:44 PM Piotr Nowojski <pi...@da-platform.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Flink Table API/SQL currently lacks support for retraction streams as
>>>>>> input (or for converting an append stream to a retraction stream). With
>>>>>> that feature, your problem would simplify to one simple
>>>>>> `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is ongoing
>>>>>> related work [1], so this might be supported in the next couple of
>>>>>> months.
>>>>>>
>>>>>> There might be a workaround at the moment that could work. I think you
>>>>>> would need to write your own custom `LAST_ROW(x)` aggregation function,
>>>>>> which would just return the value of the most recent aggregated row. With
>>>>>> that you could write a query like this:
>>>>>>
>>>>>> SELECT
>>>>>>   uid, count(*)
>>>>>> FROM (
>>>>>>   SELECT
>>>>>>     *
>>>>>>   FROM (
>>>>>>     SELECT
>>>>>>       uid, LAST_ROW(status) AS status
>>>>>>     FROM
>>>>>>       changelog
>>>>>>     GROUP BY
>>>>>>       uid, oid)
>>>>>>   WHERE status = 'pending')
>>>>>> GROUP BY
>>>>>>   uid
>>>>>>
>>>>>> Where `changelog` is an append only stream with the following content:
>>>>>>
>>>>>> *user, order, status, event_time*
>>>>>> u1, o1, pending, t1
>>>>>> u2, o2, failed, t2
>>>>>> *u1, o3, pending, t3*
>>>>>> *u1, o3, success, t4*
>>>>>> u2, o4, pending, t5
>>>>>> u2, o4, pending, t6
>>>>>>
>>>>>>
>>>>>>
>>>>>> Besides that, you could also write your own relatively simple
>>>>>> DataStream application to do the same thing.
>>>>>>
>>>>>> I’m CC’ing Timo, maybe he will have another better idea.
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8577
>>>>>>
>>>>>> On 18 Jan 2019, at 18:30, Gagan Agrawal <agrawalga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> I have a requirement and need to understand if same can be achieved
>>>>>> with Flink retract stream. Let's say we have stream with 4 attributes
>>>>>> userId, orderId, status, event_time where orderId is unique and hence any
>>>>>> change in same orderId updates previous value as below
>>>>>>
>>>>>> *Changelog* *Event Stream*
>>>>>>
>>>>>> *user, order, status, event_time*
>>>>>> u1, o1, pending, t1
>>>>>> u2, o2, failed, t2
>>>>>> *u1, o3, pending, t3*
>>>>>> *u1, o3, success, t4*
>>>>>> u2, o4, pending, t5
>>>>>> u2, o4, pending, t6
>>>>>>
>>>>>> *Snapshot view at time t6 (as viewed in mysql)*
>>>>>> u1, o1, pending, t1
>>>>>> u2, o2, failed, t2
>>>>>> u1, o3, success, t4
>>>>>> u2, o4, pending, t6
>>>>>> (Here rows at time t3 and t5 are deleted as they have been updated
>>>>>> for respective order ids)
>>>>>>
>>>>>> What I need is to maintain a count of "Pending" orders per user and, if
>>>>>> it goes beyond a configured threshold, push that user and the pending
>>>>>> count to Kafka. There can be multiple updates to an order's status, e.g.
>>>>>> Pending -> Success or Pending -> Failed. In some cases there may be no
>>>>>> change in status but we may still get a row (maybe due to an update of
>>>>>> some other attribute which we are not concerned about). So is it possible
>>>>>> to have a running count in Flink as below at the respective event times?
>>>>>> Here the Pending count is decreased from 2 to 1 for user u1 at t4 since
>>>>>> one of its orders changed status from Pending to Success. Similarly, for
>>>>>> user u2 at time t6, there was no change in the running count as there was
>>>>>> no change in status for order o4.
>>>>>>
>>>>>> t1 -> u1 : 1, u2 : 0
>>>>>> t2 -> u1 : 1, u2 : 0
>>>>>> t3 -> u1 : 2, u2 : 0
>>>>>> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
>>>>>> decreased for u1)*
>>>>>> t5 -> u1 : 1, u2 : 1
>>>>>> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
>>>>>> change)*
>>>>>>
>>>>>> As I understand may be retract stream can achieve this. However I am
>>>>>> not sure how. Any samples around this would be of great help.
>>>>>>
>>>>>> Gagan
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>>>
>>>>>
