Hi Tony,

Yes exactly I am assuming the lambda emits a value only after it has been
published to the control topic (t1) and at least 1 value arrives in the
data topic for each of it's arguments. This will happen at a time t2 > t1.
So yes, there is uncertainty with regards to when t2 will happen. Ideally
t2 - t1 ~ 0 but for our use case it is fine. Is this the correctness that
you are talking about? Do I have the right picture of what happens?

Thanks
M

On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> The performance is an issue, but in your case, yes, it might not be a
> problem if X << N.
>
> However, the other problem is where data should go in the beginning if
> there is no lambda been received. This problem doesn't associate with
> performance, but instead with correctness. If you want to keep the value
> state for the incoming lambda you should broadcast it to all nodes, because
> you would never know where the next lambda that requires this data would be
> routed to. Of course, you can send this data to a pre-defined node and
> route the lambda to this node, but this will lead to all data in the same
> node to let all lambda can get all required data. It is not a good solution
> because of a lack of scalability.
>
> In my origin thought, it is based on only storing state of data after you
> receive at least one lambda that requires it, so that data has its
> destination node to go. Can this assumption be acceptable in your case?
> What do you think?
>
> Best,
> Tony Wei
>
> 2017-09-06 22:41 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>
>> Hi Aljoscha, Tony,
>>
>> We actually do not need all the keys to be on all nodes where lambdas
>> are. We just need the keys that represent the data for the lambda arguments
>> to be routed to the same node as the lambda, whichever one it might be.
>>
>> Essentially in the solution we emit the data multiple times and by doing
>> that we roughly multiply the input rate by the average number of lambdas a
>> key is a part of (X). In terms of memory this is O(X * N) where N is the
>> number of keys int the data. N is the large bit. If X ~ N then we have O
>> (N^2) complexity for the Flink state. And in that case yes I see your point
>> about performance Aljoscha. But if X << N, as is our case, then we have
>> O(N) which should be manageable by Flink's distributed state mechanism
>> right? Do you see any gotchas in this new light? Are my assumptions correct?
>>
>> Thanks,
>> M
>>
>>
>>
>>
>>
>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Martin, Aljoscha
>>>
>>> I think Aljoscha is right. My origin thought was to keep the state only
>>> after a lambda function coming.
>>>
>>> Use Aljoscha's scenario as example, initially, all data will be
>>> discarded because there is no any lambdas. When lambda f1 [D, E] and f2
>>> [A, C] comes, A, C begin to be routed to machine "0" and D, E begin to be
>>> routed to machine "1". Then, when we get a new lambda f3 [C, D], we can
>>> duplicate C, D and route these copies to machine "2".
>>>
>>> However, after reading your example again, I found what you want is a
>>> whole picture for all variables' state in a global view, so that no matter
>>> what time a new lambda comes it can always get its variables' state
>>> immediately. In that case, I have the same opinion as Aljoscha.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> Hi Martin,
>>>>
>>>> I think with those requirements this is very hard (or maybe impossible)
>>>> to do efficiently in a distributed setting. It might be that I'm
>>>> misunderstanding things but let's look at an example. Assume that
>>>> initially, we don't have any lambdas, so data can be sent to any machine
>>>> because it doesn't matter where they go. Now, we get a new lambda f2 [A,
>>>> C]. Say this gets routed to machine "0", now this means that messages with
>>>> key A and C also need to be router to machine "0". Now, we get a new lambda
>>>> f1 [D, E], say this gets routed to machine "2", meaning that messages with
>>>> key D and E are also routed to machine "2".
>>>>
>>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous
>>>> lambdas and inputs to different machines? They all have to go to the same
>>>> machine, but which one? I'm currently thinking that there would need to be
>>>> some component that does the routing, but this has to be global, so it's
>>>> hard to do in a distributed setting.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 1. Sep 2017, at 07:17, Martin Eden <martineden...@gmail.com> wrote:
>>>>
>>>> This might be a way forward but since side inputs are not there I will
>>>> try and key the control stream by the keys in the first co flat map.
>>>>
>>>> I'll see how it goes.
>>>>
>>>> Thanks guys,
>>>> M
>>>>
>>>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> Yes, that is exactly what I thought.
>>>>> But the first step also needs to be fulfilled  by SideInput. I'm not
>>>>> sure how to achieve this in the current release.
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> Martin Eden <martineden...@gmail.com>於 2017年8月31日 週四,下午11:32寫道:
>>>>>
>>>>>> Hi Aljoscha, Tony,
>>>>>>
>>>>>> Aljoscha:
>>>>>> Yes it's the first option you mentioned.
>>>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs
>>>>>> to be applied each time a new value for either A, B or C comes in. So we
>>>>>> need to use state to cache the latest values. So using the example data
>>>>>> stream in my first msg the emitted stream should be:
>>>>>>
>>>>>> 1. Data Stream:
>>>>>> KEY VALUE TIME
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> C      V6        6
>>>>>> B      V6        6
>>>>>> A      V5        5
>>>>>> A      V4        4
>>>>>> C      V3        3
>>>>>> A      V3        3
>>>>>> B      V3        3
>>>>>> B      V2        2
>>>>>> A      V1        1
>>>>>>
>>>>>> 2. Control Stream:
>>>>>> Lambda  ArgumentKeys TIME
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> f2            [A, C]                 4
>>>>>> f1            [A, B, C]            1
>>>>>>
>>>>>> 3. Expected emitted stream:
>>>>>> TIME    VALUE
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> 6          f1(V5, V6, V3)
>>>>>>             f1(V5, V6, V6)
>>>>>>             f2(V5, V6)
>>>>>> 5          f1(V5, V3, V3)
>>>>>>             f2(V5, V3)
>>>>>> 4          f1(V4, V3, V3)
>>>>>>             f2(V4, V3)
>>>>>> 3          f1(V3, V3, V3)
>>>>>> 2          -
>>>>>> 1          -
>>>>>>
>>>>>> So essentially as soon as the argument list fills up then we apply
>>>>>> the function/lambda at each new arriving message in the data stream for
>>>>>> either argument key.
>>>>>>
>>>>>> Tony:
>>>>>> Yes we need to group by and pass to the lambda.
>>>>>> Ok, so what you are proposing might work. So your solution assumes
>>>>>> that we have to connect with the control stream twice? Once for the 
>>>>>> tagging
>>>>>> and another time re-connect-ing the control stream with the tagged stream
>>>>>> for the actual application of the function/lambda?
>>>>>>
>>>>>> Thanks,
>>>>>> Alex
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <
>>>>>> aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> In your original example, what does this syntax mean exactly:
>>>>>>>
>>>>>>> f1            [A, B, C]            1
>>>>>>>
>>>>>>> Does it mean that f1 needs one A, one B and one C from the main
>>>>>>> stream? If yes, which ones, because there are multiple As and Bs and so 
>>>>>>> on.
>>>>>>> Or does it mean that f1 can apply to an A or a B or a C? If it's the 
>>>>>>> first,
>>>>>>> then I think it's quite hard to find a partitioning such that both f1, 
>>>>>>> f2,
>>>>>>> and all A, B, and C go to the same machine.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> So the problem is that you want to group those arguments in Data
>>>>>>> Stream and pass them to the lambda function from Control Stream at the 
>>>>>>> same
>>>>>>> time. Am I right?
>>>>>>>
>>>>>>> If right, then you could give each lambda function an id as well.
>>>>>>> Use these ids to tag those arguments to which they belong.
>>>>>>> After that, keyBy function could be used to group those arguments
>>>>>>> belonging to the same lambda function. Joining this stream with Control
>>>>>>> Stream by function id could make arguments and function be in the same
>>>>>>> instance.
>>>>>>>
>>>>>>> What do you think? Could this solution solve your problem?
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>
>>>>>>>> Thanks for your reply Tony,
>>>>>>>>
>>>>>>>> Yes we are in the latter case, where the functions/lambdas come in
>>>>>>>> the control stream. Think of them as strings containing the logic of 
>>>>>>>> the
>>>>>>>> function. The values for each of the arguments to the function come 
>>>>>>>> from
>>>>>>>> the data stream. That is why we need to co-locate the data stream 
>>>>>>>> messages
>>>>>>>> for the corresponding keys with the control message that has the 
>>>>>>>> function
>>>>>>>> to be applied.
>>>>>>>>
>>>>>>>> We have a way of interpreting the logic described in the string and
>>>>>>>> executing it on the incoming values from the data stream. This is 
>>>>>>>> kicked
>>>>>>>> off from within the Flink runtime (synchronous to a flatMap of the
>>>>>>>> RichCoFlatMapFunction) but is not using Flink predefined operators
>>>>>>>> or functions.
>>>>>>>>
>>>>>>>> So yeah I see your point about mapping the arguments but the
>>>>>>>> problem is not really that, the problem is making sure that the values 
>>>>>>>> in
>>>>>>>> the control stream are in the same instance of the task/ keyed managed
>>>>>>>> state as a the actual control stream message. Once they are we can pass
>>>>>>>> them in.
>>>>>>>>
>>>>>>>> Any other thoughts?
>>>>>>>>
>>>>>>>> M
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Martin,
>>>>>>>>>
>>>>>>>>> About problem 2. How were those lambda functions created?
>>>>>>>>> Pre-defined functions / operators or automatically generated based on 
>>>>>>>>> the
>>>>>>>>> message from Control Stream?
>>>>>>>>>
>>>>>>>>> For the former, you could give each function one id and user
>>>>>>>>> flapMap to duplicate data with multiple ids. Then, you could use 
>>>>>>>>> filter
>>>>>>>>> function and send them to the corresponding operators.
>>>>>>>>>
>>>>>>>>> For the general case like the latter, because you had broadcasted
>>>>>>>>> the messages to all tasks, it could always build a mapping table from
>>>>>>>>> argument keys to lambda functions in each sub-task and use the map to
>>>>>>>>> process the data. But I was wondering if it is possible to generate a
>>>>>>>>> completely new function in the runtime.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Thanks for your reply Tony.
>>>>>>>>>>
>>>>>>>>>> So there are actually 2 problems to solve:
>>>>>>>>>>
>>>>>>>>>> 1. All control stream msgs need to be broadcasted to all tasks.
>>>>>>>>>>
>>>>>>>>>> 2. The data stream messages with the same keys as those specified
>>>>>>>>>> in the control message need to go to the same task as well, so that 
>>>>>>>>>> all the
>>>>>>>>>> values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>>>>
>>>>>>>>>> In my understanding side inputs (which are actually not available
>>>>>>>>>> in the current release) would address problem 1.
>>>>>>>>>>
>>>>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(
>>>>>>>>>> controlStream.broadcast).flatMap(new RichCoFlatMapFunction) but
>>>>>>>>>> I get a runtime exception telling me I still need to do a keyBy 
>>>>>>>>>> before the
>>>>>>>>>> flatMap. So are the upcoming side inputs the only way to broadcast a
>>>>>>>>>> control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>>>>
>>>>>>>>>> As for problem 2, I am still pending a reply. Would appreciate if
>>>>>>>>>> anyone has some suggestions.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> M
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Martin,
>>>>>>>>>>>
>>>>>>>>>>> Let me understand your question first.
>>>>>>>>>>> You have two Stream: Data Stream and Control Stream and you want
>>>>>>>>>>> to select data in Data Stream based on the key set got from Control 
>>>>>>>>>>> Stream.
>>>>>>>>>>>
>>>>>>>>>>> If I were not misunderstanding your question, I think SideInput
>>>>>>>>>>> is what you want.
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Si
>>>>>>>>>>> de+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamA
>>>>>>>>>>> PI-StoringSide-InputData
>>>>>>>>>>> It lets you to define one stream as a SideInput and can be
>>>>>>>>>>> assigned to the other stream, then the data in SideInput stream 
>>>>>>>>>>> will be
>>>>>>>>>>> broadcasted.
>>>>>>>>>>>
>>>>>>>>>>> So far, I have no idea if there is any solution to solve this
>>>>>>>>>>> without SideInput.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Tony Wei
>>>>>>>>>>>
>>>>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>
>>>>>>>>>>> :
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>>>>
>>>>>>>>>>>> I have 2 input message streams:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Data Stream:
>>>>>>>>>>>> KEY VALUE TIME
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> C      V6        6
>>>>>>>>>>>> B      V6        6
>>>>>>>>>>>> A      V5        5
>>>>>>>>>>>> A      V4        4
>>>>>>>>>>>> C      V3        3
>>>>>>>>>>>> A      V3        3
>>>>>>>>>>>> B      V3        3
>>>>>>>>>>>> B      V2        2
>>>>>>>>>>>> A      V1        1
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Control Stream:
>>>>>>>>>>>> Lambda  ArgumentKeys TIME
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> f2            [A, C]                 4
>>>>>>>>>>>> f1            [A, B, C]            1
>>>>>>>>>>>>
>>>>>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>>>>>
>>>>>>>>>>>> Since we have 2 streams I naturally thought of connecting them
>>>>>>>>>>>> using .connect. For this I need to key both of them by a certain 
>>>>>>>>>>>> criteria.
>>>>>>>>>>>> And here lies the problem, how can I make sure the messages with 
>>>>>>>>>>>> keys A,B,C
>>>>>>>>>>>> specified in the control stream end up in the same task as well as 
>>>>>>>>>>>> the
>>>>>>>>>>>> control message (f1, [A, B, C]) itself. Basically I don't know how 
>>>>>>>>>>>> to key
>>>>>>>>>>>> by to achieve this.
>>>>>>>>>>>>
>>>>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>>>>> data stream based on the messages in the control stream? Is this 
>>>>>>>>>>>> even
>>>>>>>>>>>> possible?
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions welcomed!
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> M
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to