I have a ConnectedStream (A) that depends on another ConnectedStream (B),
which depends on the first one (A).

Simplified code:

*predictionStream = **input*
  .connect(*statsStream*)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
     flatMap1(obj, output) {
         p = prediction(obj)
*         output.collect(p)*
     }
     flatMap2(stat, output) {
         updateModel(stat)
     }
  })

*statsStream = input2*
  .connect(*predictionStream*)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
     flatMap1(obj2, output) {
        s = getStats(obj2, p)
*        output.collect(s)*
     }
     flatMap2(prediction, output) {
        p = prediction
     }
  })

I'm guessing this should be possible to achieve, one way would be to add a
sink on statsStream to save the elements into Kafka and read from that
topic on predictionStream instead of initializing it with a reference of
statsStream. But I would rather avoid writing unnecessarily into kafka.

Is there any other way to achieve this?

Thanks,
Matt

Reply via email to