Re: Deployment/Memory Configuration/Scalability

2021-04-27 Thread Radoslav Smilyanov
Hi Yangze Guo,

Thanks for your reply.

> > 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> > 2. Is it possible to have high availability with Per-Job mode? Or maybe
> > I should go with session mode and make sure that my flink cluster is
> > running a single job?
> Yes, we can achieve HA with per-job mode with ZooKeeper[1]. Looking at
> your configuration, you also need to enable checkpointing[2], which is
> triggered automatically and helps you resume the program after a
> failure, by setting execution.checkpointing.interval.


I forgot to include the checkpoint configuration since it's part of a custom
job configuration which is mounted in each pod, so checkpoints are enabled. :)
A savepoint is triggered on a daily basis because the existing deployment
setup has a single Job Manager.
I will take a look at the k8s and ZooKeeper HA options.
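
For reference, a rough sketch of what the ZooKeeper-based HA settings
might look like in flink-conf.yaml (the quorum address, storage path and
cluster id below are placeholders, not values from our setup):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://bucket/ha
high-availability.cluster-id: /enrichment-job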

> > 3. Let's assume that savepoints should be triggered only before job
> > update/deployment. How can I trigger a savepoint if my job is already
> > consuming more than 80% of the allowed memory per pod in k8s? My
> > observations show that k8s kills task managers (which are running as pods)
> > and I need to retry it a couple of times.
> I think that with checkpointing enabled you no longer need to trigger a
> savepoint manually on a specific condition, as checkpoints will be
> triggered periodically.


Checkpoints are already enabled (once every 10 minutes). Once HA is
set up correctly, I think savepoints can be used only when the job
needs to be updated.
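
If checkpoints become the main recovery mechanism, one option (just a
sketch, not something we run today) would be to also retain them when the
job is cancelled, so that a recent checkpoint is always available:

execution.checkpointing.interval: 10min
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION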

> > 6. How do I decide when the job parallelism should be increased? Are
> > there some metrics which can lead me to a clue that the parallelism should
> > be increased?
> As there are 6 Kafka sources in your job, I think the parallelism
> should first be aligned with the number of topic partitions. For
> metrics, you could look at the backpressure of tasks and
> numRecordsOutPerSecond[5].


Currently I am using a parallelism equal to the highest number of
Kafka topic partitions. Unfortunately, some of the topics have a higher
load than others, so some of them have 1 partition while others have 4
partitions (for example).

Thanks,
Rado

On Tue, Apr 27, 2021 at 7:50 AM Yangze Guo  wrote:

> Hi, Radoslav,
>
> > 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> > 2. Is it possible to have high availability with Per-Job mode? Or maybe
> I should go with session mode and make sure that my flink cluster is
> running a single job?
>
> Yes, we can achieve HA with per-job mode with ZooKeeper[1]. Looking at
> your configuration, you also need to enable checkpointing[2], which is
> triggered automatically and helps you resume the program after a
> failure, by setting execution.checkpointing.interval.
>
> > 3. Let's assume that savepoints should be triggered only before job
> update/deployment. How can I trigger a savepoint if my job is already
> consuming more than 80% of the allowed memory per pod in k8s? My
> observations show that k8s kills task managers (which are running as pods)
> and I need to retry it a couple of times.
>
> I think that with checkpointing enabled you no longer need to trigger a
> savepoint manually on a specific condition, as checkpoints will be
> triggered periodically.
>
> > 4. Should I consider upgrading to version 1.12.3?
> > 5. Should I consider switching off state.backend.rocksdb.memory.managed
> property even in version 1.12.3?
>
> I'm not an expert on the state backend, but it seems the fix for that
> issue is only applied to the Docker image. So I guess you can package
> a custom image yourself if you do not want to upgrade. However, if
> you are using the native Kubernetes mode[3] and there is no
> compatibility issue, I think it would be good to upgrade because there
> are also lots of improvements[4] in 1.12.
>
> > 6. How do I decide when the job parallelism should be increased? Are
> there some metrics which can lead me to a clue that the parallelism should
> be increased?
>
> As there are 6 Kafka sources in your job, I think the parallelism
> should first be aligned with the number of topic partitions. For
> metrics, you could look at the backpressure of tasks and
> numRecordsOutPerSecond[5].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
> [4] https://issues.apache.org/jira/browse/FLINK-17709
> [5]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io
>
> Best,
> Yangze Guo
>
> On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
>  wrote:
> >
> > Hi all,
> >
> > I am having multiple questions regarding Flink :) Let me give you some
> background of what I have done so far.
> >
> > Description
> > I am using Flink 1.11.2. My job is doing data enrichment. 

Re: Deployment/Memory Configuration/Scalability

2021-04-26 Thread Yangze Guo
Hi, Radoslav,

> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?

Yes, we can achieve HA with per-job mode with ZooKeeper[1]. Looking at
your configuration, you also need to enable checkpointing[2], which is
triggered automatically and helps you resume the program after a
failure, by setting execution.checkpointing.interval.
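
For example, it can be set directly in flink-conf.yaml (the interval
value below is only illustrative):

execution.checkpointing.interval: 10min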

> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger a savepoint if my job is already 
> consuming more than 80% of the allowed memory per pod in k8s? My observations 
> show that k8s kills task managers (which are running as pods) and I need to 
> retry it a couple of times.

I think that with checkpointing enabled you no longer need to trigger a
savepoint manually on a specific condition, as checkpoints will be
triggered periodically.

> 4. Should I consider upgrading to version 1.12.3?
> 5. Should I consider switching off state.backend.rocksdb.memory.managed 
> property even in version 1.12.3?

I'm not an expert on the state backend, but it seems the fix for that
issue is only applied to the Docker image. So I guess you can package
a custom image yourself if you do not want to upgrade. However, if
you are using the native Kubernetes mode[3] and there is no
compatibility issue, I think it would be good to upgrade because there
are also lots of improvements[4] in 1.12.
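
If you do experiment with question 5, a rough sketch of the relevant
flink-conf.yaml changes could look like the following (the off-heap size
is only a guess; with managed memory disabled, RocksDB's native memory is
no longer bounded by Flink, so the pod limits need extra headroom and
per-instance options such as the block cache size apply):

state.backend.rocksdb.memory.managed: false
state.backend.rocksdb.block.cache-size: 64mb
taskmanager.memory.task.off-heap.size: 1024m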

> 6. How do I decide when the job parallelism should be increased? Are there 
> some metrics which can lead me to a clue that the parallelism should be 
> increased?

As there are 6 Kafka sources in your job, I think the parallelism
should first be aligned with the number of topic partitions. For
metrics, you could look at the backpressure of tasks and
numRecordsOutPerSecond[5].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/zookeeper_ha/
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[4] https://issues.apache.org/jira/browse/FLINK-17709
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#io

Best,
Yangze Guo

On Mon, Apr 26, 2021 at 4:14 PM Radoslav Smilyanov
 wrote:
>
> Hi all,
>
> I am having multiple questions regarding Flink :) Let me give you some 
> background of what I have done so far.
>
> Description
> I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed
> from 6 different Kafka topics and joined via multiple CoProcessFunctions.
> On a daily basis the job handles ~20 million events from the source Kafka
> topics.
>
> Configuration
> These are the settings I am using:
>
> jobmanager.memory.process.size: 4096m
> jobmanager.memory.off-heap.size: 512m
> taskmanager.memory.process.size: 12000m
> taskmanager.memory.task.off-heap.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 5
> taskmanager.rpc.port: 6122
> jobmanager.execution.failover-strategy: region
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.localdir: /opt/flink/rocksdb
> state.backend.rocksdb.memory.managed: true
> state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
> state.backend.rocksdb.block.cache-size: 64mb
> state.checkpoints.dir: s3://bucket/checkpoints
> state.savepoints.dir: s3://bucket/savepoints
> s3.access-key: AWS_ACCESS_KEY_ID
> s3.secret-key: AWS_SECRET_ACCESS_KEY
> s3.endpoint: http://
> s3.path.style.access: true
> s3.entropy.key: _entropy_
> s3.entropy.length: 8
> presto.s3.socket-timeout: 10m
> client.timeout: 60min
>
> Deployment setup
> Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task 
> managers. I have a daily cron job which triggers a savepoint in order to have a 
> fresh copy of the whole state.
>
> Problems with the existing setup
> 1. I observe that savepoints are causing Flink to consume more than the 
> allowed memory. I observe the behavior described in this stackoverflow post 
> (which seems to be solved in 1.12.X if I am getting it right).
> 2. I cannot achieve high availability with Per-Job mode and thus I ended up 
> having a regular savepoint on a daily basis.
>
> Questions
> 1. Is it a good idea to have regular savepoints (say on a daily basis)?
> 2. Is it possible to have high availability with Per-Job mode? Or maybe I 
> should go with session mode and make sure that my flink cluster is running a 
> single job?
> 3. Let's assume that savepoints should be triggered only before job 
> update/deployment. How can I trigger a savepoint if my job is already 
> consuming more than 80% of the allowed memory per pod in k8s? My observations 
> show that k8s kills task managers (which are running as pods) and I need to 
> retry it a couple of 

Deployment/Memory Configuration/Scalability

2021-04-26 Thread Radoslav Smilyanov
Hi all,

I am having multiple questions regarding Flink :) Let me give you some
background of what I have done so far.

*Description*
I am using Flink 1.11.2. My job is doing data enrichment. Data is consumed
from 6 different Kafka topics and joined via multiple CoProcessFunctions.
On a daily basis the job handles ~20 million events from the source Kafka
topics.

*Configuration*
These are the settings I am using:

jobmanager.memory.process.size: 4096m
jobmanager.memory.off-heap.size: 512m
taskmanager.memory.process.size: 12000m
taskmanager.memory.task.off-heap.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 5
taskmanager.rpc.port: 6122
jobmanager.execution.failover-strategy: region
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
state.backend.rocksdb.block.cache-size: 64mb
state.checkpoints.dir: s3://bucket/checkpoints
state.savepoints.dir: s3://bucket/savepoints
s3.access-key: AWS_ACCESS_KEY_ID
s3.secret-key: AWS_SECRET_ACCESS_KEY
s3.endpoint: http://
s3.path.style.access: true
s3.entropy.key: _entropy_
s3.entropy.length: 8
presto.s3.socket-timeout: 10m
client.timeout: 60min

*Deployment setup*
Flink is deployed in k8s with Per-Job mode having 1 job manager and 5 task
managers. I have a daily cron job which triggers a savepoint in order to have
a fresh copy of the whole state.

*Problems with the existing setup*
1. I observe that savepoints are causing Flink to consume more than the
allowed memory. I observe the behavior described in this stackoverflow post
(which seems to be solved in 1.12.X if I am getting it right).
2. I cannot achieve high availability with Per-Job mode and thus I ended up
having a regular savepoint on a daily basis.

*Questions*
1. Is it a good idea to have regular savepoints (say on a daily basis)?
2. Is it possible to have high availability with Per-Job mode? Or maybe I
should go with session mode and make sure that my flink cluster is running
a single job?
3. Let's assume that savepoints should be triggered only before job
update/deployment. How can I trigger a savepoint if my job is already
consuming more than 80% of the allowed memory per pod in k8s? My
observations show that k8s kills task managers (which are running as pods)
and I need to retry it a couple of times.
4. Should I consider upgrading to version 1.12.3?
5. Should I consider switching off state.backend.rocksdb.memory.managed
property even in version 1.12.3?
6. How do I decide when the job parallelism should be increased? Are there
some metrics which can lead me to a clue that the parallelism should be
increased?

Best Regards,
Rado