Hello

I am trying to deploy a Flink application on Kubernetes, but the task
manager logs show that it can't resolve the ResourceManager address:


*Could not resolve ResourceManager address
akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*,
retrying in 10000 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*.*

I have already exposed the job manager address as a service and
triple-checked all the values, but I couldn't figure out why the
ResourceManager address can't be resolved.

I attached the job manager, task manager, job manager service, and the
relevant part of flink-conf.yaml below. Any help is appreciated.
# Flink configuration rendered into the ConfigMap. NOTE: this value is a YAML
# *literal block scalar* (|+), so every config line must stay indented more
# deeply than the key — any line that falls back to column 0 terminates the
# scalar and corrupts the file. The previously wrapped lines are re-joined
# below.
flink-conf.yaml: |+
    # Must resolve to the JobManager Service so TaskManagers can reach the
    # ResourceManager RPC endpoint.
    jobmanager.rpc.address: flink-jm-svc-{{ .Release.Name }}
    jobmanager.rpc.port: 6123
    taskmanager.numberOfTaskSlots: {{ .Values.numOfTaskSlots }}
    blob.server.port: 6124
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: {{ .Values.jobManagerMemoryProcessSize }}
    taskmanager.memory.process.size: {{ .Values.taskManagerMemoryProcessSize }}
    parallelism.default: 4
    kubernetes.cluster-id: flink-app-cluster-{{ .Release.Name }}
    # high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    # high-availability.storageDir: {{ .Values.haStorageDir }}
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    kubernetes.namespace: streaming
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9250-9260
    state.savepoints.dir: {{ .Values.savepointDir }}-{{ .Release.Name }}
---
# TaskManager Deployment: one pod per replica, each registering with the
# JobManager via jobmanager.rpc.address from the mounted flink-conf.yaml.
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: streaming
  name: flink-tm-{{ .Release.Name }}
spec:
  replicas: {{ .Values.numOfTaskManager }}
  selector:
    matchLabels:
      app: flink
      component: flink-tm-{{ .Release.Name }}
  template:
    metadata:
      labels:
        app: flink
        component: flink-tm-{{ .Release.Name }}
    spec:
      serviceAccountName: download-metadata
      containers:
        - name: taskmanager
          image: {{ .Values.image }}
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
#            - mountPath: /tmp/
#              name: ssd-tmp
#              subPathExpr: $(POD_IP)
          securityContext:
            # 9999 is the _flink_ user from the official Flink image; change
            # if your image uses a different UID. (This comment had wrapped
            # onto an unindented line, which is invalid YAML.)
            runAsUser: 9999
          resources:
            requests:
              memory: 2G
              cpu: 1
            limits:
              memory: 2G
              cpu: 1
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config-{{ .Release.Name }}
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
#        - name: ssd-tmp
#          hostPath:
#            path: /ssd/tmp/
#            type: Directory
# ClusterIP Service fronting the JobManager pod(s). Its DNS name matches
# jobmanager.rpc.address in flink-conf.yaml, so TaskManagers dial the
# JobManager RPC endpoint through this stable name.
apiVersion: v1
kind: Service
metadata:
  namespace: streaming
  name: flink-jm-svc-{{ .Release.Name }}
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123  # jobmanager.rpc.port in flink-conf.yaml
  - name: blob-server
    port: 6124  # blob.server.port in flink-conf.yaml
  - name: webui
    port: 8081  # Flink web UI / REST
  selector:
    app: flink
    # Matches the pod template labels of the flink-jm Job below.
    component: flink-jm-{{ .Release.Name }}
---
# JobManager running in standalone application mode ("standalone-job").
apiVersion: batch/v1
kind: Job
metadata:
  namespace: streaming
  name: flink-jm-{{ .Release.Name }}
spec:
  parallelism: {{ .Values.numOfJobManager }}
  template:
    metadata:
      labels:
        app: flink
        component: flink-jm-{{ .Release.Name }}
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: {{ .Values.image }}
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          # FIX: do NOT pass "--host $(POD_IP)". That override makes the
          # JobManager's RPC (akka) actor system advertise the pod IP, so a
          # TaskManager connecting to
          # akka.tcp://flink@flink-jm-svc-...:6123/user/rpc/resourcemanager_*
          # is rejected because the advertised hostname doesn't match — which
          # is exactly the "Could not resolve ResourceManager address" error.
          # Without --host, the JobManager uses jobmanager.rpc.address from
          # flink-conf.yaml (the Service DNS name), matching what TaskManagers
          # dial. Advertising the pod IP is only appropriate with Kubernetes
          # HA enabled, where TaskManagers discover the leader address from
          # the HA ConfigMap instead of the Service.
          args: [
              "standalone-job",
              "--job-classname",
              "{{ .Values.jobClassName }}",
            ]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
#            - mountPath: /tmp/
#              name: ssd-tmp
#              subPathExpr: $(POD_IP)
            - name: pipelines-metadata
              mountPath: /app
          securityContext:
            # 9999 is the _flink_ user from the official Flink image; change
            # if your image uses a different UID. (This comment had wrapped
            # onto an unindented line, which is invalid YAML.)
            runAsUser: 9999
      # Service account with permissions to create/edit/delete ConfigMaps
      # (needed by the commented-out Kubernetes HA services above).
      serviceAccountName: download-metadata
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config-{{ .Release.Name }}
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
#        - name: ssd-tmp
#          hostPath:
#            path: /ssd/tmp/
#            type: Directory
        - name: pipelines-metadata
          emptyDir: {}

Reply via email to