Hi Mate,

That option might be exactly what I need. Thanks!

Best regards,
Nathan T. A. Lewis




---- On Sun, 12 May 2024 05:27:10 -0600 czmat...@gmail.com wrote ----


Hi Nathan,


Job submissions for FlinkSessionJob resources are always performed by first 
uploading the JAR file from the Operator pod via the JobManager's REST 
API, then starting a new job from the uploaded JAR. This means that 
downloading the JAR file with an initContainer on the JobManager will not help 
in your case.
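
For reference, that submission amounts to two calls against the JobManager's REST API. A minimal sketch of the endpoints involved (the `/jars/upload` and `/jars/<jarid>/run` paths are Flink's standard REST API; the JobManager address, entry class, and parallelism below are placeholders):

```python
# Sketch of the two REST endpoints used for a session job submission.
# The JobManager address is an assumption; the endpoint paths come from
# Flink's standard REST API.

def upload_url(jobmanager: str) -> str:
    # Step 1: POST the JAR as multipart form data (field name "jarfile").
    # The response body contains the id of the uploaded jar.
    return f"{jobmanager}/jars/upload"

def run_url(jobmanager: str, jar_id: str, entry_class: str, parallelism: int) -> str:
    # Step 2: POST here to start a job from the uploaded jar.
    return (f"{jobmanager}/jars/{jar_id}/run"
            f"?entry-class={entry_class}&parallelism={parallelism}")
```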


You could look into the Operator config option 
'kubernetes.operator.user.artifacts.http.header' to set the HTTP headers used 
to download the artifacts. Please check FLINK-27483 [1] for more information.
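
For example, a bearer token could be passed along with the artifact download requests like this (a sketch; the option expects comma-separated `key:value` pairs, and the token value here is a placeholder, not a real credential — see FLINK-27483 for the exact format):

```
# Flink Kubernetes Operator configuration (e.g. in the operator's
# defaultConfiguration). The token value is a placeholder.
kubernetes.operator.user.artifacts.http.header: Authorization:Bearer <your-token>
```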


[1] https://issues.apache.org/jira/browse/FLINK-27483


Regards,
Mate Czagany



Nathan T. A. Lewis <nat...@sourcespectrum.com> wrote (on Thu, May 9, 2024 at 
19:00):

Hello,

I am trying to run a Flink Session Job with a jar that is hosted on a maven 
repository in Google's Artifact Registry.

The first thing I tried was to just specify the `jarURI` directly:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: myJobName
spec:
  deploymentName: flink-session
  job:
    jarURI: "https://mylocation-maven.pkg.dev/myGCPproject/myreponame/path/to/the.jar"
    entryClass: myentryclass
    parallelism: 1
    upgradeMode: savepoint

But, since it is a private repository, it unsurprisingly resulted in:

java.io.IOException: Server returned HTTP response code: 401 for URL: 
https://mylocation-maven.pkg.dev/myGCPproject/myreponame/path/to/the.jar

I didn't see anywhere in the FlinkSessionJob definition to put a bearer token 
and doubt it would be a good idea security-wise to store one there anyway, so I 
instead looked into using `initContainers` on the FlinkDeployment like in this 
example: 
https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session
spec:
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.checkpoints.dir: mycheckpointsdir
    state.savepoints.dir: mysavepointsdir
    state.backend: rocksdb
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.incremental: "true"
    execution.checkpointing.interval: "1m"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
      spec:
        initContainers:
          - name: gcloud
            image: google/cloud-sdk:latest
            volumeMounts:
              - mountPath: /opt/flink/downloads
                name: downloads
            command: ["sh", "-c", "gcloud artifacts files download --project=myGCPproject --repository=myreponame --location=mylocation --destination=/opt/flink/downloads path/to/the.jar"]
        containers:
          - name: flink-main-container
            volumeMounts:
              - mountPath: /opt/flink/downloads
                name: downloads
        volumes:
          - name: downloads
            emptyDir: { }

This worked well for getting the jar onto the jobManager pod, but it turns out 
the jar for a FlinkSessionJob is actually resolved on the pod of the Flink 
Kubernetes Operator itself. So in the end, the job still isn't being run.

As a workaround for now, I'm planning to move my jar from Maven to a Google 
Cloud Storage bucket and then add the gcs filesystem plugin to the operator 
image. What I'd love to know is if I've overlooked some already implemented way 
to connect to a private maven repository for a FlinkSessionJob. I suppose in a 
worst case, we could write a filesystem plugin that handles the 
`artifactrepository://` scheme and uses Google's java libraries to handle 
authentication and download of the artifact. Again, I'm kind of hoping 
something already exists though, rather than having to build something new.
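
For context, a minimal sketch of the GCS workaround, assuming the operator loads filesystem plugins from its plugins directory (image tag, plugin jar version, and paths are placeholders to adjust for your Flink and operator versions):

```
# Hedged sketch: extend the operator image with the GCS filesystem plugin so
# the operator pod can fetch gs:// jarURIs. The image tag and plugin jar
# version are assumptions.
FROM apache/flink-kubernetes-operator:1.8.0
# Copy the GCS filesystem plugin (shipped in a Flink distribution's opt/
# directory) into the operator's plugins directory.
COPY flink-gs-fs-hadoop-1.18.1.jar /opt/flink/plugins/gs-fs-hadoop/
```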


Best regards,
Nathan T.A. Lewis
