Thanks Michael very much, it helps a lot!
I tried what you suggested and now I can compare smoothed data with raw data in
However, it cannot ensure that the smoothed data arrives in the expected order.
Basically, for every raw event, I’d like to refer to the closest earlier
event in the smoothed data. However, this is not guaranteed by default. For
example, when a raw event comes with event time 13:01:39, I’d like to refer to
the smoothed event with event time 13:01:30, given the 15-second interval. But
the latter only arrives after the raw event at 13:01:58. This happens at least
in batch processing, when I did historical analysis.
I corrected the order by using keyed state in the coFlatMap method. I stored the
latest smoothed event and queued raw events if they arrived too early.
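To make the buffering idea above concrete, here is a minimal sketch in plain
Java, without any Flink dependency. The class name OrderCorrector, the methods
onRaw and onSmoothed, and the use of seconds-of-day as timestamps are all
assumptions made for illustration; in a real job this logic would presumably
sit inside the two flatMap methods with the fields held in keyed state. The
sketch also assumes smoothed events arrive in timestamp order and simply drops
raw events whose smoothed counterpart was never seen.

```java
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: pairs each raw event with the smoothed event at the
// start of its interval, queueing raw events that arrive too early.
class OrderCorrector {
    private final long intervalSec;
    private long latestSmoothedTs = Long.MIN_VALUE; // no smoothed event seen yet
    private final Deque<Long> pendingRaw = new ArrayDeque<>();
    final List<String> output = new ArrayList<>();

    OrderCorrector(long intervalSec) {
        this.intervalSec = intervalSec;
    }

    // Timestamp of the smoothed event a raw event should be compared with.
    long expectedSmoothedTs(long rawTs) {
        return rawTs - rawTs % intervalSec;
    }

    void onRaw(long rawTs) {
        long expected = expectedSmoothedTs(rawTs);
        if (latestSmoothedTs == expected) {
            emit(rawTs, expected);          // matching smoothed event is already here
        } else if (latestSmoothedTs < expected) {
            pendingRaw.add(rawTs);          // too early: wait for the smoothed event
        }
        // Otherwise the smoothed stream has moved on; dropped in this sketch.
    }

    void onSmoothed(long smoothedTs) {
        latestSmoothedTs = smoothedTs;
        // Release queued raw events that this smoothed event (or an older one) covers.
        while (!pendingRaw.isEmpty() && expectedSmoothedTs(pendingRaw.peek()) <= smoothedTs) {
            long rawTs = pendingRaw.poll();
            if (expectedSmoothedTs(rawTs) == smoothedTs) {
                emit(rawTs, smoothedTs);
            }
        }
    }

    private void emit(long rawTs, long smoothedTs) {
        output.add(LocalTime.ofSecondOfDay(rawTs) + " -> " + LocalTime.ofSecondOfDay(smoothedTs));
    }

    public static void main(String[] args) {
        OrderCorrector c = new OrderCorrector(15);
        c.onRaw(LocalTime.of(13, 1, 39).toSecondOfDay());      // queued
        c.onRaw(LocalTime.of(13, 1, 58).toSecondOfDay());      // queued
        c.onSmoothed(LocalTime.of(13, 1, 30).toSecondOfDay()); // releases 13:01:39
        c.onSmoothed(LocalTime.of(13, 1, 45).toSecondOfDay()); // releases 13:01:58
        c.output.forEach(System.out::println);
        // 13:01:39 -> 13:01:30
        // 13:01:58 -> 13:01:45
    }
}
```

The queue is drained in arrival order, so this only behaves well if raw events
themselves arrive roughly in time order.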
My question is: is there a better, more straightforward way to correct the
order? The current approach makes the code hard to read. I’m thinking about
watermarks, but I’m not sure how to use them for this.
From: TechnoMage <mla...@technomage.com>
Date: Thursday, 12 April 2018 at 3:21 AM
To: Ivan Wang <ivan.wang2...@gmail.com>
Cc: "firstname.lastname@example.org" <email@example.com>
Subject: Re: Is Flink able to do real time stock market analysis?
I am new to Flink, so others may have a more complete answer or correct me.
If you are counting the events in a tumbling window you will get output at the
end of each tumbling window, so a running count of events/window. It sounds
like you want to compare the raw data to the smoothed data? You can use a
CoFlatMap to receive both streams and output any records you like, say a Tuple
with the raw and smoothed value. If you use a RichCoFlatMap you can track
state, so you could keep a list of the last 20 or so raw and smoothed values so
you can align them.
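As a rough illustration of keeping the last 20 or so smoothed values for
alignment, the following plain-Java sketch uses a bounded TreeMap keyed by
timestamp. The class RecentSmoothed and its methods are hypothetical names, and
nothing here is Flink API; inside a RichCoFlatMap the map would presumably be
kept in state instead of a plain field.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: remembers the most recent smoothed values by timestamp.
class RecentSmoothed {
    private final int capacity;
    private final TreeMap<Long, Double> byTimestamp = new TreeMap<>();

    RecentSmoothed(int capacity) {
        this.capacity = capacity;
    }

    void add(long timestamp, double value) {
        byTimestamp.put(timestamp, value);
        if (byTimestamp.size() > capacity) {
            byTimestamp.pollFirstEntry(); // evict the oldest entry
        }
    }

    // Closest smoothed entry at or before the raw timestamp, or null if none is held.
    Map.Entry<Long, Double> closestAtOrBefore(long rawTimestamp) {
        return byTimestamp.floorEntry(rawTimestamp);
    }

    int size() {
        return byTimestamp.size();
    }

    public static void main(String[] args) {
        RecentSmoothed recent = new RecentSmoothed(20);
        recent.add(30L, 101.5);
        recent.add(45L, 102.0);
        System.out.println(recent.closestAtOrBefore(38L)); // 30=101.5
    }
}
```

If the match has to be strictly earlier than the raw event, lowerEntry could be
used in place of floorEntry.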
On Apr 10, 2018, at 6:40 PM, Ivan Wang
I've spent nearly 2 weeks trying to figure out a solution to my requirement
below. If anyone can advise, that would be great.
1. There're going to be 2000 transactions per second as StreamRaw, I'm going to
tumbleWindow them as StreamA, let's say every 15 seconds. Then I'm going to
countWindow StreamA as StreamB, let's say every 20 events.
2. For every event in StreamRaw as E, I need to find exactly one event in
StreamB which is earlier than E and closest to E. Then some comparison will be
performed. For example, if the timestamp in E is 9:46:38, there should be an
event in StreamB with timestamp 9:46:30 because I use a 15-second interval.
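The timestamp arithmetic in that example can be sketched in plain Java as
follows. The class and method names are made up for illustration, and the
sketch assumes interval boundaries are aligned to midnight; it only shows how
9:46:38 maps to 9:46:30 with a 15-second interval, not how Flink assigns
timestamps to window results.

```java
import java.time.LocalTime;

class WindowFloor {
    // Returns the latest interval boundary that is not after t.
    // Note: a time exactly on a boundary maps to itself, not to the previous one.
    static LocalTime floorToInterval(LocalTime t, int intervalSeconds) {
        int secondOfDay = t.toSecondOfDay();
        return LocalTime.ofSecondOfDay(secondOfDay - secondOfDay % intervalSeconds);
    }

    public static void main(String[] args) {
        System.out.println(floorToInterval(LocalTime.of(9, 46, 38), 15)); // 09:46:30
    }
}
```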
I tried CEP using StreamRaw; however, I couldn't figure out how to involve
StreamB and get exactly that one event in the condition method.
I tried the Table API and SQL, but it throws a time attribute error during the second
It seems there's no way to tell Flink the time attribute after the first
window.group(). I then tried to convert it into a table first and then
leftoutJoin them, but Flink tells me it's not supported.
Is Flink able to do this? If not, I'll go for other alternatives. Thanks again
if someone can help.