Hi!I have a job that uses a RichCoFlatMapFunction of two streams: A and B.In MyOp, the A stream tuples are combined to form a state using a ValueStateDescriptor. Stream A is usually started from the beginning of a Kafka topic. Stream A has a rate of 100k tuples/s. After processing the whole Kafka queue, the rate drops to 10 tuples/s. A big drop.What I now want is that while tuples from A are being processed in flatMap1, the stream B in flatMap2 should wait until the rate of the A stream has dropped and only then, be flatMap2 should be called. Ideally, this behaviour would be captured in a separate operator, like RateBasedStreamValve or something like that :)To solve this, my idea is to add a counter/timer in the RichCoFlatMapFunction that counts how many tuples have been processed from A. If the rate drops below a threshold (here maybe 15 tuples/s), flatMap2 that proesses tuples from B empties the buffer. However, this would make my RichCoFlatMapFunction much bigger and would not allow for operator reuse in other scenarios.I'm of course happy to answer if something is unclear.-- Jonas
-- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Start-streaming-tuples-depending-on-another-streams-rate-tp11542.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.