Hi,
I know that there are various options for configuring Flink using Beam
with the Java SDK, but are there any options to do the same with the
Python SDK? The FlinkRunnerOptions class offers only a fraction of what
the Java FlinkPipelineOptions class provides. I would like to be able to
set the parallelism when I submit an Uber JAR and I would also like to
be able to set the task retry count.
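One plausible reading of the "Discarding invalid overrides" warning quoted further down in this thread is that `parallelism` is simply not among the options registered on the Python side, so it is dropped before submission. The sketch below illustrates that mechanism with the standard library only. It is not Beam's actual code: `apply_overrides` and the single registered `--flink_master` option are hypothetical stand-ins chosen for illustration.

```python
import argparse
import logging


def apply_overrides(parser, flags, overrides):
    """Apply keyword overrides to parsed flags, dropping unknown keys.

    Hypothetical stand-in for an options class: only keys that the
    parser already defines may be overridden.
    """
    known, _unknown = parser.parse_known_args(flags)
    result = vars(known)
    remaining = dict(overrides)
    for key in list(result):
        if key in remaining:
            result[key] = remaining.pop(key)
    if remaining:
        # Keys with no registered option are reported and discarded.
        logging.warning("Discarding invalid overrides: %s", remaining)
    return result, remaining
```

If this reading is right, the option would have to be registered on the Python options class (or passed through some other channel) before it could reach Flink.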
On 30/11/2021 17:49, Janek Bevendorff wrote:
Again, one step closer to getting this thing running:
I have to set --artifact_endpoint as well to submit a job to a remote
job server, not just --job_endpoint. Would be great if the docs at
least mentioned that.
I also don't really understand why there is no single option for
setting both the job and artifact endpoint address (without the port
number). They must both run in the same container, otherwise I get
errors about invalid job staging IDs, so having two options is kind of
redundant configuration.
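As a stopgap for the missing single option, both flags can be derived from one host name on the client side. The sketch below is only an illustration: `job_server_endpoints` is a hypothetical helper, not part of Beam, and the ports 8099 (job) and 8098 (artifact) are assumed to be the job server's defaults, so they must be adjusted if the server was started with different ports.

```python
def job_server_endpoints(host, job_port=8099, artifact_port=8098):
    """Build both endpoint flags for a job server reachable at one host.

    The port defaults are assumptions; pass the ports the job server
    was actually started with.
    """
    return [
        f"--job_endpoint={host}:{job_port}",
        f"--artifact_endpoint={host}:{artifact_port}",
    ]
```

The returned list can be appended to the other pipeline arguments before they are handed to the pipeline options.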
Janek
On 30/11/2021 16:08, Janek Bevendorff wrote:
Any ideas here? I am a few steps further now, but not quite there yet.
The main deployment issue can be solved by sharing only
/tmp/beam-artifact-staging among job and taskmanagers, not the whole
tmp directory (this is knowledge from countless hours of googling and
trying things out, no documentation here whatsoever). It does not
solve the remote deployment issue, but so far I am at least able to
submit jobs with a locally running Beam job server.
Unfortunately, I am getting random gRPC terminations with this
exception:
File "venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 597, in __exit__
self.result.wait_until_finish()
File "venv/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 600, in wait_until_finish
raise self._runtime_exception
RuntimeError: Pipeline BeamApp-root-1130144051-d0476877_4d32d3e8-1fbb-4f2d-88d9-c2e05fd624eb failed in state FAILED:
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
The error occurs randomly without any indication why. Do you have any
idea what may be wrong with the gRPC connection or also what I may be
missing for the remote job server deployment?
Besides these questions, I also have a little rant (sorry for that,
but I have to get this off my chest):
I am getting extremely frustrated with the Python documentation,
which is often incomplete, sometimes outdated and occasionally plain
wrong. I can tell that many examples were never actually run, because
they contain invalid Python code, function names differ from what the
API actually offers, parentheses are in the wrong places etc. One
particular example is the splittable DoFn documentation. The original
blog post is entirely outdated and also contains invalid Python code
(missing self parameters of methods and such), but also the online
manual is wrong (missing constructor parameters or required method
overrides here, wrong parentheses there...). To understand how
everything works, I am basically reverse engineering the code, taking
into account the little API documentation there is. This is beyond
annoying.
I also noticed that stateful processing is apparently completely
broken. With the local runner, it doesn't run at all (various
exceptions thrown) and with the FlinkRunner or PortableRunner,
BagStateSpecs or other state parameters are always empty and
watermark timers fire after each invocation of process(). I tried
countless potential solutions, but nothing works, so I gave up and
resorted to using a CombineFn-based PTransform instead.
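A CombineFn-based replacement of the kind mentioned above generally has the shape sketched here. To keep the sketch runnable without Beam installed, it is written as a plain Python class that only mirrors the four methods a CombineFn defines. In a pipeline it would subclass `apache_beam.CombineFn` and be applied through a combine transform such as `beam.CombinePerKey`. The running mean is a hypothetical example and not the aggregation actually used in this job.

```python
class MeanAccumulator:
    """Plain-Python class with the four methods of a Beam CombineFn."""

    def create_accumulator(self):
        # (running sum, element count)
        return (0.0, 0)

    def add_input(self, accumulator, element):
        total, count = accumulator
        return total + element, count + 1

    def merge_accumulators(self, accumulators):
        # Partial results from different workers are folded into one.
        total, count = 0.0, 0
        for part_total, part_count in accumulators:
            total += part_total
            count += part_count
        return total, count

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")
```

Because all intermediate data lives in the accumulator, this approach does not depend on state parameters or timers.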
Janek
On 26/11/2021 17:01, Janek Bevendorff wrote:
I am one step further, but also not really.
When I mount the shared drive that serves /tmp on the Flink job and
task managers also on my local machine and then spin up a local Beam
job server with this volume mounted on /tmp as well, I can get my
job to start. This is ugly as hell, because it requires so many
extra steps, but at least it's progress.
Unfortunately, the job doesn't run properly and fails with
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'XXXX'
where XXXX is my application module that I deploy with
--setup_file. When I download the workflow.tar.gz from the staging
directory, I can confirm that the module is present.
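The check on the downloaded archive can be scripted roughly as follows. This is a minimal sketch using only the standard library: `find_module_in_tarball` is a hypothetical helper, and it assumes the module appears in the archive either as a package directory or as a single `.py` file.

```python
import tarfile


def find_module_in_tarball(tarball_path, module_name):
    """Return archive members that belong to the given module.

    A member matches if one of its path components is the package
    directory `module_name` or the file `module_name.py`.
    """
    matches = []
    with tarfile.open(tarball_path, "r:gz") as archive:
        for member in archive.getnames():
            parts = member.split("/")
            if module_name in parts or f"{module_name}.py" in parts:
                matches.append(member)
    return matches
```

A non-empty result only shows that the module was packaged. It does not show that the package was installed into the environment in which the SDK worker unpickles the pipeline.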
This isn't working as intended at all. Also what happens if multiple
users submit applications at the same time? All the Beam stuff in
/tmp has random names, but the stages/workflow.tar.gz file that is
provided for the Python SDK sidecar container has the same name for
each job. Hence it would be impossible to serve multiple users with
this setup.
Janek
On 26/11/2021 15:32, Janek Bevendorff wrote:
Hi,
Currently, I am struggling with getting Beam to run on a
Kubernetes-hosted Flink cluster and there is very little to no
documentation on how to resolve my deployment issues (besides a few
Stack Overflow threads without solutions).
I have a Flink job server running on Kubernetes that creates new
taskmanager pods from a pod template when I submit a job. Each
taskmanager pod has a sidecar container running the Beam Python SDK
image.
With this setup in place, I tried multiple methods to submit a
Python Beam job and all of them fail for different reasons:
1) Run my Python job via the FlinkRunner and set
--environment_type=EXTERNAL
This works perfectly fine locally, but fails when I set
--flink_master to the Kubernetes load balancer IP to submit to the
remote Kubernetes cluster. It allows me to submit the job itself
successfully, but not the necessary staging data. The Flink
container shows
java.io.FileNotFoundException:
/tmp/beam-temp7hxxe2gs/artifacts2liu9b8y/779b17e6efab2bbfcba170762d1096fe2451e0e76c4361af9a68296f23f4a4ec/1-ref_Environment_default_e-workflow.tar.gz
(No such file or directory)
and the Python worker shows
2021/11/26 14:16:24 Failed to retrieve staged files: failed to
retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for
/tmp/staged/workflow.tar.gz
caused by:
rpc error: code = Unknown desc =
I found a Stack Overflow question describing the exact same problem, but
without a solution. The file seems to exist only under /tmp on my
local client machine, which is useless.
2) Submit the job with --flink_submit_uber_jar=True
This will submit the staging information correctly, but I cannot
set the amount of parallelism. Instead I get the following warning:
WARNING:apache_beam.options.pipeline_options:Discarding invalid
overrides: {'parallelism': 100}
and the job runs with only a single worker (useless as well).
3) Spawn another job manager sidecar container running the Beam job
server and submit via the PortableRunner
This works (somewhat) when I run the job server image locally with
--network=host, but I cannot get it to work on Kubernetes. I
exposed the ports 8097-8099 on the load balancer IP, but when I
submit a job, I only get
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous
of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string =
"{"created":"@1637934464.499518882","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3158,"referenced_errors":[{"created":"@1637934464.499518362","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":147,"grpc_status":14}]}"
This method also seems to suffer from the same issue as 2) that I
am unable to control the amount of parallelism.
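For the "failed to connect to all addresses" error in 3), a first step might be to verify from the client machine that the exposed ports accept TCP connections at all. The sketch below uses only the standard library. `check_ports` is a hypothetical helper, and the default port range 8097-8099 is taken from the description above.

```python
import socket


def check_ports(host, ports=(8097, 8098, 8099), timeout=3.0):
    """Return {port: bool} telling whether a TCP connection succeeded."""
    results = {}
    for port in ports:
        try:
            # Open and immediately close a plain TCP connection.
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:
            results[port] = False
    return results
```

A successful TCP connection does not prove that gRPC traffic passes through the load balancer, but a port reported as unreachable would be consistent with the UNAVAILABLE status.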
Is there anything that I am doing fundamentally wrong? I cannot
really imagine that it is this difficult to submit a simple Python
job to a Beam/Flink cluster.
Thanks for any help
Janek
--
Bauhaus-Universität Weimar
Bauhausstr. 9a, R308
99423 Weimar, Germany
Phone: +49 3643 58 3577
www.webis.de