Hi Martin, Aljoscha

I think Aljoscha is right. My origin thought was to keep the state only
after a lambda function coming.

Use Aljoscha's scenario as example, initially, all data will be discarded
because there is no any lambdas. When lambda f1 [D, E] and f2 [A, C] comes,
A, C begin to be routed to machine "0" and D, E begin to be routed to
machine "1". Then, when we get a new lambda f3 [C, D], we can duplicate C,
D and route these copies to machine "2".

However, after reading your example again, I found what you want is a whole
picture for all variables' state in a global view, so that no matter what
time a new lambda comes it can always get its variables' state immediately. In
that case, I have the same opinion as Aljoscha.

Best,
Tony Wei

2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Martin,
>
> I think with those requirements this is very hard (or maybe impossible) to
> do efficiently in a distributed setting. It might be that I'm
> misunderstanding things but let's look at an example. Assume that
> initially, we don't have any lambdas, so data can be sent to any machine
> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
> C]. Say this gets routed to machine "0", now this means that messages with
> key A and C also need to be router to machine "0". Now, we get a new lambda
> f1 [D, E], say this gets routed to machine "2", meaning that messages with
> key D and E are also routed to machine "2".
>
> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
> lambdas and inputs to different machines? They all have to go to the same
> machine, but which one? I'm currently thinking that there would need to be
> some component that does the routing, but this has to be global, so it's
> hard to do in a distributed setting.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 1. Sep 2017, at 07:17, Martin Eden <martineden...@gmail.com> wrote:
>
> This might be a way forward but since side inputs are not there I will try
> and key the control stream by the keys in the first co flat map.
>
> I'll see how it goes.
>
> Thanks guys,
> M
>
> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi Martin,
>>
>> Yes, that is exactly what I thought.
>> But the first step also needs to be fulfilled  by SideInput. I'm not sure
>> how to achieve this in the current release.
>>
>> Best,
>> Tony Wei
>>
>> Martin Eden <martineden...@gmail.com>於 2017年8月31日 週四,下午11:32寫道:
>>
>>> Hi Aljoscha, Tony,
>>>
>>> Aljoscha:
>>> Yes it's the first option you mentioned.
>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
>>> be applied each time a new value for either A, B or C comes in. So we need
>>> to use state to cache the latest values. So using the example data stream
>>> in my first msg the emitted stream should be:
>>>
>>> 1. Data Stream:
>>> KEY VALUE TIME
>>> .
>>> .
>>> .
>>> C      V6        6
>>> B      V6        6
>>> A      V5        5
>>> A      V4        4
>>> C      V3        3
>>> A      V3        3
>>> B      V3        3
>>> B      V2        2
>>> A      V1        1
>>>
>>> 2. Control Stream:
>>> Lambda  ArgumentKeys TIME
>>> .
>>> .
>>> .
>>> f2            [A, C]                 4
>>> f1            [A, B, C]            1
>>>
>>> 3. Expected emitted stream:
>>> TIME    VALUE
>>> .
>>> .
>>> .
>>> 6          f1(V5, V6, V3)
>>>             f1(V5, V6, V6)
>>>             f2(V5, V6)
>>> 5          f1(V5, V3, V3)
>>>             f2(V5, V3)
>>> 4          f1(V4, V3, V3)
>>>             f2(V4, V3)
>>> 3          f1(V3, V3, V3)
>>> 2          -
>>> 1          -
>>>
>>> So essentially as soon as the argument list fills up then we apply the
>>> function/lambda at each new arriving message in the data stream for either
>>> argument key.
>>>
>>> Tony:
>>> Yes we need to group by and pass to the lambda.
>>> Ok, so what you are proposing might work. So your solution assumes that
>>> we have to connect with the control stream twice? Once for the tagging and
>>> another time re-connect-ing the control stream with the tagged stream for
>>> the actual application of the function/lambda?
>>>
>>> Thanks,
>>> Alex
>>>
>>>
>>>
>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> In your original example, what does this syntax mean exactly:
>>>>
>>>> f1            [A, B, C]            1
>>>>
>>>> Does it mean that f1 needs one A, one B and one C from the main stream?
>>>> If yes, which ones, because there are multiple As and Bs and so on. Or does
>>>> it mean that f1 can apply to an A or a B or a C? If it's the first, then I
>>>> think it's quite hard to find a partitioning such that both f1, f2, and all
>>>> A, B, and C go to the same machine.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Martin,
>>>>
>>>> So the problem is that you want to group those arguments in Data Stream
>>>> and pass them to the lambda function from Control Stream at the same time.
>>>> Am I right?
>>>>
>>>> If right, then you could give each lambda function an id as well. Use
>>>> these ids to tag those arguments to which they belong.
>>>> After that, keyBy function could be used to group those arguments
>>>> belonging to the same lambda function. Joining this stream with Control
>>>> Stream by function id could make arguments and function be in the same
>>>> instance.
>>>>
>>>> What do you think? Could this solution solve your problem?
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>
>>>>> Thanks for your reply Tony,
>>>>>
>>>>> Yes we are in the latter case, where the functions/lambdas come in the
>>>>> control stream. Think of them as strings containing the logic of the
>>>>> function. The values for each of the arguments to the function come from
>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>> for the corresponding keys with the control message that has the function
>>>>> to be applied.
>>>>>
>>>>> We have a way of interpreting the logic described in the string and
>>>>> executing it on the incoming values from the data stream. This is kicked
>>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>>> RichCoFlatMapFunction) but is not using Flink predefined operators or
>>>>> functions.
>>>>>
>>>>> So yeah I see your point about mapping the arguments but the problem
>>>>> is not really that, the problem is making sure that the values in the
>>>>> control stream are in the same instance of the task/ keyed managed state 
>>>>> as
>>>>> a the actual control stream message. Once they are we can pass them in.
>>>>>
>>>>> Any other thoughts?
>>>>>
>>>>> M
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> About problem 2. How were those lambda functions created? Pre-defined
>>>>>> functions / operators or automatically generated based on the message 
>>>>>> from
>>>>>> Control Stream?
>>>>>>
>>>>>> For the former, you could give each function one id and user flapMap
>>>>>> to duplicate data with multiple ids. Then, you could use filter function
>>>>>> and send them to the corresponding operators.
>>>>>>
>>>>>> For the general case like the latter, because you had broadcasted the
>>>>>> messages to all tasks, it could always build a mapping table from 
>>>>>> argument
>>>>>> keys to lambda functions in each sub-task and use the map to process the
>>>>>> data. But I was wondering if it is possible to generate a completely new
>>>>>> function in the runtime.
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>
>>>>>>> Thanks for your reply Tony.
>>>>>>>
>>>>>>> So there are actually 2 problems to solve:
>>>>>>>
>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>
>>>>>>> 2. The data stream messages with the same keys as those specified in
>>>>>>> the control message need to go to the same task as well, so that all the
>>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>
>>>>>>> In my understanding side inputs (which are actually not available in
>>>>>>> the current release) would address problem 1.
>>>>>>>
>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but I
>>>>>>> get a runtime exception telling me I still need to do a keyBy before the
>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>
>>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>>> anyone has some suggestions.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> M
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>> Let me understand your question first.
>>>>>>>> You have two Stream: Data Stream and Control Stream and you want to
>>>>>>>> select data in Data Stream based on the key set got from Control 
>>>>>>>> Stream.
>>>>>>>>
>>>>>>>> If I were not misunderstanding your question, I think SideInput is
>>>>>>>> what you want.
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+
>>>>>>>> Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStrea
>>>>>>>> mAPI-StoringSide-InputData
>>>>>>>> It lets you to define one stream as a SideInput and can be assigned
>>>>>>>> to the other stream, then the data in SideInput stream will be 
>>>>>>>> broadcasted.
>>>>>>>>
>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>> without SideInput.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>
>>>>>>>>> I have 2 input message streams:
>>>>>>>>>
>>>>>>>>> 1. Data Stream:
>>>>>>>>> KEY VALUE TIME
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> C      V6        6
>>>>>>>>> B      V6        6
>>>>>>>>> A      V5        5
>>>>>>>>> A      V4        4
>>>>>>>>> C      V3        3
>>>>>>>>> A      V3        3
>>>>>>>>> B      V3        3
>>>>>>>>> B      V2        2
>>>>>>>>> A      V1        1
>>>>>>>>>
>>>>>>>>> 2. Control Stream:
>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> f2            [A, C]                 4
>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>
>>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>>
>>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>>> using .connect. For this I need to key both of them by a certain 
>>>>>>>>> criteria.
>>>>>>>>> And here lies the problem, how can I make sure the messages with keys 
>>>>>>>>> A,B,C
>>>>>>>>> specified in the control stream end up in the same task as well as the
>>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how to 
>>>>>>>>> key
>>>>>>>>> by to achieve this.
>>>>>>>>>
>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>> data stream based on the messages in the control stream? Is this even
>>>>>>>>> possible?
>>>>>>>>>
>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> M
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>
>

Reply via email to