Hi Ivan,

You can certainly do these things with Flink. Michael pointed you in a good direction if you want to implement the logic in the DataStream API / ProcessFunctions.
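The buffering idea discussed further down in this thread (hold raw events until the smoothed stream has caught up, then pair each raw event with the latest smoothed event at or before it) can be sketched in plain Python. This is only an illustration of the logic, not Flink API: the `Aligner` class and its `on_raw`, `on_smoothed`, and `on_watermark` methods are hypothetical names, timestamps are plain integers in seconds, and the watermark is assumed to mean "no event with a timestamp at or below this value will arrive on either input any more".

```python
import bisect
from collections import deque


class Aligner:
    """Pairs each raw event with the latest smoothed event at or before it.

    Hypothetical helper for illustration only; it is not part of Flink.
    """

    def __init__(self):
        self.smoothed_ts = []    # timestamps of smoothed events, kept sorted
        self.smoothed_val = []   # smoothed values, parallel to smoothed_ts
        self.pending = deque()   # raw events that cannot be emitted yet

    def on_smoothed(self, ts, value):
        # Insert in timestamp order so late arrivals end up in the right place.
        i = bisect.bisect_right(self.smoothed_ts, ts)
        self.smoothed_ts.insert(i, ts)
        self.smoothed_val.insert(i, value)

    def on_raw(self, ts, value):
        # Raw events are only buffered; they are emitted in on_watermark.
        self.pending.append((ts, value))

    def on_watermark(self, watermark):
        """Emit (ts, raw, smoothed) for every buffered raw event with ts <= watermark."""
        out = []
        still_waiting = deque()
        for ts, value in self.pending:
            if ts <= watermark:
                # Index of the last smoothed event with timestamp <= ts.
                i = bisect.bisect_right(self.smoothed_ts, ts)
                smoothed = self.smoothed_val[i - 1] if i > 0 else None
                out.append((ts, value, smoothed))
            else:
                still_waiting.append((ts, value))
        self.pending = still_waiting
        return out
```

In a real job the two lists would live in keyed state and old smoothed entries would need to be pruned; the sketch leaves both out to keep the ordering logic visible.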
Flink's SQL support should also be able to handle the use case you described. The "ingredients" would be

- a TUMBLE window [1] with a TUMBLE_ROWTIME [2] in the SELECT clause to forward the rowtime of the window,
- an OVER window with an ORDER BY time ROW for the sliding window (just forward the time attribute in SELECT) [3],
- a windowed join to join the raw event with the smoothed aggregate [4].

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#group-windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#selecting-group-window-start-and-end-timestamps
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#aggregations
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins

2018-04-12 14:13 GMT-07:00 TechnoMage <mla...@technomage.com>:

> Given that the data from a window cannot arrive before any of the data in that
> window, it will always arrive after the raw data for the same period, and
> may have some latency relative to the raw data. If your
> RichFlatMapFunction uses a ListState to hold more than one window's worth of
> raw and smoothed data, you should be able to get what you want. Given
> distributed systems and relative time, I am not sure you will get simpler
> than that.
>
> Michael
>
>
> On Apr 12, 2018, at 7:52 AM, Ivan Wang <ivan.w...@augmentum.com> wrote:
>
> Thanks very much, Michael, it helps a lot!
>
> I tried what you suggested and now I can compare smoothed data with raw data
> in the coFlat method.
> However, I cannot ensure that the smoothed data arrives in the expected
> order. Basically, for every raw event, I’d like to refer to the earlier but
> closest event in the smoothed data. However, this is not guaranteed by
> default. For example, when a raw event comes with event time 13:01:39, I’d like
> to refer to the smoothed event with event time 13:01:30, because of the 15-second
> interval.
> But the latter only arrives after raw event 13:01:58; this
> happens at least in batch processing, when I did historical analysis.
>
> I corrected the order by using keyed state in the coFlatMap method. I stored the
> latest smoothed event and queued raw events if they arrived too early.
>
> My question is: is there a better and more straightforward way to correct
> the order? The current approach makes the code hard to read. I’m thinking about
> watermarks, but I am not sure how to do this.
>
>
> --
> Thanks
> Ivan
> *From: *TechnoMage <mla...@technomage.com>
> *Date: *Thursday, 12 April 2018 at 3:21 AM
> *To: *Ivan Wang <ivan.wang2...@gmail.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Is Flink able to do real time stock market analysis?
>
> I am new to Flink, so others may have a more complete answer or correct me.
>
> If you are counting the events in a tumbling window you will get output at
> the end of each tumbling window, so a running count of events/window. It
> sounds like you want to compare the raw data to the smoothed data? You can
> use a CoFlatMap to receive both streams and output any records you like,
> say a Tuple with the raw and smoothed value. If you use a RichCoFlatMap
> you can track state, so you could keep a list of the last 20 or so raw and
> smoothed values so you can align them.
>
> Michael
>
>
> On Apr 10, 2018, at 6:40 PM, Ivan Wang <ivan.wang2...@gmail.com> wrote:
>
> Hi all,
>
> I've spent nearly 2 weeks trying to figure out a solution to my requirement as
> described below. If anyone can advise, that would be great.
>
> 1. There are going to be 2000 transactions per second as StreamRaw. I'm
> going to tumbleWindow them as StreamA, let's say every 15 seconds. Then I'm
> going to countWindow StreamA as StreamB, let's say every 20 events.
>
> 2. For every event E in StreamRaw, I need to find exactly one event in
> StreamB which is earlier than E and closest to E. A comparison will then
> be performed.
> For example, if the timestamp in E is 9:46:38, there should be an
> event in StreamB with timestamp 9:46:30 because I use a 15-second interval.
>
>
> I tried CEP using StreamRaw; however, I couldn't figure out how to involve
> StreamB and get exactly that one event in the condition method.
>
>
> I tried the Table API and SQL; it throws a time attribute error during the second
> window method.
>
>
> *window(Tumble).group().select().window(Slide).group().select()*
>
>
> It seems there's no way to tell Flink the time attribute after the first
> window.group(). I then tried to convert it into a table first and then
> leftOuterJoin them. But Flink tells me it's not supported.
>
> Is Flink able to do this? If not, I'll go for other alternatives. Thanks
> again if someone can help.
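The timestamp arithmetic behind the two examples in this thread (9:46:38 mapping to 9:46:30, and 13:01:39 mapping to 13:01:30) is a floor to the nearest 15-second boundary. A minimal sketch in plain Python, assuming that boundaries are counted from midnight and that the smoothed event carries the boundary itself as its timestamp, as the examples above do; the function name `window_boundary` is made up for illustration and is not a Flink call.

```python
from datetime import datetime, timedelta


def window_boundary(event_time: datetime, size_seconds: int = 15) -> datetime:
    """Return the latest multiple of size_seconds (from midnight) at or before event_time."""
    midnight = event_time.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = int((event_time - midnight).total_seconds())
    # Drop the remainder so the result sits exactly on a window boundary.
    return midnight + timedelta(seconds=elapsed - elapsed % size_seconds)


raw_event_time = datetime(2018, 4, 10, 9, 46, 38)
print(window_boundary(raw_event_time))  # 2018-04-10 09:46:30
```

A raw event can use this value as the lookup key for the smoothed event it should be compared against, provided that smoothed event has already arrived.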