Franz Thoma created FLINK-9374:
--
Summary: Flink Kinesis Producer does not backpressure
Key: FLINK-9374
URL: https://issues.apache.org/jira/browse/FLINK-9374
Project: Flink
Issue Type: Bug
Reporter: Franz Thoma
Attachments: after.png, before.png
The {{FlinkKinesisProducer}} just accepts records and forwards them to a
{{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL
internally holds an unbounded queue of records that have not yet been sent.
Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow
indefinitely if Flink sends records faster than the KPL can forward them to
Kinesis.
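For illustration, a minimal sketch of this forwarding path (class and field names are my own, not the actual {{FlinkKinesisProducer}} source):
{code:java}
// Sketch of the current behaviour: invoke() hands each record to the KPL and
// returns immediately, so nothing bounds the KPL's internal queue.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class UnboundedForwardingSketch {
    private final KinesisProducer producer =
            new KinesisProducer(new KinesisProducerConfiguration().setRegion("eu-central-1"));

    public void invoke(String record) {
        // addUserRecord() only enqueues the record; the KPL's native process sends it
        // later (rate-limited to 1MB per second per shard), so the queue can grow without bound.
        producer.addUserRecord("my-stream", "partition-key",
                ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8)));
    }
}
{code}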
One way to circumvent this problem is to set a record TTL, so that queued
records are dropped after a certain amount of time, but this will lead to data
loss under high loads.
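For reference, a sketch of how such a TTL would be configured on the KPL (the value here is arbitrary):
{code:java}
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

// Sketch: bound the time a record may wait in the KPL queue. Records older than
// the TTL are dropped instead of sent -- i.e. data loss under sustained overload.
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordTtl(30_000);  // milliseconds (the KPL default is 30s)
{code}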
Currently the only time the queue is flushed is during checkpointing: the
{{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a
checkpoint is reached (at which point it blocks until the queue is flushed) or
until it runs out of memory, whichever comes first. (This is made worse by the
fact that the Java KPL is only a thin wrapper around a C++ process, so it is not
even the Java process that runs out of memory, but the C++ process.) The
implicit rate limit imposed by checkpointing leads to a ragged throughput graph
like this (the periods with zero throughput are the wait times before a
checkpoint):
!before.png! Throughput limited by checkpointing only
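The flush-on-checkpoint behaviour roughly corresponds to the following (a simplified sketch, reusing the {{producer}} field from the sketch above; the real sink hooks this into Flink's snapshot callback):
{code:java}
// Simplified sketch of the only place the KPL queue is drained today: the
// checkpoint hook blocks until every outstanding record has been sent.
public void snapshotState() throws Exception {
    // flushSync() blocks until the KPL's getOutstandingRecordsCount() reaches zero.
    producer.flushSync();
}
{code}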
My proposed solution is to add a config option {{queueLimit}} to set a maximum
number of records that may be waiting in the KPL queue. If this limit is
reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait
(blocking) until the queue length is below the limit again. This automatically
leads to backpressuring, since the {{FlinkKinesisProducer}} cannot accept
records while waiting. For compatibility, {{queueLimit}} is set to
{{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client
explicitly sets the value. Unfortunately, a "sane" default value cannot be
chosen, since sensible values for the limit depend on the record size (the limit
should be chosen so that roughly 10–100MB of records per shard accumulate before
flushing; otherwise the maximum Kinesis throughput may not be reached).
!after.png! Throughput with a queue limit of 10 records (the spikes are
checkpoints, where the queue is still flushed completely)
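A sketch of the proposed behaviour, again reusing the {{producer}} field and imports from the sketch above ({{queueLimit}}, {{enforceQueueLimit()}} and the 500ms re-check interval are part of the proposal, not existing API):
{code:java}
// Sketch of the proposed backpressure: invoke() blocks while the KPL queue is
// at or above queueLimit, which in turn backpressures the Flink pipeline.
private int queueLimit = Integer.MAX_VALUE;  // default keeps today's behaviour

private void enforceQueueLimit() throws InterruptedException {
    while (producer.getOutstandingRecordsCount() >= queueLimit) {
        producer.flush();   // ask the KPL to send what it has queued
        Thread.sleep(500);  // re-check periodically instead of busy-waiting
    }
}

public void invoke(String record) throws Exception {
    enforceQueueLimit();  // may block -- this is what produces the backpressure
    producer.addUserRecord("my-stream", "partition-key",
            ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8)));
}
{code}
Exposing the limit via a setter on {{FlinkKinesisProducer}} (e.g. a {{setQueueLimit(int)}} method, name to be decided) would let users tune it per job according to their record size.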
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)