I'm aware of IterativeStream, but I don't think it's useful in this case. As shown in the example quoted below, my use case is "cyclic" in that the same object goes from *Input* to *predictionStream* (flatMap1), then to *statsStream* (flatMap2, where it's updated with an object from *Input2*), and finally back to *predictionStream* (flatMap2).
The same operator is never applied twice to the same object, so I would say this dataflow is cyclic only in the dependencies between the streams (predictionStream depends on statsStream, which in turn depends on predictionStream). I hope it is clear now.

Matt

On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay <gga...@gmail.com> wrote:

> Hello,
>
> Cyclic dataflows can be built using iterations:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations
>
> Best,
> Gábor
>
>
> 2017-01-28 18:39 GMT+01:00 Matt <dromitl...@gmail.com>:
> > I have a ConnectedStream (A) that depends on another ConnectedStream (B),
> > which in turn depends on the first one (A).
> >
> > Simplified code:
> >
> > predictionStream = input
> >     .connect(statsStream)
> >     .keyBy(...)
> >     .flatMap(CoFlatMapFunction {
> >       flatMap1(obj, output) {
> >         p = prediction(obj)
> >         output.collect(p)
> >       }
> >       flatMap2(stat, output) {
> >         updateModel(stat)
> >       }
> >     })
> >
> > statsStream = input2
> >     .connect(predictionStream)
> >     .keyBy(...)
> >     .flatMap(CoFlatMapFunction {
> >       flatMap1(obj2, output) {
> >         s = getStats(obj2, p)
> >         output.collect(s)
> >       }
> >       flatMap2(prediction, output) {
> >         p = prediction
> >       }
> >     })
> >
> > I'm guessing this should be possible to achieve; one way would be to add a
> > sink on statsStream to save the elements into Kafka and read from that
> > topic on predictionStream instead of initializing it with a reference to
> > statsStream. But I would rather avoid writing unnecessarily into Kafka.
> >
> > Is there any other way to achieve this?
> >
> > Thanks,
> > Matt
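
For reference, here is a minimal sketch of how this feedback loop could be expressed with the iterations API Gábor links to (iterate() / withFeedbackType() / closeWith()), keeping the cycle inside the job instead of routing it through Kafka. The types Input, Input2, Stats and Prediction, and the helpers prediction(), getStats() and updateModel(), are hypothetical placeholders taken from the pseudocode above; keying and the parallelism of the feedback edge are glossed over:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    // Declare the stats edge as the iteration's feedback edge: the head of
    // the iteration sees the main input (Input) plus the fed-back Stats.
    ConnectedIterativeStreams<Input, Stats> iteration =
        input.iterate().withFeedbackType(Stats.class);

    DataStream<Prediction> predictionStream = iteration
        .flatMap(new CoFlatMapFunction<Input, Stats, Prediction>() {
            @Override
            public void flatMap1(Input obj, Collector<Prediction> out) {
                out.collect(prediction(obj));  // score incoming objects
            }
            @Override
            public void flatMap2(Stats stat, Collector<Prediction> out) {
                updateModel(stat);             // absorb feedback, emit nothing
            }
        });

    DataStream<Stats> statsStream = input2
        .connect(predictionStream)
        .flatMap(new CoFlatMapFunction<Input2, Prediction, Stats>() {
            private Prediction p;              // last prediction seen

            @Override
            public void flatMap1(Input2 obj2, Collector<Stats> out) {
                out.collect(getStats(obj2, p));
            }
            @Override
            public void flatMap2(Prediction prediction, Collector<Stats> out) {
                p = prediction;
            }
        });

    // Close the loop: stats are re-injected at the iteration head and
    // arrive at predictionStream's flatMap2 above.
    iteration.closeWith(statsStream);

One caveat worth checking before going this route: as of Flink 1.2, records in flight on a feedback edge are not covered by checkpoints and can be lost on failure, and the keyBy(...) partitioning from the original snippet has to be applied to the input and feedback streams rather than to the iteration head itself.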