Hi Ivan,

You can certainly do these things with Flink.
Michael pointed you in a good direction if you want to implement the logic
in the DataStream API / ProcessFunctions.

Flink's SQL support should also be able to handle the use case you
described.

The "ingredients" would be:
- a TUMBLE window [1] with TUMBLE_ROWTIME [2] in the SELECT clause to
forward the rowtime of the window,
- an OVER window that is ordered by the time attribute and uses a ROWS frame
for the sliding aggregate (again, just forward the time attribute in the
SELECT clause) [3], and
- a time-windowed join to join each raw event with the smoothed aggregate [4].
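To make the data flow of these three steps concrete, here is a plain-Python simulation (not Flink code) under some assumptions: timestamps in milliseconds, a hypothetical `price` field averaged per 15-second window, and a 20-row OVER window. The emitted `rowtime` mimics TUMBLE_ROWTIME, which returns the inclusive upper bound of the window (window end minus 1 ms). The join bound `s.rowtime < r.rowtime <= s.rowtime + 15 s` is one possible choice for the time-windowed join, not the only one.

```python
from collections import deque

WINDOW_MS = 15_000  # TUMBLE(rowtime, INTERVAL '15' SECOND), assumed
OVER_ROWS = 20      # ROWS BETWEEN 19 PRECEDING AND CURRENT ROW, assumed

def tumble(raw):
    """Step 1: per-window average price. The emitted rowtime mimics
    TUMBLE_ROWTIME: the inclusive upper bound (window end - 1 ms)."""
    buckets = {}
    for ts, price in raw:
        start = ts - ts % WINDOW_MS
        buckets.setdefault(start, []).append(price)
    return [(start + WINDOW_MS - 1, sum(p) / len(p))
            for start, p in sorted(buckets.items())]

def over_avg(windows):
    """Step 2: OVER window ordered by rowtime, averaging the last
    OVER_ROWS window averages (fewer at the start of the stream)."""
    recent = deque(maxlen=OVER_ROWS)
    smoothed = []
    for rowtime, avg in windows:
        recent.append(avg)
        smoothed.append((rowtime, sum(recent) / len(recent)))
    return smoothed

def windowed_join(raw, smoothed):
    """Step 3: inner time-windowed join on
    s.rowtime < r.rowtime <= s.rowtime + 15 s, which pairs every raw event
    with the smoothed row of the immediately preceding tumbling window."""
    return [(ts, price, value)
            for ts, price in raw
            for s_rowtime, value in smoothed
            if s_rowtime < ts <= s_rowtime + WINDOW_MS]

raw = [(1_000, 10.0), (5_000, 20.0), (16_000, 30.0), (31_000, 40.0)]
print(windowed_join(raw, over_avg(tumble(raw))))
# prints [(16000, 30.0, 15.0), (31000, 40.0, 22.5)]
```

With inner-join semantics, events in the first window (and after any window without data) get no match; the events at 1 s and 5 s are therefore dropped here.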

Best,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#group-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#selecting-group-window-start-and-end-timestamps
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#aggregations
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins

2018-04-12 14:13 GMT-07:00 TechnoMage <mla...@technomage.com>:

> Since the output of a window cannot be emitted before all of the data in
> that window has arrived, it will always arrive after the raw data for the
> same period, and may have some latency relative to the raw data.  If your
> RichCoFlatMapFunction uses a ListState to hold more than one window's worth
> of raw and smoothed data, you should be able to get what you want.  Given a
> distributed system and relative time, I am not sure it gets simpler than
> that.
>
> Michael
>
>
> On Apr 12, 2018, at 7:52 AM, Ivan Wang <ivan.w...@augmentum.com> wrote:
>
> Thanks very much, Michael, it helps a lot!
>
> I tried what you suggested, and now I can compare the smoothed data with
> the raw data in the coFlatMap method.
> However, this doesn't guarantee that the smoothed data arrives in the
> expected order.  Basically, for every raw event I’d like to refer to the
> closest earlier event in the smoothed data, but that isn't guaranteed by
> default. For example, when a raw event arrives with event time 13:01:39,
> I’d like to refer to the smoothed event with event time 13:01:30 because of
> the 15-second interval. But the latter only arrives after the raw event at
> 13:01:58. This happens at least in batch processing, when I run historical
> analysis.
>
> I corrected the order by using keyed state in the coFlatMap method: I store
> the latest smoothed event and queue raw events if they arrive too early.
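The keyed-state approach described above can be sketched in plain Python for a single key. This is not Flink code: `on_raw`/`on_smoothed` are hypothetical stand-ins for the two flatMap callbacks, timestamps are seconds after 13:00:00, and smoothed events are assumed to be stamped with the end of their 15-second window. Keeping a small dict of recent smoothed values (instead of only the latest one) is an added assumption that also covers a smoothed value overtaking a raw event.

```python
from collections import deque

INTERVAL = 15  # seconds per tumbling window (assumed)

def window_stamp(ts):
    """Stamp of the smoothed event a raw event should use: the end of the
    last window that closed at or before ts, e.g. 13:01:39 -> 13:01:30."""
    return ts - ts % INTERVAL

class Aligner:
    """State of ONE key in a hypothetical CoFlatMap-style operator."""

    def __init__(self, keep=4):
        self.smoothed = {}       # window stamp -> smoothed value
        self.pending = deque()   # raw (ts, value) still waiting for its window
        self.keep = keep         # number of smoothed values to retain

    def on_raw(self, ts, value, out):
        stamp = window_stamp(ts)
        if stamp in self.smoothed:
            out.append((ts, value, self.smoothed[stamp]))
        else:
            self.pending.append((ts, value))   # arrived too early: queue it

    def on_smoothed(self, stamp, value, out):
        self.smoothed[stamp] = value
        while len(self.smoothed) > self.keep:  # evict the oldest stamps
            del self.smoothed[min(self.smoothed)]
        waiting = deque()
        for ts, raw_value in self.pending:     # flush queued raw events
            s = window_stamp(ts)
            if s in self.smoothed:
                out.append((ts, raw_value, self.smoothed[s]))
            else:
                waiting.append((ts, raw_value))
        self.pending = waiting

out = []
a = Aligner()
a.on_raw(99, 1.0, out)       # 13:01:39; its 13:01:30 value is not there yet
a.on_raw(118, 2.0, out)      # 13:01:58
a.on_smoothed(90, 0.5, out)  # 13:01:30 arrives late and flushes the first event
print(out)                   # [(99, 1.0, 0.5)]
```

A queued event whose window never produces a smoothed value (or whose value was already evicted) stays queued forever; a real job would need a cleanup rule, for example a timer.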
>
> My question is: is there a better, more straightforward way to correct the
> order? The current approach makes the code hard to read. I’m thinking about
> watermarks, but I’m not sure how to use them here.
>
>
> --
> Thanks
> Ivan
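On the watermark question: one way to let watermarks do the reordering is to buffer each raw event and register an event-time timer for its timestamp. A two-input operator's watermark is the minimum of its two inputs' watermarks, so when the timer fires, every smoothed record with a smaller timestamp has already arrived, assuming no late data. In Flink this would roughly map to a CoProcessFunction on the connected keyed streams. The sketch below is plain Python for a single key; the class and method names are hypothetical, timestamps are milliseconds, and smoothed records are assumed to carry their window end minus 1 ms as timestamp.

```python
import heapq

class WatermarkAligner:
    """Event-time alignment for ONE key (illustrative, not Flink API)."""

    def __init__(self):
        self.watermarks = {"raw": float("-inf"), "smoothed": float("-inf")}
        self.timers = []    # min-heap of (ts, seq, value): buffered raw events
        self.seq = 0        # tie-breaker so heap entries never compare values
        self.smoothed = []  # (ts, value) pairs seen so far; never pruned here

    def on_raw(self, ts, value):
        heapq.heappush(self.timers, (ts, self.seq, value))  # "register timer"
        self.seq += 1

    def on_smoothed(self, ts, value):
        self.smoothed.append((ts, value))

    def on_watermark(self, source, wm):
        """Advance one input's watermark and fire all due timers.
        Returns (raw_ts, raw_value, smoothed_value) in event-time order."""
        self.watermarks[source] = max(self.watermarks[source], wm)
        current = min(self.watermarks.values())  # operator watermark
        fired = []
        while self.timers and self.timers[0][0] <= current:
            ts, _, value = heapq.heappop(self.timers)
            earlier = [s for s in self.smoothed if s[0] < ts]
            latest = max(earlier, key=lambda s: s[0])[1] if earlier else None
            fired.append((ts, value, latest))
        return fired

w = WatermarkAligner()
w.on_raw(16_000, 1.0)                      # raw event arrives first
print(w.on_watermark("raw", 20_000))       # [] (smoothed input still behind)
w.on_smoothed(14_999, 0.5)                 # value of window [0 s, 15 s)
print(w.on_watermark("smoothed", 29_999))  # [(16000, 1.0, 0.5)]
```

The reordering logic disappears from the callbacks: each raw event is simply processed late enough, in event-time order, at the cost of holding it until both watermarks pass its timestamp.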
> *From: *TechnoMage <mla...@technomage.com>
> *Date: *Thursday, 12 April 2018 at 3:21 AM
> *To: *Ivan Wang <ivan.wang2...@gmail.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Is Flink able to do real time stock market analysis?
>
> I am new to Flink, so others may have a more complete answer or may correct me.
>
> If you are counting the events in a tumbling window, you will get output at
> the end of each tumbling window, i.e. a running count of events per window.
> It sounds like you want to compare the raw data to the smoothed data?  You
> can use a CoFlatMap to receive both streams and output any records you like,
> say a Tuple with the raw and smoothed value.  If you use a
> RichCoFlatMapFunction, you can keep state, for example a list of the last 20
> or so raw and smoothed values, and use it to align them.
>
> Michael
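A minimal sketch of the "keep the last 20 or so values" idea, in plain Python rather than Flink: `RecentHistory` is a hypothetical helper that a RichCoFlatMapFunction could mirror with one bounded list per input. It assumes entries are appended in increasing timestamp order and looks up the closest strictly earlier entry.

```python
from bisect import bisect_left
from collections import deque

class RecentHistory:
    """Last `size` (timestamp, value) pairs of one stream."""

    def __init__(self, size=20):
        self.items = deque(maxlen=size)  # oldest entries fall off automatically

    def add(self, ts, value):
        self.items.append((ts, value))

    def closest_earlier(self, ts):
        """Value of the latest entry strictly earlier than ts, or None if
        nothing retained is that old (it may already have been evicted)."""
        stamps = [t for t, _ in self.items]
        i = bisect_left(stamps, ts)      # index of the first entry with t >= ts
        return self.items[i - 1][1] if i > 0 else None

h = RecentHistory(size=3)
for stamp, value in [(75, "a"), (90, "b"), (105, "c"), (120, "d")]:
    h.add(stamp, value)                  # (75, "a") is evicted by the fourth add
print(h.closest_earlier(99))             # b  (13:01:39 -> 13:01:30, seconds after 13:00)
```

Because eviction removes the oldest entries first, a non-None answer is always the true closest earlier value; the size only limits how far behind a raw event may be.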
>
>
> On Apr 10, 2018, at 6:40 PM, Ivan Wang <ivan.wang2...@gmail.com> wrote:
>
> Hi all,
>
> I've spent nearly 2 weeks trying to figure out a solution to the requirement
> below. If anyone can advise, that would be great.
>
> 1. There are going to be 2000 transactions per second as StreamRaw. I'm
> going to aggregate them in a tumbling window as StreamA, let's say every 15
> seconds. Then I'm going to apply a count window to StreamA as StreamB, let's
> say every 20 events.
>
> 2. For every event E in StreamRaw, I need to find exactly one event in
> StreamB which is earlier than E and closest to E. Then some comparison will
> be performed. For example, if the timestamp of E is 9:46:38, there should be
> an event in StreamB with timestamp 9:46:30, because I use a 15-second
> interval.
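The timestamp arithmetic in point 2 can be sketched as follows, assuming StreamB events are stamped with the end of their 15-second window, so the event to compare against is the latest 15-second boundary at or before E. `streamb_stamp` is a hypothetical helper name.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(seconds=15)  # StreamA tumbling-window size from the example

def streamb_stamp(event_time: datetime) -> datetime:
    """Stamp of the StreamB event to compare against: the most recent
    15-second boundary at or before the raw event's time."""
    midnight = event_time.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = event_time - midnight
    return event_time - (elapsed % INTERVAL)  # timedelta % timedelta

print(streamb_stamp(datetime(2018, 4, 10, 9, 46, 38)).time())  # 09:46:30
```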
>
>
> I tried CEP on StreamRaw; however, I couldn't figure out how to involve
> StreamB and get exactly that one event in the condition method.
>
>
> I tried the Table API and SQL, but they throw a time attribute error at the
> second window method.
>
>
> *window(Tumble).group().select().window(Slide).group().select()*
>
>
> It seems there's no way to tell Flink the time attribute after the first
> window().group(). I then tried to convert the result into a table first and
> then left outer join them, but Flink tells me it's not supported.
>
> Is Flink able to do this? If not, I'll go for other alternatives. Thanks
> in advance to anyone who can help.
>
>
>
