I'm afraid your analysis is 100% correct. Currently there's no out-of-the-box feature for dealing with this, but our work on a new source interface ([1]) will enable us to add a feature that we call "event-time alignment": source readers would slow down reading from certain source partitions if their watermark advances too far beyond the minimum watermark over all source partitions.
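
To illustrate the idea of event-time alignment, here is a minimal sketch in plain Python. It is a toy simulation, not Flink code and not the API of the new source interface: the names `Partition`, `backfill` and `max_drift`, the record rates and the round-robin read loop are all assumptions made for illustration. A partition is skipped for a round whenever its watermark is more than `max_drift` ahead of the minimum watermark over all partitions that still have data.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Partition:
    """Hypothetical stand-in for one source partition during a backfill."""
    name: str
    timestamps: List[int]   # event timestamps (ms) in read order
    pos: int = 0            # index of the next record to read
    watermark: int = -1     # largest timestamp read so far

    def exhausted(self) -> bool:
        return self.pos >= len(self.timestamps)

    def read_one(self) -> None:
        ts = self.timestamps[self.pos]
        self.pos += 1
        self.watermark = max(self.watermark, ts)


def backfill(partitions: List[Partition], max_drift: Optional[int] = None) -> int:
    """Read all partitions round-robin, one record per partition per round.

    If max_drift is set, a partition whose watermark is more than max_drift
    ahead of the minimum watermark is paused for that round.
    Returns the largest watermark lead (max - min) observed after any round.
    """
    max_lead = 0
    while True:
        active = [p for p in partitions if not p.exhausted()]
        if not active:
            break
        min_wm = min(p.watermark for p in active)
        for p in active:
            if max_drift is not None and p.watermark - min_wm > max_drift:
                continue  # too far ahead: pause this partition for now
            p.read_one()
        lead = max(p.watermark for p in active) - min(p.watermark for p in active)
        max_lead = max(max_lead, lead)
    return max_lead


def make_partitions() -> List[Partition]:
    # Both topics cover the same 10 s of event time, at different rates
    # (illustrative numbers only).
    return [
        Partition("CheckoutTopic", list(range(0, 10_000, 500))),
        Partition("PageViewTopic", list(range(0, 10_000, 50))),
    ]


if __name__ == "__main__":
    print("lead without alignment:", backfill(make_partitions()))
    print("lead with alignment:   ", backfill(make_partitions(), max_drift=1_000))
```

Without alignment, the low-rate topic races ahead in event time and everything it emits has to be buffered until the slow watermark catches up. With alignment, the lead stays within `max_drift` plus the timestamp step of a single record, so the amount of buffered state is bounded.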


On 07.02.20 13:36, Akshay Aggarwal wrote:
Hi Flink Users,

We have a scenario where we're reading from multiple Kafka topics using a
single Kafka consumer. Each topic has a very different ingestion rate, e.g.
CheckoutTopic has 500 rec/sec and PageViewTopic has 10,000 rec/sec. We are
performing ordering of these events across topics using a keyed process
function (keyed on userId) and an EVENT_TIME watermark which is based on the
ingestionTs of the record captured just before it is produced into Kafka.

On live data this pipeline works perfectly, but if I restart the job to
process from an old savepoint (say 24hrs old), the job fills up the state,
a full back pressure (ratio 1) gets created on the source operators,
checkpoints start failing and the job eventually dies. My hypothesis is
that the data from both topics is read at the max rate possible, but
since the watermark from the PageViewTopic will lag significantly behind
the CheckoutTopic, overall watermarks don't progress, so excessive data
from CheckoutTopic fills up the state and results in the failure mentioned
above.

I also observed this while backfilling from a savepoint using a single
topic. Even though watermarks do progress faster than before, the job meets
the same fate. In this case I'm assuming the offsets/watermarks of the
individual partitions go out of sync with respect to time, leading to a
situation similar to the one mentioned above.

Is this understanding correct? Is there a known solution for this? And if
not, what is the suggested approach to tackle this problem?

Thanks & Regards,
Akshay Aggarwal
