Hello
I am trying to make a Flink application deployment in k8s, but the error
message shows that the task manager can't resolve resource manager address
*Could not resolve ResourceManager address
akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*,
retrying in 10000 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*.*
I already exposed the job manager address as a service and triple-checked
all the values, but I couldn't figure out why the resource manager address
can't be resolved.
I attached the job manager, task manager, job manager service, and the
relevant part of flink-conf.yaml below. Any help is appreciated.
# Flink configuration embedded as a literal block scalar (ConfigMap data entry).
# "|+" keeps trailing newlines; content below is passed verbatim to Flink.
flink-conf.yaml: |+
  jobmanager.rpc.address: flink-jm-svc-{{ .Release.Name }}
  jobmanager.rpc.port: 6123
  taskmanager.numberOfTaskSlots: {{ .Values.numOfTaskSlots }}
  blob.server.port: 6124
  taskmanager.rpc.port: 6122
  queryable-state.proxy.ports: 6125
  jobmanager.memory.process.size: {{ .Values.jobManagerMemoryProcessSize }}
  taskmanager.memory.process.size: {{ .Values.taskManagerMemoryProcessSize }}
  parallelism.default: 4
  kubernetes.cluster-id: flink-app-cluster-{{ .Release.Name }}
  # Kubernetes-based HA is currently disabled; uncomment both lines to enable.
  # high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  # high-availability.storageDir: {{ .Values.haStorageDir }}
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: 10
  kubernetes.namespace: streaming
  metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  metrics.reporter.prom.port: 9250-9260
  state.savepoints.dir: {{ .Values.savepointDir }}-{{ .Release.Name }}
---
# TaskManager Deployment: runs {{ .Values.numOfTaskManager }} replicas of the
# Flink image in "taskmanager" mode, mounting flink-conf.yaml from the ConfigMap.
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: streaming
  name: flink-tm-{{ .Release.Name }}
spec:
  replicas: {{ .Values.numOfTaskManager }}
  selector:
    matchLabels:
      app: flink
      component: flink-tm-{{ .Release.Name }}
  template:
    metadata:
      labels:
        app: flink
        component: flink-tm-{{ .Release.Name }}
    spec:
      serviceAccountName: download-metadata
      containers:
        - name: taskmanager
          image: "{{ .Values.image }}"
          env:
            # Pod IP is exposed so it can be used in subPathExpr (see the
            # commented-out ssd-tmp mount below).
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            # - name: ssd-tmp
            #   mountPath: /tmp/
            #   subPathExpr: $(POD_IP)
          securityContext:
            # Refers to user "flink" from the official Flink image; change if necessary.
            runAsUser: 9999
          resources:
            requests:
              memory: 2G
              cpu: 1
            limits:
              memory: 2G
              cpu: 1
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config-{{ .Release.Name }}
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        # - name: ssd-tmp
        #   hostPath:
        #     path: /ssd/tmp/
        #     type: Directory
---
# JobManager Service: stable DNS name (flink-jm-svc-<release>) that the
# TaskManagers dial for RPC (6123), blob transfer (6124), and the web UI (8081).
# The selector must match the JobManager pod template labels exactly.
apiVersion: v1
kind: Service
metadata:
  namespace: streaming
  name: flink-jm-svc-{{ .Release.Name }}
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: flink-jm-{{ .Release.Name }}
---
# JobManager as a batch Job running Flink in Application ("standalone-job") mode.
apiVersion: batch/v1
kind: Job
metadata:
  namespace: streaming
  name: flink-jm-{{ .Release.Name }}
spec:
  parallelism: {{ .Values.numOfJobManager }}
  template:
    metadata:
      labels:
        app: flink
        component: flink-jm-{{ .Release.Name }}
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: "{{ .Values.image }}"
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          # NOTE(review): "--host $(POD_IP)" overrides jobmanager.rpc.address from
          # flink-conf.yaml, so the JobManager advertises its pod IP rather than
          # the flink-jm-svc-* service name the TaskManagers connect to — this is
          # a likely cause of the "Could not resolve ResourceManager address"
          # error. Confirm by dropping the --host flag (or passing the service
          # DNS name) so both sides agree on the address.
          args:
            - "standalone-job"
            - "--host"
            - "$(POD_IP)"
            - "--job-classname"
            - "{{ .Values.jobClassName }}"
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            # - name: ssd-tmp
            #   mountPath: /tmp/
            #   subPathExpr: $(POD_IP)
            - name: pipelines-metadata
              mountPath: /app
          securityContext:
            # Refers to user "flink" from the official Flink image; change if necessary.
            runAsUser: 9999
      # Service account with permissions to create, edit, and delete ConfigMaps.
      serviceAccountName: download-metadata
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config-{{ .Release.Name }}
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        # - name: ssd-tmp
        #   hostPath:
        #     path: /ssd/tmp/
        #     type: Directory
        - name: pipelines-metadata
          emptyDir: {}