I have a ConnectedStream (A) that depends on another ConnectedStream (B), which depends on the first one (A).
Simplified code: *predictionStream = **input* .connect(*statsStream*) .keyBy(...) .flatMap(CoFlatMapFunction { flatMap1(obj, output) { p = prediction(obj) * output.collect(p)* } flatMap2(stat, output) { updateModel(stat) } }) *statsStream = input2* .connect(*predictionStream*) .keyBy(...) .flatMap(CoFlatMapFunction { flatMap1(obj2, output) { s = getStats(obj2, p) * output.collect(s)* } flatMap2(prediction, output) { p = prediction } }) I'm guessing this should be possible to achieve, one way would be to add a sink on statsStream to save the elements into Kafka and read from that topic on predictionStream instead of initializing it with a reference of statsStream. But I would rather avoid writing unnecessarily into kafka. Is there any other way to achieve this? Thanks, Matt