I am one step further, but also not really.

When I mount the shared drive that serves /tmp on the Flink job and task managers also on my local machine and then spin up a local Beam job server with this volume mounted on /tmp as well, I can get my job to start. This is ugly as hell, because it requires so many extra steps, but at least it's progress.

Unfortunately, the job doesn't run properly and fails with

  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'XXXX'

where XXX is the my application module that I deploy with --setup_file. When I download the workflow.tar.gz from the staging directory, I can confirm that the module is present.

This isn't working as intended at all. Also what happens if multiple users submit applications at the same time? All the Beam stuff in /tmp has random names, but the stages/workflow.tar.gz file that is provided for the Python SDK sidecar container has the same name for each job. Hence it would be impossible to serve multiple users with this setup.

Janek


On 26/11/2021 15:32, Janek Bevendorff wrote:
Hi,

Currently, I am struggling with getting Beam to run on a Kubernetes-hosted Flink cluster and there is very little to no documentation on how to resolve my deployment issues (besides a few Stackoverflow threads without solutions).

I have a Flink job server running on Kubernetes that creates new taskmanager pods from a pod template when I submit a job. Each taskmanager pod has a sidecar container running the Beam Python SDK image.

With this setup in place, I tried multiple methods to submit a Python Beam job and all of them fail for different reasons:

1) Run my Python job via the FlinkRunner and set --environment_type=EXTERNAL

This works perfectly fine locally, but fails when I set --flink_master to the Kubernetes load balancer IP to submit to the remote Kubernetes cluster. It  allows me to submit the job itself successfully, but not the necessary staging data. The Flink container shows

java.io.FileNotFoundException: /tmp/beam-temp7hxxe2gs/artifacts2liu9b8y/779b17e6efab2bbfcba170762d1096fe2451e0e76c4361af9a68296f23f4a4ec/1-ref_Environment_default_e-workflow.tar.gz (No such file or directory)

and the Python worker shows

2021/11/26 14:16:24 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/workflow.tar.gz
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/workflow.tar.gz
        caused by:
rpc error: code = Unknown desc =

I found a Stackoverflow issue with the exact same issue, but without a solution. The file seems to exist only under /tmp on my local client machine, which is useless.

2) Submit the job with --flink_submit_uber_jar=True

This will submit the staging information correctly, but I cannot set the amount of parallelism. Instead I get the following warning:

WARNING:apache_beam.options.pipeline_options:Discarding invalid overrides: {'parallelism': 100}

and the job runs with only a single worker (useless as well).

3) Spawn another job manager sidecar container running the Beam job server and submit via the PortableRunner

This works (somewhat) when I run the job server image locally with --network=host, but I cannot get it to work on Kubernetes. I exposed the ports 8097-8099 on the load balancer IP, but when I submit a job, I only get

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses"
        debug_error_string = "{"created":"@1637934464.499518882","description":"Failed to pick subchannel","file":"src/core /ext/filters/client_channel/client_channel.cc","file_line":3158,"referenced_errors":[{"created":"@1637934464.499518362","de scription":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":147,"grpc_status
":14}]}"

This method also seems to suffer from the same issue as 2) that I am unable to control the amount of parallelism.


Is there anything that I am doing fundamentally wrong? I cannot really imagine that it is this difficult to submit a simple Python job to a Beam/Flink cluster.


Thanks for any help
Janek

--

Bauhaus-Universität Weimar
Bauhausstr. 9a, R308
99423 Weimar, Germany

Phone: +49 3643 58 3577
www.webis.de

Reply via email to