Hi Martin, Aljoscha,

I think Aljoscha is right. My original thought was to start keeping state for a key only once a lambda that uses it arrives.
Using Aljoscha's scenario as an example: initially, all data is discarded because there are no lambdas yet. When lambdas f1 [D, E] and f2 [A, C] arrive, A and C start being routed to machine "0" and D and E to machine "1". Then, when a new lambda f3 [C, D] arrives, we can duplicate C and D and route the copies to machine "2". However, after reading your example again, I realized that what you want is a global view of the state of all variables, so that no matter when a new lambda arrives, it can get the state of its variables immediately. In that case, I share Aljoscha's opinion.

Best,
Tony Wei

2017-09-01 23:59 GMT+08:00 Aljoscha Krettek <aljos...@apache.org>:

> Hi Martin,
>
> I think with those requirements this is very hard (or maybe impossible) to
> do efficiently in a distributed setting. It might be that I'm
> misunderstanding things, but let's look at an example. Assume that
> initially we don't have any lambdas, so data can be sent to any machine
> because it doesn't matter where it goes. Now we get a new lambda
> f2 [A, C]. Say this gets routed to machine "0"; this means that messages
> with keys A and C also need to be routed to machine "0". Next we get a new
> lambda f1 [D, E]. Say this gets routed to machine "2", meaning that
> messages with keys D and E are also routed to machine "2".
>
> Then we get a new lambda f3 [C, D]. Do we now re-route all previous
> lambdas and inputs to different machines? They all have to go to the same
> machine, but which one? I'm currently thinking that there would need to be
> some component that does the routing, but this has to be global, so it's
> hard to do in a distributed setting.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 1. Sep 2017, at 07:17, Martin Eden <martineden...@gmail.com> wrote:
>
> This might be a way forward, but since side inputs are not available yet, I
> will try to key the control stream by the keys in the first CoFlatMap.
>
> I'll see how it goes.
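Aljoscha's example can be made concrete with a small plain-Java sketch (not Flink code; `CoLocationGroups` and its methods are hypothetical names). If all argument keys of a lambda must meet on one machine, keys that share any lambda form a connected group, which a union-find structure tracks. After f2 [A, C] and f1 [D, E] there are two independent groups that can live on different machines; f3 [C, D] merges them, so A, C, D and E, and everything already placed for f1 and f2, would have to end up on one machine. Keeping that grouping consistent needs a view of all lambdas at once, which is the global routing component Aljoscha describes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Flink API): union-find over data keys. Keys that
// appear together in some lambda's argument list must be processed on the
// same machine, so each lambda merges the groups of its argument keys.
class CoLocationGroups {
    private final Map<String, String> parent = new HashMap<>();

    // Returns the representative key of the group that contains `key`.
    String find(String key) {
        parent.putIfAbsent(key, key);
        String root = key;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        // Path compression: point every key on the path directly at the root.
        String current = key;
        while (!current.equals(root)) {
            String next = parent.get(current);
            parent.put(current, root);
            current = next;
        }
        return root;
    }

    // Registers a lambda (assumed to have at least one argument key) and merges
    // the groups of all its argument keys into one.
    void registerLambda(List<String> argumentKeys) {
        String first = find(argumentKeys.get(0));
        for (String key : argumentKeys) {
            String root = find(key);
            if (!root.equals(first)) {
                parent.put(root, first);
            }
        }
    }

    boolean sameGroup(String a, String b) {
        return find(a).equals(find(b));
    }

    public static void main(String[] args) {
        CoLocationGroups groups = new CoLocationGroups();
        groups.registerLambda(List.of("A", "C")); // f2, placed on machine "0"
        groups.registerLambda(List.of("D", "E")); // f1, placed on machine "2"
        System.out.println(groups.sameGroup("A", "D")); // false
        groups.registerLambda(List.of("C", "D")); // f3 bridges the two groups
        System.out.println(groups.sameGroup("A", "E")); // true
    }
}
```

The groups only ever grow, so in the worst case a handful of overlapping lambdas can pull every key onto a single machine.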
>
> Thanks guys,
> M
>
> On Thu, Aug 31, 2017 at 5:16 PM, Tony Wei <tony19920...@gmail.com> wrote:
>
>> Hi Martin,
>>
>> Yes, that is exactly what I thought.
>> But the first step would also need SideInput, and I'm not sure how to
>> achieve this in the current release.
>>
>> Best,
>> Tony Wei
>>
>> On Thu, Aug 31, 2017 at 11:32 PM, Martin Eden <martineden...@gmail.com> wrote:
>>
>>> Hi Aljoscha, Tony,
>>>
>>> Aljoscha:
>>> Yes, it's the first option you mentioned.
>>> Yes, the stream has multiple values in flight for A, B, C. f1 needs to
>>> be applied each time a new value for A, B or C comes in, so we need
>>> to use state to cache the latest values. Using the example data stream
>>> from my first message, the emitted stream should be:
>>>
>>> 1. Data Stream:
>>> KEY  VALUE  TIME
>>> .
>>> .
>>> .
>>> C    V6     6
>>> B    V6     6
>>> A    V5     5
>>> A    V4     4
>>> C    V3     3
>>> A    V3     3
>>> B    V3     3
>>> B    V2     2
>>> A    V1     1
>>>
>>> 2. Control Stream:
>>> Lambda  ArgumentKeys  TIME
>>> .
>>> .
>>> .
>>> f2      [A, C]        4
>>> f1      [A, B, C]     1
>>>
>>> 3. Expected emitted stream:
>>> TIME  VALUE
>>> .
>>> .
>>> .
>>> 6     f1(V5, V6, V3)
>>>       f1(V5, V6, V6)
>>>       f2(V5, V6)
>>> 5     f1(V5, V3, V3)
>>>       f2(V5, V3)
>>> 4     f1(V4, V3, V3)
>>>       f2(V4, V3)
>>> 3     f1(V3, V3, V3)
>>> 2     -
>>> 1     -
>>>
>>> So essentially, as soon as a lambda's argument list fills up, we apply the
>>> function/lambda on every newly arriving data stream message for any of its
>>> argument keys.
>>>
>>> Tony:
>>> Yes, we need to group by and pass to the lambda.
>>> OK, so what you are proposing might work. So your solution assumes that
>>> we have to connect with the control stream twice? Once for the tagging, and
>>> a second time re-connecting the control stream with the tagged stream for
>>> the actual application of the function/lambda?
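The semantics Martin describes (cache the latest value per key; once every argument of a lambda has a value, re-apply that lambda whenever one of its argument keys receives a new value) can be checked against his expected output with a single-process Java sketch. It deliberately ignores distribution, which is the hard part of this thread. The class name `LatestValueApplier` and rendering an application as the string `f1(V5, V6, V3)` are assumptions for illustration. Control messages are processed before data messages with the same timestamp, which is what the expected `f2(V4, V3)` at time 4 implies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process simulation of the "latest value per key" semantics.
class LatestValueApplier {
    // Lambda name -> ordered argument keys, kept in registration order.
    private final Map<String, List<String>> lambdas = new LinkedHashMap<>();
    // Latest value seen for each data key (the cached state).
    private final Map<String, String> latest = new HashMap<>();

    // Control stream message, e.g. ("f1", [A, B, C]).
    void onControl(String lambdaName, List<String> argumentKeys) {
        lambdas.put(lambdaName, argumentKeys);
    }

    // Data stream message: cache the value, then apply every lambda that takes
    // this key as an argument and whose argument list is now complete.
    List<String> onData(String key, String value) {
        latest.put(key, value);
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, List<String>> lambda : lambdas.entrySet()) {
            List<String> args = lambda.getValue();
            if (!args.contains(key) || !latest.keySet().containsAll(args)) {
                continue; // not affected, or some argument has no value yet
            }
            List<String> values = new ArrayList<>();
            for (String arg : args) {
                values.add(latest.get(arg));
            }
            emitted.add(lambda.getKey() + "(" + String.join(", ", values) + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        LatestValueApplier applier = new LatestValueApplier();
        // Martin's example in arrival order (oldest first).
        applier.onControl("f1", List.of("A", "B", "C"));  // time 1
        System.out.println(applier.onData("A", "V1"));    // time 1: []
        System.out.println(applier.onData("B", "V2"));    // time 2: []
        System.out.println(applier.onData("B", "V3"));    // time 3: []
        System.out.println(applier.onData("A", "V3"));    // time 3: []
        System.out.println(applier.onData("C", "V3"));    // time 3: [f1(V3, V3, V3)]
        applier.onControl("f2", List.of("A", "C"));       // time 4
        System.out.println(applier.onData("A", "V4"));    // time 4: [f1(V4, V3, V3), f2(V4, V3)]
        System.out.println(applier.onData("A", "V5"));    // time 5: [f1(V5, V3, V3), f2(V5, V3)]
        System.out.println(applier.onData("B", "V6"));    // time 6: [f1(V5, V6, V3)]
        System.out.println(applier.onData("C", "V6"));    // time 6: [f1(V5, V6, V6), f2(V5, V6)]
    }
}
```

Because the cache is keyed by data key while each lambda reads several keys, this state only works if all of a lambda's argument keys reach the same instance, which is the partitioning problem the rest of the thread is about.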
>>>
>>> Thanks,
>>> Alex
>>>
>>> On Thu, Aug 31, 2017 at 2:57 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> In your original example, what does this syntax mean exactly:
>>>>
>>>> f1 [A, B, C] 1
>>>>
>>>> Does it mean that f1 needs one A, one B and one C from the main stream?
>>>> If so, which ones? There are multiple As and Bs and so on. Or does it
>>>> mean that f1 can apply to an A or a B or a C? If it's the first, then I
>>>> think it's quite hard to find a partitioning such that f1, f2, and all of
>>>> A, B, and C go to the same machine.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 31. Aug 2017, at 15:53, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Martin,
>>>>
>>>> So the problem is that you want to group those arguments in the Data
>>>> Stream and pass them to the lambda function from the Control Stream at
>>>> the same time. Am I right?
>>>>
>>>> If so, you could give each lambda function an id as well, and use these
>>>> ids to tag the arguments with the functions they belong to.
>>>> After that, keyBy could be used to group the arguments belonging to the
>>>> same lambda function. Joining this stream with the Control Stream by
>>>> function id would put the arguments and the function in the same
>>>> instance.
>>>>
>>>> What do you think? Could this solve your problem?
>>>>
>>>> Best,
>>>> Tony Wei
>>>>
>>>> 2017-08-31 20:43 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>
>>>>> Thanks for your reply Tony,
>>>>>
>>>>> Yes, we are in the latter case, where the functions/lambdas come in the
>>>>> control stream. Think of them as strings containing the logic of the
>>>>> function. The values for each of the arguments to the function come from
>>>>> the data stream. That is why we need to co-locate the data stream messages
>>>>> for the corresponding keys with the control message that has the function
>>>>> to be applied.
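Tony's proposal (tag each data message with the ids of the lambdas that use its key, then keyBy the lambda id) amounts to fanning each message out once per interested lambda. The plain-Java sketch below shows only that fan-out and the resulting grouping; it is not Flink code, and `LambdaTagging`, `TaggedValue`, `tag` and `groupByLambda` are hypothetical names. In a streaming job the fan-out would be a flatMap and the grouping a keyBy on the lambda id, and the key-to-lambda map used for tagging would itself have to be fed from the control stream, which is why the control stream ends up being connected twice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (plain Java, not Flink code) of "tag by lambda id, then
// group by lambda id".
class LambdaTagging {
    // A copy of one data value, tagged with the id of a lambda that needs it.
    static final class TaggedValue {
        final String lambdaId;
        final String key;
        final String value;

        TaggedValue(String lambdaId, String key, String value) {
            this.lambdaId = lambdaId;
            this.key = key;
            this.value = value;
        }
    }

    // Fan-out step (a flatMap in a streaming job): emit one copy of the message
    // per lambda whose argument keys include the message's key.
    static List<TaggedValue> tag(Map<String, List<String>> lambdaArgs, String key, String value) {
        List<TaggedValue> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> lambda : lambdaArgs.entrySet()) {
            if (lambda.getValue().contains(key)) {
                out.add(new TaggedValue(lambda.getKey(), key, value));
            }
        }
        return out;
    }

    // Grouping step (a keyBy on the lambda id): all copies for one lambda end up together.
    static Map<String, List<TaggedValue>> groupByLambda(List<TaggedValue> tagged) {
        Map<String, List<TaggedValue>> groups = new LinkedHashMap<>();
        for (TaggedValue t : tagged) {
            groups.computeIfAbsent(t.lambdaId, id -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, List<String>> lambdaArgs = new LinkedHashMap<>();
        lambdaArgs.put("f1", List.of("A", "B", "C"));
        lambdaArgs.put("f2", List.of("A", "C"));

        List<TaggedValue> tagged = new ArrayList<>();
        tagged.addAll(tag(lambdaArgs, "A", "V4")); // copied for f1 and f2
        tagged.addAll(tag(lambdaArgs, "B", "V6")); // copied for f1 only
        Map<String, List<TaggedValue>> groups = groupByLambda(tagged);
        System.out.println(groups.get("f1").size()); // 2
        System.out.println(groups.get("f2").size()); // 1
    }
}
```

The price of this design is duplication: a message for key A is copied once for every lambda that reads A, in exchange for each lambda's arguments landing on a single keyed instance.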
>>>>>
>>>>> We have a way of interpreting the logic described in the string and
>>>>> executing it on the incoming values from the data stream. This is kicked
>>>>> off from within the Flink runtime (synchronously, in a flatMap of the
>>>>> RichCoFlatMapFunction) but does not use Flink's predefined operators or
>>>>> functions.
>>>>>
>>>>> So yeah, I see your point about mapping the arguments, but the problem
>>>>> is not really that. The problem is making sure that the values in the
>>>>> data stream end up in the same task instance / keyed managed state as
>>>>> the actual control stream message. Once they are, we can pass them in.
>>>>>
>>>>> Any other thoughts?
>>>>>
>>>>> M
>>>>>
>>>>> On Thu, Aug 31, 2017 at 12:06 PM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> About problem 2: how were those lambda functions created? Are they
>>>>>> predefined functions / operators, or are they generated automatically
>>>>>> from the messages in the Control Stream?
>>>>>>
>>>>>> For the former, you could give each function an id and use flatMap
>>>>>> to duplicate the data with multiple ids. Then you could use a filter
>>>>>> function to send each copy to the corresponding operator.
>>>>>>
>>>>>> For the general case like the latter, since the messages are broadcast
>>>>>> to all tasks, each sub-task could always build a mapping table from
>>>>>> argument keys to lambda functions and use it to process the data. But
>>>>>> I was wondering whether it is possible to generate a completely new
>>>>>> function at runtime.
>>>>>>
>>>>>> Best,
>>>>>> Tony Wei
>>>>>>
>>>>>> 2017-08-31 18:33 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>
>>>>>>> Thanks for your reply Tony.
>>>>>>>
>>>>>>> So there are actually 2 problems to solve:
>>>>>>>
>>>>>>> 1. All control stream messages need to be broadcast to all tasks.
>>>>>>>
>>>>>>> 2. The data stream messages with the same keys as those specified in
>>>>>>> the control message need to go to the same task as well, so that all
>>>>>>> the values required for the lambda (i.e. functions f1, f2 ...) are there.
>>>>>>>
>>>>>>> In my understanding, side inputs (which are actually not available in
>>>>>>> the current release) would address problem 1.
>>>>>>>
>>>>>>> To address problem 1, I also tried
>>>>>>> dataStream.keyBy(key).connect(controlStream.broadcast).flatMap(new RichCoFlatMapFunction)
>>>>>>> but I get a runtime exception telling me I still need to do a keyBy
>>>>>>> before the flatMap. So are the upcoming side inputs the only way to
>>>>>>> broadcast a control stream to all tasks of a coFlatMap? Or is there
>>>>>>> another way?
>>>>>>>
>>>>>>> As for problem 2, I am still waiting for a reply. I would appreciate
>>>>>>> any suggestions.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> M
>>>>>>>
>>>>>>> On Thu, Aug 31, 2017 at 9:59 AM, Tony Wei <tony19920...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>> Let me understand your question first.
>>>>>>>> You have two streams, a Data Stream and a Control Stream, and you want
>>>>>>>> to select data in the Data Stream based on the key set received from
>>>>>>>> the Control Stream.
>>>>>>>>
>>>>>>>> If I'm not misunderstanding your question, I think SideInput is
>>>>>>>> what you want.
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API#FLIP-17SideInputsforDataStreamAPI-StoringSide-InputData
>>>>>>>> It lets you define one stream as a SideInput and attach it to the
>>>>>>>> other stream; the data in the SideInput stream will then be
>>>>>>>> broadcast.
>>>>>>>>
>>>>>>>> So far, I don't know of any way to solve this without SideInput.
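Why broadcasting alone solves problem 1 but not problem 2 can be illustrated with a simplified plain-Java model. It assumes data messages go to instance `floorMod(key.hashCode(), parallelism)` while control messages are broadcast to every instance; Flink's own keyBy assigns keys to parallel instances through key groups with a different hash, so real placement will differ, and `PartitioningModel` and its methods are hypothetical names. The point is only that placement depends on the data key alone, never on the control stream. With a parallelism of 4 (an arbitrary choice), A, B and C (hash codes 65, 66 and 67) land on three different instances, so every instance knows f1 [A, B, C] but none receives all of its arguments.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simplified model (not Flink's actual key-to-subtask assignment): data
// messages are hash-partitioned by key, control messages are broadcast.
class PartitioningModel {
    // Hypothetical stand-in for hash partitioning on the data key.
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // The parallel instances that receive values for at least one argument key.
    static Set<Integer> instancesHoldingArgs(List<String> argumentKeys, int parallelism) {
        Set<Integer> instances = new TreeSet<>();
        for (String key : argumentKeys) {
            instances.add(instanceFor(key, parallelism));
        }
        return instances;
    }

    // A broadcast lambda is known on every instance, but it can only be applied
    // locally where all of its argument keys arrive, i.e. on a single instance.
    static boolean someInstanceHasAllArgs(List<String> argumentKeys, int parallelism) {
        return instancesHoldingArgs(argumentKeys, parallelism).size() == 1;
    }

    public static void main(String[] args) {
        List<String> f1Args = List.of("A", "B", "C");
        System.out.println(instancesHoldingArgs(f1Args, 4));   // [1, 2, 3]
        System.out.println(someInstanceHasAllArgs(f1Args, 4)); // false
    }
}
```

In this model f2 [A, C] happens to be local with a parallelism of 2 (65 and 67 are both odd), which shows that co-location under plain key partitioning is accidental rather than guaranteed.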
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tony Wei
>>>>>>>>
>>>>>>>> 2017-08-31 16:10 GMT+08:00 Martin Eden <martineden...@gmail.com>:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I am trying to implement the following using Flink:
>>>>>>>>>
>>>>>>>>> I have 2 input message streams:
>>>>>>>>>
>>>>>>>>> 1. Data Stream:
>>>>>>>>> KEY  VALUE  TIME
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> C    V6     6
>>>>>>>>> B    V6     6
>>>>>>>>> A    V5     5
>>>>>>>>> A    V4     4
>>>>>>>>> C    V3     3
>>>>>>>>> A    V3     3
>>>>>>>>> B    V3     3
>>>>>>>>> B    V2     2
>>>>>>>>> A    V1     1
>>>>>>>>>
>>>>>>>>> 2. Control Stream:
>>>>>>>>> Lambda  ArgumentKeys  TIME
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> .
>>>>>>>>> f2      [A, C]        4
>>>>>>>>> f1      [A, B, C]     1
>>>>>>>>>
>>>>>>>>> I want to apply the lambdas coming in the control stream to the
>>>>>>>>> selection of keys that are coming in the data stream.
>>>>>>>>>
>>>>>>>>> Since we have 2 streams, I naturally thought of connecting them
>>>>>>>>> using .connect. For this I need to key both of them by a certain
>>>>>>>>> criterion. And here lies the problem: how can I make sure the messages
>>>>>>>>> with keys A, B, C specified in the control stream end up in the same
>>>>>>>>> task as the control message (f1, [A, B, C]) itself? Basically, I don't
>>>>>>>>> know how to keyBy to achieve this.
>>>>>>>>>
>>>>>>>>> I suspect a custom partitioner is required that partitions the
>>>>>>>>> data stream based on the messages in the control stream? Is this even
>>>>>>>>> possible?
>>>>>>>>>
>>>>>>>>> Any suggestions welcome!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> M