Hi,
Currently I am struggling to get Beam running on a Kubernetes-hosted
Flink cluster, and there is little to no documentation on how to resolve
my deployment issues (besides a few Stack Overflow threads without
solutions).
I have a Flink job server running on Kubernetes that creates new
taskmanager pods from a pod template when I submit a job. Each
taskmanager pod has a sidecar container running the Beam Python SDK image.
With this setup in place, I tried multiple methods to submit a Python
Beam job and all of them fail for different reasons:
1) Run my Python job via the FlinkRunner and set --environment_type=EXTERNAL
This works perfectly fine locally, but fails when I set --flink_master
to the Kubernetes load balancer IP to submit to the remote cluster. The
job itself is submitted successfully, but the necessary staging
artifacts are not (see the invocation sketch below the error output).
The Flink container shows
java.io.FileNotFoundException:
/tmp/beam-temp7hxxe2gs/artifacts2liu9b8y/779b17e6efab2bbfcba170762d1096fe2451e0e76c4361af9a68296f23f4a4ec/1-ref_Environment_default_e-workflow.tar.gz
(No such file or directory)
and the Python worker shows
2021/11/26 14:16:24 Failed to retrieve staged files: failed to retrieve
/tmp/staged in 3 attempts: failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc =
I found a Stack Overflow thread describing exactly the same problem, but
without a solution. The file in question seems to exist only under /tmp
on my local client machine, so it is never available to the cluster.
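For reference, this is roughly how I invoke the pipeline. The load
balancer address, the Flink REST port, the worker pool port and the
trivial pipeline body are placeholders for my actual setup:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=<load-balancer-ip>:8081",  # works against a local Flink, fails against the remote cluster
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000",    # address of the Beam SDK worker pool sidecar
        "--parallelism=100",
    ])

    with beam.Pipeline(options=options) as p:
        (p
         | "Create" >> beam.Create(range(10))
         | "Double" >> beam.Map(lambda x: x * 2))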
2) Submit the job with --flink_submit_uber_jar=True
This submits the staging information correctly, but I cannot set the
parallelism (see the options sketch below). Instead I get the following warning:
WARNING:apache_beam.options.pipeline_options:Discarding invalid
overrides: {'parallelism': 100}
and the job then runs with only a single worker, which is just as useless.
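The parallelism is passed through the same pipeline options as before;
only the uber jar flag is added (addresses and ports are again
placeholders):

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=<load-balancer-ip>:8081",
        "--flink_submit_uber_jar",               # staging now works ...
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000",
        "--parallelism=100",                     # ... but this value is discarded as an invalid override
    ])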
3) Spawn another job manager sidecar container running the Beam job
server and submit via the PortableRunner
This works (somewhat) when I run the job server image locally with
--network=host, but I cannot get it to work on Kubernetes. I exposed
ports 8097-8099 on the load balancer IP, but when I submit a job (sketch
below the error output), all I get is
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string =
"{"created":"@1637934464.499518882","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3158,"referenced_errors":[{"created":"@1637934464.499518362","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":147,"grpc_status":14}]}"
This method also seems to suffer from the same issue as 2): I am unable
to control the parallelism.
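This is roughly how I point the PortableRunner at the exposed job server
ports; the address is a placeholder and the ports are simply the job
server defaults, which may not match every setup:

    options = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=<load-balancer-ip>:8099",       # job service
        "--artifact_endpoint=<load-balancer-ip>:8098",  # artifact staging service
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000",
        "--parallelism=100",
    ])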
Is there anything I am doing fundamentally wrong? I cannot imagine that
it is this difficult to submit a simple Python job to a Beam/Flink
cluster.
Thanks for any help
Janek