Re: Permissions to delete Checkpoint on cancel

2018-07-23 Thread ashish pok
Stefan,
Can't thank you enough for this write-up. This is an awesome explanation. I had misunderstood the concepts of the RocksDB working directory and the checkpoint FS. My main intent is to boost RocksDB performance using the locally available SSDs. Recovery time from HDFS is not much of a concern, but load on HDFS "may" become a concern in the future - we will see.
Going over the documentation again after reading your email, it looks like what I intended to do was change my RocksDB working directory to the local SSD (which I believe is the Java IO temp dir by default) using the state.backend.rocksdb.checkpointdir option first, and then perform whatever tuning is necessary to get the most out of the SSD.
Thanks,

- Ashish
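
For reference, a minimal sketch of the setup described above, assuming Flink 1.5+ with the RocksDB state backend dependency on the classpath; the HDFS and SSD paths, the class name, and the checkpoint interval are placeholders, and in recent versions the flink-conf.yaml key for the RocksDB working directory is state.backend.rocksdb.localdir:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbOnLocalSsdJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Durable, fault-tolerant checkpoint location on HDFS (placeholder path);
            // 'true' enables incremental checkpoints.
            RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

            // Transient RocksDB working directory on the locally attached SSD (placeholder path).
            // flink-conf.yaml equivalent: state.backend.rocksdb.localdir
            backend.setDbStoragePath("/mnt/ssd/flink/rocksdb");

            env.setStateBackend(backend);
            env.enableCheckpointing(60_000); // checkpoint every 60 seconds

            // Trivial placeholder pipeline so the sketch runs end to end.
            env.fromElements(1, 2, 3).print();
            env.execute("rocksdb-on-local-ssd");
        }
    }

The key point, per the explanation below, is that only the working directory lives on the non-replicated SSD; the checkpoint URI stays on a fault-tolerant file system.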

On Monday, July 23, 2018, 10:39 AM, Stefan Richter wrote:

Hi,
ok, let me briefly explain the differences between the local working directory, the checkpoint directory, and the savepoint directory, and also outline their best practices/requirements/tradeoffs. A first easy comment is that checkpoints and savepoints typically have similar requirements, and most users write them to the same fs. The working directory, i.e. the directory for spilling or where RocksDB operates, is transient; it does not require replication because it is not part of the fault tolerance strategy. Here the main concern is speed, which is why it is ideally a local, physically attached disk on the TM machine.
In contrast to that, checkpoints and savepoints are part of the fault tolerance strategy, which is why they should typically be on fault-tolerant file systems. In database terms, think of checkpoints as a recovery mechanism and savepoints as backups. As we usually want to survive node failures, those file systems should be fault tolerant/replicated, and also accessible for read/write from all TMs and the JM. TMs obviously need to write the data, and to read it in recovery. Under node failures, this means that a TM might have to read state that was written on a different machine, which is why TMs should be able to access the files written by other TMs. The JM is responsible for deleting checkpoints, because TMs might go down, and that is why the JM needs access as well.
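
For externalized checkpoints, whether the JM deletes the checkpoint on cancel is controlled by the cleanup mode; a minimal sketch of setting it, assuming Flink 1.5+ and a trivial placeholder pipeline:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointCleanupOnCancel {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);

            // DELETE_ON_CANCELLATION: the JM removes the externalized checkpoint when the job is
            // cancelled; this only works if the JM can actually reach the checkpoint files.
            // RETAIN_ON_CANCELLATION: the checkpoint is kept and must be cleaned up manually.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

            env.fromElements("a", "b", "c").print();
            env.execute("checkpoint-cleanup-on-cancel");
        }
    }

With DELETE_ON_CANCELLATION the cleanup is done by the JM, which again presumes that the JM can reach the checkpoint location.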
Those requirements typically hold for most Flink users. However, you might get away with certain particular trade-offs. You can write checkpoints to local disk if:
- Everything runs on one machine, or
- (not sure somebody ever did this, but it could work)
1) You will do the cleanup of old checkpoints manually (because the JM cannot reach them), e.g. with scripts, and
2) You will never try to rescale from a checkpoint, and
3) Tasks will never migrate to a different machine. You ignore node/disk/etc. failures and ensure that your job "owns" the cluster with no other jobs running in parallel. This means accepting data loss in the previous cases.
Typically, it should be ok to use a dfs only for checkpoints and savepoints; the local working directories should not go to the dfs, or else things will slow down dramatically. If you are just worried about recovery times, you might want to take a look at the local recovery feature [1], which keeps a secondary copy of the state on local disk for faster restores but still ensures fault tolerance with a primary copy in the dfs.
Best,
Stefan
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#task-local-recovery
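
A minimal sketch of switching on the local recovery feature described in [1], assuming Flink 1.5+; the relevant keys per the linked docs are state.backend.local-recovery and taskmanager.state.local.root-dirs, the SSD path is a placeholder, and the local environment is used here only to illustrate the options (on a real cluster they belong in flink-conf.yaml):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TaskLocalRecoveryDemo {
        public static void main(String[] args) throws Exception {
            // On a cluster these entries would live in flink-conf.yaml; they are set here on a
            // local environment purely to show the keys (the path is a placeholder).
            Configuration conf = new Configuration();
            conf.setBoolean("state.backend.local-recovery", true);
            conf.setString("taskmanager.state.local.root-dirs", "/mnt/ssd/flink/local-recovery");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment(1, conf);
            env.enableCheckpointing(60_000);

            env.fromElements(1, 2, 3).print();
            env.execute("task-local-recovery-demo");
        }
    }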


On 23.07.2018 at 14:18, ashish pok wrote:
Sorry,
Just a follow-up. In the absence of a NAS, is the best option here then to keep both checkpoints and savepoints on HDFS and have the state backend use the local SSDs?
We were trying to not even hit HDFS other than for savepoints.


- Ashish

On Monday, July 23, 2018, 7:45 AM, ashish pok wrote:

Stefan,
I did have the first point at the back of my mind. I was under the impression, though, that for checkpoints the cleanup would be done by the TMs, since the checkpoints are taken by the TMs.
So for a standalone cluster with its own ZooKeeper for JM high availability, a NAS is a must-have? We were going to go with local checkpoints, with access to remote HDFS for savepoints. That sounds like it will be a bad idea then. Unfortunately we can't run on YARN, and a NAS is also a no-no in one of our datacenters; there is a mountain of security compliance to climb before we can use one in production if we need to go that route.
Thanks, Ashish


On Monday, July 23, 2018, 5:10 AM, Stefan Richter wrote:

Hi,

I am wondering how this can even work properly if you are using a local fs for checkpoints instead of a distributed fs. First, what happens under node failures, if the SSD becomes unavailable, or if a task gets scheduled to a different machine and can no longer access the disk with the corresponding state data, or if you want to scale out? Second, the same problem is also what you can observe with the job manager: how could the checkpoint coordinator, which runs on the JM, access a file on a local FS on a different node to clean up the checkpoint data? The purpose of using a distributed fs here is that all TMs and the JM can access the checkpoint files.

Best,
Stefan


Permissions to delete Checkpoint on cancel

2018-07-22 Thread Ashish Pokharel
All,

We recently moved our checkpoint directory from HDFS to local SSDs mounted on the data nodes (we were starting to see performance impacts on checkpoints etc. as more and more complex ML apps were spinning up in YARN). This worked great, other than the fact that when jobs are canceled or canceled with a savepoint, the local data is not being cleaned up. In HDFS, checkpoint directories were cleaned up on Cancel and Cancel with Savepoint as far as I can remember. I am wondering if it is a permissions issue. The local disks have RWX permissions for both the yarn and flink headless users (the flink headless user submits the apps to YARN via our CI/CD pipeline).

Appreciate any pointers on this.

Thanks, Ashish