Hi Tony,

Yes, exactly. I am assuming the lambda emits a value only after it has been published to the control topic (t1) and at least one value has arrived in the data topic for each of its arguments. This will happen at a time t2 > t1. So yes, there is uncertainty about when t2 will happen. Ideally t2 - t1 ~ 0, but for our use case that is fine. Is this the correctness issue you are talking about? Do I have the right picture of what happens?
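To make that assumption concrete, here is a minimal sketch of the second, keyed co-flat-map I have in mind. It is only a sketch, not our actual code: TaggedValue, LambdaSpec and the evaluate placeholder are made-up names, and the real lambda interpretation happens elsewhere. It caches the latest value per argument key and only starts emitting once the lambda has arrived (t1) and every argument key has seen at least one value (t2):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative types only, not our real ones.
class TaggedValue {
    public String lambdaId; // the lambda this copy of the data record is for
    public String key;      // A, B, C, ...
    public String value;    // V1, V2, ...
}

class LambdaSpec {
    public String id;                 // f1, f2, ...
    public String code;               // the lambda body as a string
    public List<String> argumentKeys; // e.g. [A, B, C]
}

// Intended to run keyed by lambda id, with the tagged data stream on input 1
// and the control stream keyed by lambda id on input 2.
class LambdaEvaluator extends RichCoFlatMapFunction<TaggedValue, LambdaSpec, String> {

    // The lambda definition, set once the control message has arrived (t1).
    private transient ValueState<LambdaSpec> lambdaState;
    // Latest value seen for each argument key of this lambda.
    private transient MapState<String, String> latestValues;

    @Override
    public void open(Configuration parameters) {
        lambdaState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lambda", LambdaSpec.class));
        latestValues = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("latest", String.class, String.class));
    }

    @Override
    public void flatMap1(TaggedValue data, Collector<String> out) throws Exception {
        latestValues.put(data.key, data.value);
        emitIfComplete(out);
    }

    @Override
    public void flatMap2(LambdaSpec spec, Collector<String> out) throws Exception {
        lambdaState.update(spec);
        emitIfComplete(out);
    }

    private void emitIfComplete(Collector<String> out) throws Exception {
        LambdaSpec spec = lambdaState.value();
        if (spec == null) {
            return; // lambda not published yet (before t1)
        }
        Map<String, String> args = new HashMap<>();
        for (String argKey : spec.argumentKeys) {
            if (!latestValues.contains(argKey)) {
                return; // at least one argument has no value yet (before t2)
            }
            args.put(argKey, latestValues.get(argKey));
        }
        out.collect(evaluate(spec, args));
    }

    // Stand-in for our string interpreter; here it only echoes the call.
    private String evaluate(LambdaSpec spec, Map<String, String> args) {
        return spec.id + "(" + args + ")";
    }
}

After t2, every new value for any of the argument keys triggers a fresh evaluation, which matches the expected emitted stream further down in the thread.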
Thanks,
M

On Thu, Sep 7, 2017 at 3:11 AM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi Martin,
>
> The performance is an issue, but in your case, yes, it might not be a problem if X << N.
>
> However, the other problem is where data should go in the beginning, if no lambda has been received yet. This problem is not about performance but about correctness. If you want to keep the value state for an incoming lambda, you should broadcast it to all nodes, because you would never know where the next lambda that requires this data will be routed to. Of course, you can send this data to a pre-defined node and route the lambda to this node, but that leads to all data sitting on the same node so that every lambda can get all the data it requires. It is not a good solution because it does not scale.
>
> My original idea was based on only storing state for data after you receive at least one lambda that requires it, so that the data has a destination node to go to. Is this assumption acceptable in your case? What do you think?
>
> Best,
> Tony Wei
>
> 2017-09-06 22:41 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>
>> Hi Aljoscha, Tony,
>>
>> We actually do not need all the keys to be on all nodes where lambdas are. We just need the keys that represent the data for the lambda arguments to be routed to the same node as the lambda, whichever one that might be.
>>
>> Essentially, in this solution we emit the data multiple times, and by doing so we roughly multiply the input rate by the average number of lambdas a key is a part of (X). In terms of memory this is O(X * N), where N is the number of keys in the data. N is the large part. If X ~ N then we have O(N^2) complexity for the Flink state, and in that case, yes, I see your point about performance, Aljoscha. But if X << N, as in our case, then we have O(N), which should be manageable by Flink's distributed state mechanism, right? Do you see any gotchas in this new light? Are my assumptions correct?
>>
>> Thanks,
>> M
>>
>> On Sat, Sep 2, 2017 at 3:38 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>
>>> Hi Martin, Aljoscha,
>>>
>>> I think Aljoscha is right. My original thought was to keep the state only after a lambda function comes in.
>>>
>>> Using Aljoscha's scenario as an example: initially, all data will be discarded because there are no lambdas yet. When lambdas f1 [D, E] and f2 [A, C] come in, A, C begin to be routed to machine "0" and D, E begin to be routed to machine "1". Then, when we get a new lambda f3 [C, D], we can duplicate C, D and route these copies to machine "2".
>>>
>>> However, after reading your example again, I found that what you want is a whole picture of all variables' state in a global view, so that no matter when a new lambda comes it can always get its variables' state immediately. In that case, I have the same opinion as Aljoscha.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> 2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:
>>>
>>>> Hi Martin,
>>>>
>>>> I think with those requirements this is very hard (or maybe impossible) to do efficiently in a distributed setting. It might be that I'm misunderstanding things, but let's look at an example. Assume that initially we don't have any lambdas, so data can be sent to any machine because it doesn't matter where it goes. Now, we get a new lambda f2 [A, C].
>>>> Say this gets routed to machine "0"; this means that messages with keys A and C also need to be routed to machine "0". Now, we get a new lambda f1 [D, E], and say this gets routed to machine "2", meaning that messages with keys D and E are also routed to machine "2".
>>>>
>>>> Then, we get a new lambda f3 [C, D]. Do we now re-route all previous lambdas and inputs to different machines? They all have to go to the same machine, but which one? I'm currently thinking that there would need to be some component that does the routing, but this has to be global, so it's hard to do in a distributed setting.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 1. Sep 2017, at 07:17, Martin Eden <martineden...@gmail.com> wrote:
>>>>
>>>> This might be a way forward, but since side inputs are not there yet, I will try to key the control stream by the keys in the first co-flat-map.
>>>>
>>>> I'll see how it goes.
>>>>
>>>> Thanks guys,
>>>> M
>>>>
>>>> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> Yes, that is exactly what I thought.
>>>>> But the first step also needs to be fulfilled by SideInput. I'm not sure how to achieve this in the current release.
>>>>>
>>>>> Best,
>>>>> Tony Wei
>>>>>
>>>>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <martineden...@gmail.com> wrote:
>>>>>
>>>>>> Hi Aljoscha, Tony,
>>>>>>
>>>>>> Aljoscha:
>>>>>> Yes, it's the first option you mentioned.
>>>>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to be applied each time a new value for either A, B or C comes in, so we need to use state to cache the latest values. So, using the example data stream in my first msg, the emitted stream should be:
>>>>>>
>>>>>> 1. Data Stream:
>>>>>> KEY  VALUE  TIME
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> C    V6     6
>>>>>> B    V6     6
>>>>>> A    V5     5
>>>>>> A    V4     4
>>>>>> C    V3     3
>>>>>> A    V3     3
>>>>>> B    V3     3
>>>>>> B    V2     2
>>>>>> A    V1     1
>>>>>>
>>>>>> 2. Control Stream:
>>>>>> Lambda  ArgumentKeys  TIME
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> f2      [A, C]        4
>>>>>> f1      [A, B, C]     1
>>>>>>
>>>>>> 3. Expected emitted stream:
>>>>>> TIME  VALUE
>>>>>> .
>>>>>> .
>>>>>> .
>>>>>> 6     f1(V5, V6, V3)
>>>>>>       f1(V5, V6, V6)
>>>>>>       f2(V5, V6)
>>>>>> 5     f1(V5, V3, V3)
>>>>>>       f2(V5, V3)
>>>>>> 4     f1(V4, V3, V3)
>>>>>>       f2(V4, V3)
>>>>>> 3     f1(V3, V3, V3)
>>>>>> 2     -
>>>>>> 1     -
>>>>>>
>>>>>> So essentially, as soon as the argument list fills up, we apply the function/lambda at each newly arriving message in the data stream for any of the argument keys.
>>>>>>
>>>>>> Tony:
>>>>>> Yes, we need to group by and pass to the lambda.
>>>>>> Ok, so what you are proposing might work. So your solution assumes that we have to connect with the control stream twice? Once for the tagging, and a second time re-connecting the control stream with the tagged stream for the actual application of the function/lambda?
>>>>>>
>>>>>> Thanks,
>>>>>> Alex
>>>>>>
>>>>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> In your original example, what does this syntax mean exactly:
>>>>>>>
>>>>>>> f1 [A, B, C] 1
>>>>>>>
>>>>>>> Does it mean that f1 needs one A, one B and one C from the main stream? If yes, which ones, because there are multiple As and Bs and so on. Or does it mean that f1 can apply to an A or a B or a C?
>>>>>>> If it's the first, then I think it's quite hard to find a partitioning such that both f1, f2, and all A, B, and C go to the same machine.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> So the problem is that you want to group those arguments in the Data Stream and pass them to the lambda function from the Control Stream at the same time. Am I right?
>>>>>>>
>>>>>>> If so, then you could give each lambda function an id as well. Use these ids to tag the arguments they belong to. After that, the keyBy function could be used to group the arguments belonging to the same lambda function. Joining this stream with the Control Stream by function id could then put the arguments and the function in the same instance.
>>>>>>>
>>>>>>> What do you think? Could this solution solve your problem?
>>>>>>>
>>>>>>> Best,
>>>>>>> Tony Wei
>>>>>>>
>>>>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>
>>>>>>>> Thanks for your reply, Tony.
>>>>>>>>
>>>>>>>> Yes, we are in the latter case, where the functions/lambdas come in the control stream. Think of them as strings containing the logic of the function. The values for each of the arguments to the function come from the data stream. That is why we need to co-locate the data stream messages for the corresponding keys with the control message that has the function to be applied.
>>>>>>>>
>>>>>>>> We have a way of interpreting the logic described in the string and executing it on the incoming values from the data stream. This is kicked off from within the Flink runtime (synchronously, in a flatMap of the RichCoFlatMapFunction) but does not use predefined Flink operators or functions.
>>>>>>>>
>>>>>>>> So yeah, I see your point about mapping the arguments, but the problem is not really that; the problem is making sure that the values for the keys specified in the control stream are in the same instance of the task / keyed managed state as the actual control stream message. Once they are, we can pass them in.
>>>>>>>>
>>>>>>>> Any other thoughts?
>>>>>>>>
>>>>>>>> M
>>>>>>>>
>>>>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Martin,
>>>>>>>>>
>>>>>>>>> About problem 2: how are those lambda functions created? Are they pre-defined functions / operators, or automatically generated based on the message from the Control Stream?
>>>>>>>>>
>>>>>>>>> For the former, you could give each function an id and use flatMap to duplicate the data with multiple ids. Then you could use a filter function and send them to the corresponding operators.
>>>>>>>>>
>>>>>>>>> For the general case, like the latter, because you have broadcasted the messages to all tasks, each sub-task could always build a mapping table from argument keys to lambda functions and use that map to process the data. But I was wondering if it is possible to generate a completely new function at runtime.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tony Wei
>>>>>>>>>
>>>>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Thanks for your reply, Tony.
>>>>>>>>>>
>>>>>>>>>> So there are actually 2 problems to solve:
>>>>>>>>>>
>>>>>>>>>> 1. All control stream msgs need to be broadcast to all tasks.
>>>>>>>>>>
>>>>>>>>>> 2. The data stream messages with the same keys as those specified in the control message need to go to the same task as well, so that all the values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>>>>
>>>>>>>>>> In my understanding, side inputs (which are not actually available in the current release) would address problem 1.
>>>>>>>>>>
>>>>>>>>>> To address problem 1 I also tried dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new RichCoFlatMapFunction), but I get a runtime exception telling me I still need to do a keyBy before the flatMap. So are the upcoming side inputs the only way to broadcast a control stream to all tasks of a coFlatMap? Or is there another way?
>>>>>>>>>>
>>>>>>>>>> As for problem 2, I am still waiting for a reply. I would appreciate it if anyone has some suggestions.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> M
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Martin,
>>>>>>>>>>>
>>>>>>>>>>> Let me make sure I understand your question first. You have two streams, a Data Stream and a Control Stream, and you want to select data in the Data Stream based on the key set received from the Control Stream.
>>>>>>>>>>>
>>>>>>>>>>> If I am not misunderstanding your question, I think SideInput is what you want.
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>>>>> It lets you define one stream as a SideInput that can be attached to the other stream; the data in the SideInput stream will then be broadcast.
>>>>>>>>>>>
>>>>>>>>>>> So far, I don't know of any solution that solves this without SideInput.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Tony Wei
>>>>>>>>>>>
>>>>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>>>>
>>>>>>>>>>>> I have 2 input message streams:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Data Stream:
>>>>>>>>>>>> KEY  VALUE  TIME
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> C    V6     6
>>>>>>>>>>>> B    V6     6
>>>>>>>>>>>> A    V5     5
>>>>>>>>>>>> A    V4     4
>>>>>>>>>>>> C    V3     3
>>>>>>>>>>>> A    V3     3
>>>>>>>>>>>> B    V3     3
>>>>>>>>>>>> B    V2     2
>>>>>>>>>>>> A    V1     1
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Control Stream:
>>>>>>>>>>>> Lambda  ArgumentKeys  TIME
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> .
>>>>>>>>>>>> f2      [A, C]        4
>>>>>>>>>>>> f1      [A, B, C]     1
>>>>>>>>>>>>
>>>>>>>>>>>> I want to apply the lambdas coming in the control stream to the selection of keys that are coming in the data stream.
>>>>>>>>>>>>
>>>>>>>>>>>> Since we have 2 streams, I naturally thought of connecting them using .connect. For this I need to key both of them by a certain criterion.
>>>>>>>>>>>> And here lies the problem: how can I make sure the messages with keys A, B, C specified in the control stream end up in the same task as the control message (f1, [A, B, C]) itself? Basically, I don't know what to key by to achieve this.
>>>>>>>>>>>>
>>>>>>>>>>>> I suspect a custom partitioner is required that partitions the data stream based on the messages in the control stream? Is this even possible?
>>>>>>>>>>>>
>>>>>>>>>>>> Any suggestions welcome!
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> M
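P.S. For completeness, here is a minimal sketch of the tag-and-re-key idea Tony proposed, as an alternative to a custom partitioner. All the names (DataMsg, LambdaSpec, TaggedValue, KeyTagger) are made up for illustration, and the plain in-memory map stands in for state that a real job would checkpoint:

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative types only.
class DataMsg {
    public String key;   // A, B, C, ...
    public String value; // V1, V2, ...
    public long time;
}

class LambdaSpec {
    public String id;                 // f1, f2, ...
    public String code;               // the lambda body as a string
    public List<String> argumentKeys; // e.g. [A, B, C]
}

class TaggedValue {
    public String lambdaId; // the lambda this copy of the record is for
    public String key;
    public String value;
}

// Stage 1: applied to dataStream.connect(controlStream.broadcast()).flatMap(...).
// Every parallel task sees every lambda, remembers which argument keys it needs,
// and emits one tagged copy of each data record per interested lambda (the
// O(X * N) duplication discussed above in the thread). Data for keys that no
// lambda has asked for yet is simply dropped, matching Tony's assumption.
class KeyTagger implements CoFlatMapFunction<DataMsg, LambdaSpec, TaggedValue> {

    // argument key -> ids of the lambdas that need it. Plain memory here;
    // a real job would keep this in checkpointed (operator) state.
    private final Map<String, Set<String>> keyToLambdas = new HashMap<>();

    @Override
    public void flatMap1(DataMsg data, Collector<TaggedValue> out) {
        Set<String> lambdaIds = keyToLambdas.get(data.key);
        if (lambdaIds == null) {
            return; // no lambda is interested in this key yet
        }
        for (String lambdaId : lambdaIds) {
            TaggedValue tagged = new TaggedValue();
            tagged.lambdaId = lambdaId;
            tagged.key = data.key;
            tagged.value = data.value;
            out.collect(tagged);
        }
    }

    @Override
    public void flatMap2(LambdaSpec spec, Collector<TaggedValue> out) {
        for (String argKey : spec.argumentKeys) {
            keyToLambdas.computeIfAbsent(argKey, k -> new HashSet<>()).add(spec.id);
        }
    }
}

The tagged stream would then be keyed by the lambda id and connected with the control stream keyed by the same id, so each lambda and the latest values for all of its argument keys end up in the same keyed task, with no custom partitioner needed.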