Re: Cyclic ConnectedStream
I'm not sure I follow. I've been reading the documentation and the examples that show iterations, but I don't think they will work for me. Could you write a quick example? The details don't matter, only the topology.

If anyone else has an idea, it's very welcome!

Matt

On Tue, Jan 31, 2017 at 3:07 PM, Gábor Gévay wrote:
> I somehow still suspect that iterations might work for your use case. [...]
Re: Cyclic ConnectedStream
I still suspect that iterations might work for your use case. Note that in the streaming API, iterations are currently nothing more than a back-edge in the topology, i.e. a low-level tool to create a cyclic topology, much like your hypothetical setter syntax. (This is quite different from the iterations of the batch API.)

The tricky part for your use case is that you would want a ConnectedStream as your iteration head, which should receive the elements from the back-edge separately from the normal input. You could simulate this by using not ConnectedStream.flatMap but a simple Stream.flatMap whose input element type is an Either type, whose two alternatives would be the normal input and the back-edge input. (You would also add a map before the closeWith and another on your input1, each wrapping its elements into the appropriate alternative of the Either type.)

Best,
Gábor

2017-01-29 15:39 GMT+01:00 Matt:
> Check this image for clarification, this is what I'm trying to do: [...]
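A minimal sketch of the Either idea from Gábor's message, written in plain Java with no Flink dependency so it can run on its own. Everything here is an assumption made for illustration: the `Either` class is a hand-rolled stand-in, `PredictOperator` and its arithmetic are hypothetical, the stats step is reduced to a single expression (input2 is left out), and a queue plays the role of the back-edge. In Flink the back-edge would come from `iterate()` and `closeWith(...)`; this sketch only shows how one operator can tell normal input from feedback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation of one operator fed by a tagged union (no Flink involved). */
final class EitherLoopSketch {

    /** Hand-rolled stand-in for an Either type: exactly one side carries a value. */
    static final class Either<L, R> {
        final L left;
        final R right;
        final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> left(L value) { return new Either<>(value, null, true); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value, false); }
    }

    /** Hypothetical operator: Left is the normal input, Right is feedback from the back-edge. */
    static final class PredictOperator {
        private double model = 0.0; // state shared by both branches

        /** Returns a prediction for normal input, or null when the element only updates state. */
        Double process(Either<Double, Double> element) {
            if (element.isLeft) {
                return element.left + model; // plays the role of flatMap1
            }
            model = element.right;           // plays the role of flatMap2
            return null;
        }
    }

    /** Feeds each input through the operator and sends a stat back for every prediction. */
    static List<Double> run(List<Double> inputs) {
        PredictOperator operator = new PredictOperator();
        Deque<Either<Double, Double>> queue = new ArrayDeque<>(); // stands in for the back-edge
        List<Double> predictions = new ArrayList<>();
        for (Double x : inputs) {
            queue.add(Either.<Double, Double>left(x));            // the map on input1
            while (!queue.isEmpty()) {
                Double prediction = operator.process(queue.poll());
                if (prediction != null) {
                    predictions.add(prediction);
                    double stat = prediction / 2.0;               // stand-in for the stats step
                    queue.add(Either.<Double, Double>right(stat)); // the map before closeWith
                }
            }
        }
        return predictions;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1.0, 2.0, 3.0)));
    }
}
```

The point of the tag is that a single-input operator keeps the two-branch behaviour of a CoFlatMapFunction: the `isLeft` check decides whether an element is treated as input or as feedback, and both branches see the same state.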
Re: Cyclic ConnectedStream
Check this image for clarification; this is what I'm trying to do:
http://i.imgur.com/iZxPv04.png

The rectangles are the two CoFlatMapFunctions, each sharing state between process and update (map1 and map2). It's clear from the image that I need input1 and the green box to create the blue box, and input2 and the blue box to create the green one.

---
blue = input1.connect(green).keyBy(...).flatMap(...);
green = input2.connect(blue).keyBy(...).flatMap(...);
---

As you can see, there's no cycle in the flow of data, so I guess this topology is valid. The problem is that there is no way to define such a flow.

For instance, with the appropriate setters we would be able to do this:

---
blue = input1.connect();
green = input2.connect();

blue.setConnection(green);
green.setConnection(blue);

blue.keyBy(...).flatMap(...);
green.keyBy(...).flatMap(...);
---

Any idea is welcome.

Matt

On Sat, Jan 28, 2017 at 5:31 PM, Matt wrote:
> I'm aware of IterativeStream but I don't think it's useful in this case. [...]
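The declare-first, connect-later idea behind the hypothetical setters can be sketched in plain Java. This is not Flink code and not an existing API: `Node`, `connectTo`, and `emit` are names invented for this illustration, input2 is left out, and the arithmetic is arbitrary. It only shows that once both stream handles exist before they are wired, each one can name the other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java illustration of wiring two mutually dependent nodes after declaring both. */
final class DeferredWiringSketch {

    /** Hypothetical stream handle whose consumers can be attached after it is created. */
    static final class Node<T> {
        private final List<Consumer<T>> consumers = new ArrayList<>();

        void connectTo(Consumer<T> consumer) { consumers.add(consumer); }

        void emit(T value) {
            for (Consumer<T> consumer : consumers) {
                consumer.accept(value);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Node<Integer> input1 = new Node<>();
        Node<Integer> blue = new Node<>();  // output of the blue operator, declared up front
        Node<Integer> green = new Node<>(); // output of the green operator, declared up front

        // Blue operator: input1 produces predictions; elements from green only update state.
        input1.connectTo(value -> blue.emit(value * 10));
        green.connectTo(stat -> log.add("blue updated with stat " + stat));

        // Green operator: consumes blue's predictions and emits a stat.
        blue.connectTo(prediction -> {
            log.add("green received prediction " + prediction);
            green.emit(prediction + 1);
        });

        input1.emit(1);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the branch of blue that receives stats emits nothing, the data stops after one round trip even though the two declarations refer to each other.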
Re: Cyclic ConnectedStream
I'm aware of IterativeStream, but I don't think it's useful in this case.

As shown in my example, my use case is "cyclic" in that the same object goes from *Input* to *predictionStream* (flatMap1), then to *statsStream* (flatMap2, where it's updated with an object from *Input2*), and finally back to *predictionStream* (flatMap2).

The same operator is never applied twice to the object, so I would say this dataflow is cyclic only in the dependencies between the streams (predictionStream depends on statsStream, which in turn depends on predictionStream in the first place).

I hope it is clear now.

Matt

On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay wrote:
> Hello,
>
> Cyclic dataflows can be built using iterations:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>
> Best,
> Gábor
>
> 2017-01-28 18:39 GMT+01:00 Matt:
>> I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>> which depends on the first one (A). [...]
Cyclic ConnectedStream
I have a ConnectedStream (A) that depends on another ConnectedStream (B), which in turn depends on the first one (A).

Simplified code:

    predictionStream = input
      .connect(statsStream)
      .keyBy(...)
      .flatMap(CoFlatMapFunction {
        flatMap1(obj, output) {
          p = prediction(obj)
          output.collect(p)
        }
        flatMap2(stat, output) {
          updateModel(stat)
        }
      })

    statsStream = input2
      .connect(predictionStream)
      .keyBy(...)
      .flatMap(CoFlatMapFunction {
        flatMap1(obj2, output) {
          s = getStats(obj2, p)
          output.collect(s)
        }
        flatMap2(prediction, output) {
          p = prediction
        }
      })

I'm guessing this should be possible to achieve. One way would be to add a sink on statsStream that saves the elements into Kafka, and to read from that topic in predictionStream instead of initializing it with a reference to statsStream. But I would rather avoid writing into Kafka unnecessarily.

Is there any other way to achieve this?

Thanks,
Matt