Hi all,

I have several questions about Flink :) First, some background on what I
have done so far.

*Description*
I am using Flink 1.11.2. My job does data enrichment: data is consumed
from 6 different Kafka topics and joined via multiple
CoProcessFunctions. The job handles ~20 million events per day from the
source Kafka topics.

*Configuration*
These are the settings I am using:

jobmanager.memory.process.size: 4096m
jobmanager.memory.off-heap.size: 512m
taskmanager.memory.process.size: 12000m
taskmanager.memory.task.off-heap.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 5
taskmanager.rpc.port: 6122
jobmanager.execution.failover-strategy: region
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.block.cache-size: 64mb
state.checkpoints.dir: s3://bucket/checkpoints
state.savepoints.dir: s3://bucket/savepoints
s3.access-key: AWS_ACCESS_KEY_ID
s3.secret-key: AWS_SECRET_ACCESS_KEY
s3.endpoint: http://<internal_url>
s3.path.style.access: true
s3.entropy.key: _entropy_
s3.entropy.length: 8
presto.s3.socket-timeout: 10m
client.timeout: 60min
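
To make the memory numbers easier to discuss, here is a rough sketch of how
I understand the task manager budget is split for the settings above. Every
value that is not in my config (metaspace size, overhead/managed/network
fractions and their min/max, framework heap and off-heap) is an assumption
based on my reading of the Flink 1.11 defaults, so please correct me if any
of them is wrong. The function name is just for illustration.

```python
def clamp(value, low, high):
    """Keep value within [low, high]."""
    return max(low, min(value, high))


def taskmanager_breakdown(process_mb, task_off_heap_mb):
    """Approximate TaskManager memory split in MB.

    ASSUMED Flink 1.11 defaults (not taken from my config):
      jvm-metaspace.size = 256m
      jvm-overhead: fraction 0.1, min 192m, max 1024m
      managed.fraction = 0.4
      network: fraction 0.1, min 64m, max 1024m
      framework.heap = 128m, framework.off-heap = 128m
    """
    metaspace = 256
    overhead = clamp(process_mb * 10 // 100, 192, 1024)
    total_flink = process_mb - metaspace - overhead

    managed = total_flink * 40 // 100          # used by RocksDB when memory.managed is true
    network = clamp(total_flink * 10 // 100, 64, 1024)
    framework_heap = 128
    framework_off_heap = 128

    # Task heap is whatever remains of the total Flink memory.
    task_heap = (total_flink - managed - network
                 - framework_heap - framework_off_heap - task_off_heap_mb)

    return {
        "jvm_metaspace": metaspace,
        "jvm_overhead": overhead,
        "managed": managed,
        "network": network,
        "framework_heap": framework_heap,
        "framework_off_heap": framework_off_heap,
        "task_off_heap": task_off_heap_mb,
        "task_heap": task_heap,
    }


if __name__ == "__main__":
    # taskmanager.memory.process.size: 12000m
    # taskmanager.memory.task.off-heap.size: 512m
    for name, mb in taskmanager_breakdown(12000, 512).items():
        print(f"{name:>20}: {mb} MB")
```

Under those assumptions, 12000m works out to about 4288m of managed memory
(which RocksDB should stay within) and about 4640m of task heap per task
manager.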

*Deployment setup*
Flink is deployed on k8s in Per-Job mode with 1 job manager and 5 task
managers. A daily cron job triggers a savepoint so that I always have a
fresh copy of the whole state.
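
For context, the cron job does roughly the following. This is a minimal
sketch rather than my exact script: it assumes the job manager REST API is
reachable at a base URL such as http://jobmanager:8081 (a hypothetical
address), and it relies on the REST savepoint endpoints as I understand
them from the docs (POST /jobs/:jobid/savepoints, then polling
GET /jobs/:jobid/savepoints/:triggerid). The helper names are made up.

```python
import json
import time
import urllib.request


def build_trigger_request(base_url, job_id, target_dir):
    """Return (url, payload bytes) for triggering a savepoint without cancelling the job."""
    url = f"{base_url}/jobs/{job_id}/savepoints"
    body = {"target-directory": target_dir, "cancel-job": False}
    return url, json.dumps(body).encode("utf-8")


def savepoint_location(status_response):
    """Return the savepoint path once completed, None while still in progress.

    Raises RuntimeError if the completed operation reports a failure.
    The response shape is an assumption based on the REST API docs.
    """
    if status_response["status"]["id"] != "COMPLETED":
        return None
    operation = status_response.get("operation", {})
    if "failure-cause" in operation:
        raise RuntimeError(f"savepoint failed: {operation['failure-cause']}")
    return operation["location"]


def trigger_savepoint(base_url, job_id, target_dir, poll_seconds=10):
    """Trigger a savepoint and block until it completes or fails."""
    url, payload = build_trigger_request(base_url, job_id, target_dir)
    request = urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        request_id = json.load(response)["request-id"]

    # Poll the asynchronous operation until it reports COMPLETED.
    while True:
        with urllib.request.urlopen(f"{url}/{request_id}") as response:
            location = savepoint_location(json.load(response))
        if location is not None:
            return location
        time.sleep(poll_seconds)
```

The cron job would call something like
trigger_savepoint("http://jobmanager:8081", job_id, "s3://bucket/savepoints"),
where job_id is the ID of the running job.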

*Problems with the existing setup*
1. Savepoints cause Flink to consume more memory than allowed. I observe
the behavior described in this Stack Overflow post
<https://stackoverflow.com/questions/64172881/flink-1-11-1-off-heap-memory-growing-using-rocksdbstatebackend>
(which seems to be fixed in 1.12.x, if I understand it correctly).
2. I cannot achieve high availability in Per-Job mode, so I ended up
taking a savepoint on a daily basis instead.

*Questions*
1. Is it a good idea to have regular savepoints (say on a daily basis)?
2. Is it possible to have high availability in Per-Job mode? Or should I
go with session mode and make sure that my Flink cluster runs only a
single job?
3. Let's assume that savepoints should be triggered only before a job
update/deployment. How can I trigger a savepoint if my job is already
consuming more than 80% of the memory allowed per pod in k8s? In my
observations, k8s kills the task managers (which run as pods) during the
savepoint, and I need to retry the savepoint a couple of times.
4. Should I consider upgrading to version 1.12.3?
5. Should I consider switching off the state.backend.rocksdb.memory.managed
property, even in version 1.12.3?
6. How do I decide when the job parallelism should be increased? Are there
metrics that indicate the parallelism is too low?

Best Regards,
Rado
