[jira] [Commented] (FLINK-12751) Create file based HA support

2020-11-09 Thread Slim Bouguerra (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228959#comment-17228959
 ] 

Slim Bouguerra commented on FLINK-12751:


Hi guys, is there anything blocking this Jira/PR We are making some design 
decision and would love base our decision on some reasonable ETAs.

In our case this HA Story will work out of the box since we are using an NFS 
mount.

[~borisl] did you run this in prod ? can you please share your 
thoughts/experience when deploying this ?

Thanks.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2020-09-17 Thread Yang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198068#comment-17198068
 ] 

Yang Wang commented on FLINK-12751:
---

Yes, you are right. I am aware of this solution. And I think we could have both 
for deploying Flink on Kubernetes.

 

BTW, this tickets seems to be a duplicate of FLINK-17598.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2020-09-17 Thread Teng Fei Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17197929#comment-17197929
 ] 

Teng Fei Liao commented on FLINK-12751:
---

If you replace the deployment with a stateful set for the job manager, 
kubernetes will guarantee you exactly 1 job manager will be running. An 
additional benefit you get here is that the stateful maintains the network 
identity for the job manager. We're actually running this variant in production.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-07-08 Thread Yang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16880105#comment-16880105
 ] 

Yang Wang commented on FLINK-12751:
---

Hi [~Xeli], i am not sure the backend implementation of PVC ReadWriteOnce. If 
the kubernetes is guaranteed to mount only once by job managers, then we will 
not need leader election.

 

Hi [~borisl], if the job manager election could be supported, then i think this 
is a good feature. Especially we want to deploy flink cluster on kubernetes.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-06-27 Thread Boris Lublinsky (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873929#comment-16873929
 ] 

Boris Lublinsky commented on FLINK-12751:
-

Although the kubelet can die, if the node is alive, it will be immediately 
restarted, as it is a service. The racing condition is possible, but not 
probable.

If you want to ensure uniqueness of the job manager, we can add a lock file 
with job manager IP.  See 
[https://events.linuxfoundation.org/wp-content/uploads/2017/11/Leader-Election-Machanism-in-Distributed-System-Using-a-Globally-Accessible-Filesystem-OSS-Dharmendra-Kushwaha.pdf]
 for example

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-06-27 Thread Richard Deurwaarder (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873907#comment-16873907
 ] 

Richard Deurwaarder commented on FLINK-12751:
-

I am not sure if kubelet controls this, nor if we would want to rely on it but: 
kubernetes allows us to mount PVCs with specific Access modes. So if the PVC is 
mounted as ReadWriteOnce this would prevent multiple job managers from being 
up, right?

[https://kubernetes.io/docs/concepts/storage/persistent-volumes/]

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-06-26 Thread Yang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873830#comment-16873830
 ] 

Yang Wang commented on FLINK-12751:
---

[~borisl] Sorry for the late.

Just as you say, job manager selection is done by k8s deployment of 1. However, 
if the kubelet crashed, the pod running on it may not be terminated. So two job 
managers may be running at the same time. We need to use the etcd/zookeeper to 
do the job manager selection.

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-06-11 Thread Boris Lublinsky (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16861091#comment-16861091
 ] 

Boris Lublinsky commented on FLINK-12751:
-

Can you describe a use case more specifically?

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12751) Create file based HA support

2019-06-11 Thread Yang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860638#comment-16860638
 ] 

Yang Wang commented on FLINK-12751:
---

The file based HA assumes that there is only one job manager is running. 
However, on k8s even we set the deployment of one, two or more job managers may 
be running when the kubelet is down. How to deal with this situation? 

> Create file based HA support
> 
>
> Key: FLINK-12751
> URL: https://issues.apache.org/jira/browse/FLINK-12751
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Affects Versions: 1.8.0, 1.9.0, 2.0.0
> Environment: Flink on k8 and Mini cluster
>Reporter: Boris Lublinsky
>Priority: Major
>  Labels: features, pull-request-available
>   Original Estimate: 168h
>  Time Spent: 10m
>  Remaining Estimate: 167h 50m
>
> In the current Flink implementation, HA support can be implemented either 
> using Zookeeper or Custom Factory class.
> Add HA implementation based on PVC. The idea behind this implementation
> is as follows:
> * Because implementation assumes a single instance of Job manager (Job 
> manager selection and restarts are done by K8 Deployment of 1)
> URL management is done using StandaloneHaServices implementation (in the case 
> of cluster) and EmbeddedHaServices implementation (in the case of mini 
> cluster)
> * For management of the submitted Job Graphs, checkpoint counter and 
> completed checkpoint an implementation is leveraging the following file 
> system layout
> {code}
>  ha -> root of the HA data
>  checkpointcounter -> checkpoint counter folder
>   -> job id folder
>   -> counter file
>   -> another job id folder
>  ...
>  completedCheckpoint -> completed checkpoint folder
>   -> job id folder
>   -> checkpoint file
>   -> checkpoint file
>  ...
>   -> another job id folder
>  ...
>  submittedJobGraph -> submitted graph folder
>   -> job id folder
>   -> graph file
>   -> another job id folder
>  ...
> {code}
> An implementation should overwrites 2 of the Flink files:
> * HighAvailabilityServicesUtils - added `FILESYSTEM` option for picking HA 
> service
> * HighAvailabilityMode - added `FILESYSTEM` to available HA options.
> The actual implementation adds the following classes:
> * `FileSystemHAServices` - an implementation of a `HighAvailabilityServices` 
> for file system
> * `FileSystemUtils` - support class for creation of runtime components.
> * `FileSystemStorageHelper` - file system operations implementation for 
> filesystem based HA
> * `FileSystemCheckpointRecoveryFactory` - an implementation of a 
> `CheckpointRecoveryFactory`for file system
> * `FileSystemCheckpointIDCounter` - an implementation of a 
> `CheckpointIDCounter` for file system
> * `FileSystemCompletedCheckpointStore` - an implementation of a 
> `CompletedCheckpointStore` for file system
> * `FileSystemSubmittedJobGraphStore` - an implementation of a 
> `SubmittedJobGraphStore` for file system



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)