Re: Operations dependencies between values with different key in a ConnectedStreams

2017-08-09 Thread Aljoscha Krettek
Hi Gabriele,

Yes, something like this is possible with Flink. However, you have to implement 
a two-stage approach for this that I would roughly call "scatter-gather". You 
have three operators:

input -> Scatter -> State -> Gather -> output

Where the "Scatter" analyses the what state you need for which key, then sends 
requests for those downstream as well as the original message. All of these 
messages must be amended with a unique ID. The "State" operator has the actual 
state, it receives the "request" events and sends the state that it has for the 
given key downstream, again with the unique ID that was generated earlier. The 
"Gather" operator would receive the original message and all the different bits 
of state that were emitted by the "State" operator. Here you need to buffer and 
wait until you receive all the messages for a given unique ID. Once you have 
those you can process.

This is a presentation from ING who implemented that approach (along with a DSL 
for specifying calculations): 
https://www.slideshare.net/FlinkForward/flink-forward-sf-2017-erik-de-nooij-streaming-models-how-ing-adds-models-at-runtime-to-catch-fraudsters
 
.
 It has nice pictures and might be more helpful than my description.

Best,
Aljoscha

> On 28. Jul 2017, at 22:14, Chao Wang  wrote:
> 
> Hi Gabriele,
> 
> I think CEP may be able to deal with this kind of expressions, in general, 
> although I am not sure about how to deal with different time windows (5s and 
> 3s, in your case). Take a look at the available patterns in the CEP library 
> doc: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html#the-pattern-api
> 
> Chao
> 
> 
> On 07/28/2017 10:08 AM, Gabriele Di Bernardo wrote:
>> Hi guys,
>> 
>> I have a question for you. I have an application with two keyed data 
>> streams: one for control and the other one for the data. Each control 
>> message represents an operation to be performed on the data values marked 
>> with a certain identifier. I connected the two streams and I process the 
>> data with a CoProcessFunction.
>> 
>> The operations I do are really simple like collecting the MAX or the MEAN 
>> value of the last n seconds. Now, I would like to create more complex 
>> operations where the result value of a key might depend by the result value 
>> of another key. To be more clear, I would like to evaluate expressions like: 
>> if {ALL the values of data marked with id 22 in the last 5s} are BIGGER THAN 
>> {The MEAN value of data marked with id 17 in the last 3s}. In this example, 
>> I should defer the evaluation of the expression until I have the MEAN value 
>> of the right part of the expression and check it against ALL the data keyed 
>> with key 22 from the last 5 seconds. I’d like to ask you if something like 
>> this in Flink is doable and what is the best way to do that in your opinion. 
>> I am also checking how the CEP library works (can it be helpful?).
>> 
>> Thank you so much in advance.
>> 
>> Best,
>> 
>> 
>> Gabriele
> 



Re: Operations dependencies between values with different key in a ConnectedStreams

2017-07-28 Thread Chao Wang

Hi Gabriele,

I think CEP may be able to deal with this kind of expressions, in 
general, although I am not sure about how to deal with different time 
windows (5s and 3s, in your case). Take a look at the available patterns 
in the CEP library doc: 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/libs/cep.html#the-pattern-api


Chao


On 07/28/2017 10:08 AM, Gabriele Di Bernardo wrote:

Hi guys,

I have a question for you. I have an application with two keyed data streams: 
one for control and the other one for the data. Each control message represents 
an operation to be performed on the data values marked with a certain 
identifier. I connected the two streams and I process the data with a 
CoProcessFunction.

The operations I do are really simple like collecting the MAX or the MEAN value 
of the last n seconds. Now, I would like to create more complex operations 
where the result value of a key might depend by the result value of another 
key. To be more clear, I would like to evaluate expressions like: if {ALL the 
values of data marked with id 22 in the last 5s} are BIGGER THAN {The MEAN 
value of data marked with id 17 in the last 3s}. In this example, I should 
defer the evaluation of the expression until I have the MEAN value of the right 
part of the expression and check it against ALL the data keyed with key 22 from 
the last 5 seconds. I’d like to ask you if something like this in Flink is 
doable and what is the best way to do that in your opinion. I am also checking 
how the CEP library works (can it be helpful?).

Thank you so much in advance.

Best,


Gabriele