Hi!I have a job that uses a RichCoFlatMapFunction of two streams: A and B.In
MyOp, the A stream tuples are combined to form a state using a
ValueStateDescriptor. Stream A is usually started from the beginning of a
Kafka topic. Stream A has a rate of 100k tuples/s. After processing the
whole Kafka queue, the rate drops to 10 tuples/s. A big drop.What I now want
is that while tuples from A are being processed in flatMap1, the stream B in
flatMap2 should wait until the rate of the A stream has dropped and only
then, be flatMap2 should be called. Ideally, this behaviour would be
captured in a separate operator, like RateBasedStreamValve or something like
that :)To solve this, my idea is to add a counter/timer in the
RichCoFlatMapFunction that counts how many tuples have been processed from
A. If the rate drops below a threshold (here maybe 15 tuples/s), flatMap2
that proesses tuples from B empties the buffer. However, this would make my
RichCoFlatMapFunction much bigger and would not allow for operator reuse in
other scenarios.I'm of course happy to answer if something is unclear.--
Jonas



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Start-streaming-tuples-depending-on-another-streams-rate-tp11542.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to