Hi, Radoslav,

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?

Yes, you can achieve HA with per-job mode by using ZooKeeper[1]. Looking
at your configuration, you also need to enable checkpointing[2] by
setting execution.checkpointing.interval; checkpoints are triggered
automatically and let the job resume after a failure.
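For reference, here is a minimal sketch of the relevant flink-conf.yaml
entries; the ZooKeeper quorum and the HA storage path below are
placeholders you would need to adapt to your environment:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
    high-availability.storageDir: s3://bucket/flink-ha
    execution.checkpointing.interval: 10min
    execution.checkpointing.mode: EXACTLY_ONCE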

> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger a savepoint if my job is already 
> consuming more than 80% of the allowed memory per pod in k8s? My observations 
> show that k8s kills task managers (which are running as pods) and I need to 
> retry it a couple of times.

I think with checkpointing enabled you no longer need to trigger a
savepoint manually under a specific condition, since checkpoints are
triggered periodically.
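If you also retain externalized checkpoints, you can resume an updated
job from the latest checkpoint instead of a freshly triggered savepoint.
A sketch, where the checkpoint path and the job jar are placeholders:

    # in flink-conf.yaml
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

    # resume from a retained checkpoint
    ./bin/flink run -s s3://bucket/checkpoints/<job-id>/chk-<n> my-job.jar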

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed 
> property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix for that
issue was only applied to the Docker image. So I guess you can package
a custom image yourself if you do not want to upgrade. However, if you
are using the Native K8s mode[3] and there is no compatibility issue, I
think it would be good to upgrade, because there are also lots of
improvements[4] in 1.12.
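In case it helps, a rough sketch of such a custom image, assuming the
image-level fix you mean is the switch to jemalloc that the 1.12 images
ship with (please verify that this is the actual change; the package
name and library path are assumptions for a Debian-based base image):

    FROM flink:1.11.2-scala_2.12
    # Assumption: preload jemalloc instead of the glibc allocator,
    # similar to what the official 1.12 images do.
    RUN apt-get update && \
        apt-get -y install libjemalloc-dev && \
        rm -rf /var/lib/apt/lists/*
    ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so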

> 6. How do I decide when the job parallelism should be increased? Are there 
> some metrics which can lead me to a clue that the parallelism should be 
> increased?

As there are 6 Kafka sources in your job, I think each source's
parallelism should first be aligned with the partition count of its
topic. For metrics, you could look at the backpressure of the tasks and
numRecordsOutPerSecond[5].
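For illustration, a sketch of pinning one source's parallelism to its
topic's partition count; the topic name, properties, and the partition
count (12) here are hypothetical:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import java.util.Properties;

    public class ParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");
            props.setProperty("group.id", "enrichment-job");

            // Assume the "events" topic has 12 partitions, so give the source
            // parallelism 12; subtasks beyond the partition count would sit idle.
            DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
                .setParallelism(12);

            events.print();
            env.execute("parallelism sketch");
        }
    }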

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
<radoslav.smilya...@smule.com> wrote:
>
> Hi all,
>
> I have multiple questions regarding Flink :) Let me give you some 
> background of what I have done so far.
>
> Description
> I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed 
> from 6 different Kafka topics and it is joined via multiple 
> CoProcessFunctions. On a daily basis the job handles ~20 million events 
> from the source Kafka topics.
>
> Configuration
> These are the settings I am using:
>
> jobmanager.memory.process.size: 4096m
> jobmanager.memory.off-heap.size: 512m
> taskmanager.memory.process.size: 12000m
> taskmanager.memory.task.off-heap.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 5
> taskmanager.rpc.port: 6122
> jobmanager.execution.failover-strategy: region
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.backend.rocksdb.memory.managed: true
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> state.backend.rocksdb.block.cache-size: 64mb
> state.checkpoints.dir: s3://bucket/checkpoints
> state.savepoints.dir: s3://bucket/savepoints
> s3.access-key: AWS_ACCESS_KEY_ID
> s3.secret-key: AWS_SECRET_ACCESS_KEY
> s3.endpoint: http://<internal_url>
> s3.path.style.access: true
> s3.entropy.key: _entropy_
> s3.entropy.length: 8
> presto.s3.socket-timeout: 10m
> client.timeout: 60min
>
> Deployment setup
> Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task 
> managers. I have a daily cron job which triggers a savepoint in order to have a 
> fresh copy of the whole state.
>
> Problems with the existing setup
> 1. I observe that savepoints are causing Flink to consume more than the 
> allowed memory. I see the behavior described in this stackoverflow post 
> (which seems to be solved in 1.12.x if I am getting it right).
> 2. I cannot achieve high availability with Per-Job mode and thus I ended up 
> having a regular savepoint on a daily basis.
>
> Questions
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?
> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger a savepoint if my job is already 
> consuming more than 80% of the allowed memory per pod in k8s? My observations 
> show that k8s kills task managers (which are running as pods) and I need to 
> retry it a couple of times.
> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed 
> property even in version 1.12.3?
> 6. How do I decide when the job parallelism should be increased? Are there 
> some metrics which can lead me to a clue that the parallelism should be 
> increased?
>
> Best Regards,
> Rado
>
>
>
