Check this image for clarification; this is what I'm trying to do: http://i.imgur.com/iZxPv04.png
The rectangles are the two CoFlatMapFunctions, each sharing state between
process and update (map1 and map2). It's clear from the image that I need
input1 and the green box to create the blue box, and input2 and the blue
box to create the green one.

---
blue = input1.connect(green).keyBy(...).flatMap(...);
green = input2.connect(blue).keyBy(...).flatMap(...);
---

As you can see there's no cycle in the flow of data, so I guess this
topology is valid. The problem is that there is no way to define such a
flow.

For instance, with the appropriate setters we would be able to do this:

---
blue = input1.connect();
green = input2.connect();

blue.setConnection(green);
green.setConnection(blue);

blue.keyBy(...).flatMap(...);
green.keyBy(...).flatMap(...);
---

Any idea is welcome.

Matt

On Sat, Jan 28, 2017 at 5:31 PM, Matt <dromitl...@gmail.com> wrote:

> I'm aware of IterativeStream but I don't think it's useful in this case.
>
> As shown in the example above, my use case is "cyclic" in that the same
> object goes from *Input* to *predictionStream* (flatMap1), then to
> *statsStream* (flatMap2, where it's updated with an object from *Input2*)
> and finally to *predictionStream* (flatMap2).
>
> The same operator is never applied twice to the object, thus I would say
> this dataflow is cyclic only in the dependencies of the stream
> (predictionStream depends on statsStream, but it depends on
> predictionStream in the first place).
>
> I hope it is clear now.
>
> Matt
>
> On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:
>
>> Hello,
>>
>> Cyclic dataflows can be built using iterations:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>>
>> Best,
>> Gábor
>>
>> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
>> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
>> > which depends on the first one (A).
>> >
>> > Simplified code:
>> >
>> > predictionStream = input
>> >   .connect(statsStream)
>> >   .keyBy(...)
>> >   .flatMap(CoFlatMapFunction {
>> >     flatMap1(obj, output) {
>> >       p = prediction(obj)
>> >       output.collect(p)
>> >     }
>> >     flatMap2(stat, output) {
>> >       updateModel(stat)
>> >     }
>> >   })
>> >
>> > statsStream = input2
>> >   .connect(predictionStream)
>> >   .keyBy(...)
>> >   .flatMap(CoFlatMapFunction {
>> >     flatMap1(obj2, output) {
>> >       s = getStats(obj2, p)
>> >       output.collect(s)
>> >     }
>> >     flatMap2(prediction, output) {
>> >       p = prediction
>> >     }
>> >   })
>> >
>> > I'm guessing this should be possible to achieve, one way would be to add a
>> > sink on statsStream to save the elements into Kafka and read from that topic
>> > on predictionStream instead of initializing it with a reference of
>> > statsStream. But I would rather avoid writing unnecessarily into kafka.
>> >
>> > Is there any other way to achieve this?
>> >
>> > Thanks,
>> > Matt
>> >
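
To make the intended dependency between the two operators concrete, here is a minimal plain-Java sketch of the blue/green wiring. It does not use Flink at all. The class name MutualStreamsSketch, the run() helper, and the arithmetic standing in for prediction(), getStats() and updateModel() are all made up for illustration. The strict alternation between input1 and input2 is also an assumption, since a real job gives no such ordering guarantee between two streams.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the blue/green wiring; no Flink classes are used.
class MutualStreamsSketch {

    // "Blue" operator: turns input1 elements into predictions.
    static class PredictionOp {
        private double bias = 0.0; // state shared by flatMap1 and flatMap2

        // Stand-in for prediction(obj); the arithmetic is hypothetical.
        double flatMap1(double obj) { return obj + bias; }

        // Stand-in for updateModel(stat); the arithmetic is hypothetical.
        void flatMap2(double stat) { bias += stat; }
    }

    // "Green" operator: turns input2 elements into stats.
    static class StatsOp {
        private double p = 0.0; // last prediction received from blue

        // Stand-in for getStats(obj2, p); here simply the prediction error.
        double flatMap1(double obj2) { return obj2 - p; }

        // Remember the latest prediction coming from the blue operator.
        void flatMap2(double prediction) { p = prediction; }
    }

    // Feeds the two inputs in strict alternation (an assumption) and returns
    // the predictions emitted by blue. input2 must be at least as long as input1.
    static List<Double> run(double[] input1, double[] input2) {
        PredictionOp blue = new PredictionOp();
        StatsOp green = new StatsOp();
        List<Double> predictions = new ArrayList<>();
        for (int i = 0; i < input1.length; i++) {
            double prediction = blue.flatMap1(input1[i]); // input1 -> blue
            predictions.add(prediction);
            green.flatMap2(prediction);                   // blue -> green
            double stat = green.flatMap1(input2[i]);      // input2 -> green
            blue.flatMap2(stat);                          // green -> blue
        }
        return predictions;
    }

    public static void main(String[] args) {
        System.out.println(run(new double[] {1, 2}, new double[] {3, 4}));
    }
}
```

With inputs {1, 2} and {3, 4} the predictions are 1.0 and 4.0: the second one already reflects the stat that green sent back after the first element.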