Hi Omar,

I think Matthias is right. The K8s HA services create and edit config maps.
Hence they need the rights to do this. In the native K8s documentation
there is a section about how to create a service account with the right
permissions [1].

I think that our K8s HA documentation currently lacks this part. I will
create a PR to update the documentation.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac

Cheers,
Till

On Mon, Feb 15, 2021 at 9:32 AM Matthias Pohl <matth...@ververica.com>
wrote:

> I'm adding the Flink user ML to the conversation again.
>
> On Mon, Feb 15, 2021 at 8:18 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Omer,
>> thanks for sharing the configuration. You're right: Using NFS for HA's
>> storageDir is fine.
>>
>> About the error message you're referring to: I haven't worked with the HA
>> k8s service, yet. But the RBAC is a good hint. Flink's native Kubernetes
>> documentation [1] points out that you can use a custom service account.
>> This one needs special permissions to start/stop pods automatically (which
>> does not apply in your case) but also to access ConfigMaps. You might want
>> to try setting the permission as described in [1].
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#rbac
>>
>> On Sun, Feb 14, 2021 at 7:16 PM Omer Ozery <omeroz...@gmail.com> wrote:
>>
>>> Hey Matthias.
>>> My name is Omer, i am Daniel's devops, i will elaborate about our flink
>>> situation.
>>> these our flink resource definitions, as they are generated using the
>>> helm template command (minus log4j,metrics configuration and some sensitive
>>> data)
>>> ---
>>> # Source: flink/templates/flink-configmap.yaml
>>> apiVersion: v1
>>> kind: ConfigMap
>>> metadata:
>>>   name: flink-config
>>>   labels:
>>>     app: flink
>>> data:
>>>   flink-conf.yaml: |
>>>     jobmanager.rpc.address: flink-jobmanager
>>>     jobmanager.rpc.port: 6123
>>>     jobmanager.execution.failover-strategy: region
>>>     jobmanager.memory.process.size: 8g
>>>     taskmanager.memory.process.size: 24g
>>>     taskmanager.memory.task.off-heap.size: 1g
>>>     taskmanager.numberOfTaskSlots: 4
>>>     queryable-state.proxy.ports: 6125
>>>     queryable-state.enable: true
>>>     blob.server.port: 6124
>>>     parallelism.default: 1
>>>     state.backend.incremental: true
>>>     state.backend: rocksdb
>>>     state.backend.rocksdb.localdir: /opt/flink/rocksdb
>>>     state.checkpoints.dir: file:///opt/flink/checkpoints
>>>     classloader.resolve-order: child-first
>>>     kubernetes.cluster-id: flink-cluster
>>>     kubernetes.namespace: intel360-beta
>>>     high-availability:
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>
>>> ---
>>> # Source: flink/templates/flink-service.yaml
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: flink-jobmanager
>>>   labels:
>>>     {}
>>> spec:
>>>   ports:
>>>   - name: http-ui
>>>     port: 8081
>>>     targetPort: http-ui
>>>   - name: tcp-rpc
>>>     port: 6123
>>>     targetPort: tcp-rpc
>>>   - name: tcp-blob
>>>     port: 6124
>>>     targetPort: tcp-blob
>>>   selector:
>>>     app: flink
>>>     component: jobmanager
>>> ---
>>> # Source: flink/templates/flink-deployment.yaml
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: flink-jobmanager
>>> spec:
>>>   replicas: 1
>>>   selector:
>>>     matchLabels:
>>>       app: flink
>>>       component: jobmanager
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: flink
>>>         component: jobmanager
>>>       annotations:
>>>         checksum/config:
>>> f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>     spec:
>>>       containers:
>>>       - name: jobmanager
>>>         image: flink:1.12.1-scala_2.11-java11
>>>         args: [ "jobmanager" ]
>>>         ports:
>>>         - name: http-ui
>>>           containerPort: 8081
>>>         - name: tcp-rpc
>>>           containerPort: 6123
>>>         - name: tcp-blob
>>>           containerPort: 6124
>>>         resources:
>>>           {}
>>>         # Environment Variables
>>>         env:
>>>         - name: ENABLE_CHECKPOINTING
>>>           value: "true"
>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>           value: "flink-jobmanager"
>>>         volumeMounts:
>>>         - name: flink-config
>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>           subPath: flink-conf.yaml
>>>         # NFS mounts
>>>         - name: flink-checkpoints
>>>           mountPath: "/opt/flink/checkpoints"
>>>         - name: flink-recovery
>>>           mountPath: "/opt/flink/recovery"
>>>       volumes:
>>>       - name: flink-config
>>>         configMap:
>>>           name: flink-config
>>>       # NFS volumes
>>>       - name: flink-checkpoints
>>>         nfs:
>>>           server: "my-nfs-server.my-org"
>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>       - name: flink-recovery
>>>         nfs:
>>>           server: "my-nfs-server.my-org"
>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>> ---
>>> # Source: flink/templates/flink-deployment.yaml
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: flink-taskmanager
>>> spec:
>>>   replicas: 7
>>>   selector:
>>>     matchLabels:
>>>       app: flink
>>>       component: taskmanager
>>>   template:
>>>     metadata:
>>>       labels:
>>>         app: flink
>>>         component: taskmanager
>>>       annotations:
>>>         checksum/config:
>>> f22b7a7c8a618b2f5257476737883s6a76a88c25b37e455257e120d8e4ed941
>>>     spec:
>>>       containers:
>>>       - name: taskmanager
>>>         image: flink:1.12.1-scala_2.11-java11
>>>         args: [ "taskmanager" ]
>>>         resources:
>>>           limits:
>>>             cpu: 6000m
>>>             memory: 24Gi
>>>           requests:
>>>             cpu: 6000m
>>>             memory: 24Gi
>>>         # Environment Variables
>>>         env:
>>>         - name: ENABLE_CHECKPOINTING
>>>           value: "true"
>>>         - name: JOB_MANAGER_RPC_ADDRESS
>>>           value: "flink-jobmanager"
>>>         volumeMounts:
>>>         - name: flink-config
>>>           mountPath: /opt/flink/conf/flink-conf.yaml
>>>           subPath: flink-conf.yaml
>>>         # NFS mounts
>>>         - name: flink-checkpoints
>>>           mountPath: "/opt/flink/checkpoints"
>>>         - name: flink-recovery
>>>           mountPath: "/opt/flink/recovery"
>>>       volumes:
>>>       - name: flink-config
>>>         configMap:
>>>           name: flink-config
>>>       # NFS volumes
>>>       - name: flink-checkpoints
>>>         nfs:
>>>           server: "my-nfs-server.my-org"
>>>           path: "/my-shared-nfs-dir/flink/checkpoints"
>>>       - name: flink-recovery
>>>         nfs:
>>>           server: "my-nfs-server.my-org"
>>>           path: "/my-shared-nfs-dir/flink/recovery"
>>> ---
>>> # Source: flink/templates/flink-ingress.yaml
>>> apiVersion: extensions/v1beta1
>>> kind: Ingress
>>> metadata:
>>>   name: jobmanager
>>> spec:
>>>   rules:
>>>     - host: my.flink.job.manager.url
>>>       http:
>>>         paths:
>>>           - path: /
>>>             backend:
>>>               serviceName: flink-jobmanager
>>>               servicePort: 8081
>>> ---
>>>
>>> as you can see we are using the skeleton of the standalone configuration
>>> as it documented here:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html
>>> with some per-company configuration obviously, but still under the scope
>>> of this document..
>>>
>>> on a normal beautiful day and without the HA configuration, everything
>>> works fine.
>>> when trying to configure kubernetes HA using this document:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>> with the following parameters:
>>>     kubernetes.cluster-id: flink-cluster
>>>     high-availability:
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>>     high-availability.storageDir: file:///opt/flink/recovery
>>>
>>> the jobmanager fails with the following error:
>>> 2021-02-14 16:57:19,103 ERROR
>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>> Exception occurred while acquiring lock 'ConfigMapLock: default -
>>> flink-cluster-restserver-leader (54211907-eba9-47b1-813e-11f12ba89ccb)'
>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>> executing: GET at:
>>> https://10.96.0.1/api/v1/namespaces/default/configmaps/flink-cluster-restserver-leader.
>>> Message: Forbidden!Configured service account doesn't have access. Service
>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>> resource "configmaps" in API group "" in the namespace "default".
>>>
>>> so we added this line as well (as you can see in the flink-config
>>> configmap above)
>>> kubernetes.namespace: intel360-beta
>>> although it is not part of the document and i don't think flink should
>>> be aware of the namespace it resides in, it damages the modularity of upper
>>> layers of configurations, regardless we added it and then got the the
>>> following error:
>>>
>>> 2021-02-14 17:00:57,086 ERROR
>>> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] -
>>> Exception occurred while acquiring lock 'ConfigMapLock: intel360-beta -
>>> flink-cluster-restserver-leader (66180ce6-c62e-4ea4-9420-0a3134bef3d6)'
>>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>>> executing: GET at:
>>> https://10.96.0.1/api/v1/namespaces/intel360-beta/configmaps/flink-cluster-restserver-leader.
>>> Message: Forbidden!Configured service account doesn't have access. Service
>>> account may have been revoked. configmaps "flink-cluster-restserver-leader"
>>> is forbidden: User "system:serviceaccount:intel360-beta:default" cannot get
>>> resource "configmaps" in API group "" in the namespace "intel360-beta".
>>>
>>> which is bassically the same error message just directed to the flink's
>>> namespace.
>>> my question is, do i need to add RBAC to the flink's service account,
>>> because i got the impression from the flink official documents and some
>>> blogs responses that it designed to function without any special
>>> permissions.
>>> if we do need RBAC can you give an official documentations reference of
>>> the exact permissions.
>>>
>>> NOTE: as you can see our flink-checkpoints and recovery locations are
>>> directed to a local directory mounted to a shared NFS between all tasks and
>>> job manager, since our infrastructure is bare-metal by design. (although
>>> this one is hosted in AWS)
>>>
>>> thanks in advance
>>> Omer
>>>
>>>
>>> ---------- Forwarded message ---------
>>> From: Daniel Peled <daniel.peled.w...@gmail.com>
>>> Date: Sun, Feb 14, 2021 at 6:18 PM
>>> Subject: Fwd: Flink’s Kubernetes HA services - NOT working
>>> To: <omeroz...@gmail.com>
>>>
>>>
>>>
>>>
>>> ---------- Forwarded message ---------
>>> מאת: Matthias Pohl <matth...@ververica.com>
>>> ‪Date: יום ה׳, 11 בפבר׳ 2021 ב-17:37‬
>>> Subject: Re: Flink’s Kubernetes HA services - NOT working
>>> To: Matthias Pohl <matth...@ververica.com>
>>> Cc: Daniel Peled <daniel.peled.w...@gmail.com>, user <
>>> user@flink.apache.org>
>>>
>>>
>>> One other thing: It looks like you've set high-availability.storageDir
>>> to a local path file:///opt/flink/recovery. You should use a storage path
>>> that is accessible from all Flink cluster components (e.g. using S3). Only
>>> references are stored in Kubernetes ConfigMaps [1].
>>>
>>> Best,
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html#configuration
>>>
>>> On Wed, Feb 10, 2021 at 6:08 PM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> Hi Daniel,
>>>> what's the exact configuration you used? Did you use the resource
>>>> definitions provided in the Standalone Flink on Kubernetes docs [1]? Did
>>>> you do certain things differently in comparison to the documentation?
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#appendix
>>>>
>>>> On Wed, Feb 10, 2021 at 1:31 PM Daniel Peled <
>>>> daniel.peled.w...@gmail.com> wrote:
>>>>
>>>>>
>>>>> ,Hey
>>>>>
>>>>> We are using standalone flink on kubernetes
>>>>> :"And we have followed the instructions in the following link
>>>>> "Kubernetes HA Services
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>>>> .We were unable to make it work
>>>>> .We are facing a lot of problems
>>>>> For example some of the jobs don't start complaining that there are
>>>>> not enough slots available - although there are enough slots  and it seems
>>>>> as the job manager is NOT aware of all the task managers
>>>>> .In other scenario we were unable to run any job at all
>>>>>  The flink dashboard is unresponsive and we get the error
>>>>> "flink service temporarily unavailable due to an ongoing leader
>>>>> election. please refresh"
>>>>> .We believe we are missing some configurations
>>>>>  ?Are there any more detailed instructions
>>>>> ?And suggestions/tips
>>>>>  .Attached is the log of the job manager in one of the attempts
>>>>>
>>>>> Please give me some advice.
>>>>> BR,
>>>>> Danny
>>>>>
>>>>
>>>

Reply via email to