Hi Folks,

So I think this is what's happening. My pipeline has multi-language
transforms because it's using Kafka IO. When the runner expands those
transforms, it contacts the expansion service, which responds with a list
of jars. The runner then downloads those jars.

Then, when the runner stages artifacts, it uploads those same jars to the
staging service.
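
To make the sequence concrete, here is a toy model of it in Python. None of these names (Artifact, expand_external, resolve_and_stage) exist in Beam. They are hypothetical stand-ins for what portable_runner.py and the expansion service do, and the only point is that every jar the expansion service lists gets downloaded once and then uploaded again.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Artifact:
    """Hypothetical stand-in for a jar reference returned by expansion."""
    name: str
    data: bytes


def expand_external(transform: str, catalog: Dict[str, List[Artifact]]) -> List[Artifact]:
    # Hypothetical expansion call: the service replies with the jars the
    # cross-language transform depends on.
    return catalog.get(transform, [])


def resolve_and_stage(transforms: List[str], catalog: Dict[str, List[Artifact]]) -> Dict[str, int]:
    """Return the number of bytes uploaded per jar name."""
    downloaded: Dict[str, bytes] = {}
    for transform in transforms:
        for artifact in expand_external(transform, catalog):
            # Step 1: the runner downloads each jar the expansion service lists.
            downloaded[artifact.name] = artifact.data
    uploaded: Dict[str, int] = {}
    for name, data in downloaded.items():
        # Step 2: the runner uploads the very same bytes to the staging service.
        uploaded[name] = len(data)
    return uploaded


# Hypothetical catalog: one cross-language transform with two dependency jars.
catalog = {
    "ReadFromKafka": [
        Artifact("dependency-a.jar", b"aaa"),
        Artifact("dependency-b.jar", b"bbbbb"),
    ]
}
print(resolve_and_stage(["ReadFromKafka"], catalog))
```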

This seems unnecessary and inefficient. I was able to hack the
portable_runner.py code to test this out. When I did, my job was submitted
quite quickly (the pipeline is running, but I haven't verified it's working,
so it's possible I broke something).
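
The hack amounts to skipping uploads the job server doesn't need. Here is a minimal sketch of that idea, assuming (hypothetically) that the runner already knows the SHA-256 digests of the artifacts the job server holds. artifacts_to_upload, sha256_hex and server_digests are made-up names, not Beam APIs.

```python
import hashlib
from typing import Dict, List, Set


def sha256_hex(data: bytes) -> str:
    """Hex SHA-256 digest of an artifact's contents."""
    return hashlib.sha256(data).hexdigest()


def artifacts_to_upload(resolved: Dict[str, bytes], server_digests: Set[str]) -> List[str]:
    """Return names of resolved artifacts whose contents the server lacks.

    `server_digests` is an assumed, precomputed set of digests the job
    server already has; how a runner would obtain it is left open here.
    """
    return sorted(
        name for name, data in resolved.items()
        if sha256_hex(data) not in server_digests
    )


resolved = {"kafka-deps.jar": b"k", "job-server.jar": b"js"}
# Pretend the job server already holds the job server jar.
print(artifacts_to_upload(resolved, {sha256_hex(b"js")}))
```

Keying on content digests rather than file names means a jar that was renamed, but is byte-for-byte identical, would still be skipped.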

Is this working as intended? Is there some way to avoid this without
having to hack the runner code?

Interestingly, downloading the jars seems to be much faster than uploading
them, but I haven't investigated why.

J

On Fri, Feb 11, 2022 at 8:23 AM Jeremy Lewi wrote:

> Hi Folks,
>
> I'm using a patched version of Apache Beam 2.35 (Python) and running on
> Flink on Kubernetes using the PortableJobRunner.
>
> It looks like, when submitting the job, the runner tries to upload a large
> 274 MB Flink job server jar to the staging service.
>
> This doesn't seem right. I already have an instance of the JobServer
> running, and my program is talking to that job server, so why is it
> trying to stage the JobServer?
>
> I believe this problem started when I upgraded to 2.35.
>
> When I debugged this, I found the runner was getting stuck in
> offer_artifacts:
>
> https://github.com/apache/beam/blob/38b21a71a76aad902bd903d525b25a5ff464df55/sdks/python/apache_beam/runners/portability/artifact_service.py#L235
>
> When I looked at the rolePayload of the request in question, it was:
>
> rolePayload=beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar
>
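My unverified guess is that this staged name is the jar's base name followed by an unpadded URL-safe base64 SHA-256 digest of its contents; a 43-character suffix is consistent with a 32-byte digest. A small sketch that builds and splits names under that assumption (staged_name and split_staged_name are hypothetical helpers, not Beam code):

```python
import base64
import hashlib
import re
from typing import Tuple

# Assumed layout: "<base>-<43 URL-safe base64 chars><ext>".
_STAGED_NAME = re.compile(
    r"^(?P<base>.+)-(?P<digest>[A-Za-z0-9_-]{43})(?P<ext>\.[A-Za-z0-9]+)$")


def staged_name(base: str, ext: str, content: bytes) -> str:
    """Build a name from the SHA-256 of `content` (hypothetical scheme)."""
    digest = hashlib.sha256(content).digest()
    encoded = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
    return f"{base}-{encoded}{ext}"


def split_staged_name(name: str) -> Tuple[str, bytes, str]:
    """Split a name into (base, raw digest bytes, extension)."""
    match = _STAGED_NAME.match(name)
    if match is None:
        raise ValueError(f"not a hashed staged name: {name!r}")
    # Restore the single "=" of padding stripped from a 32-byte digest.
    raw = base64.urlsafe_b64decode(match.group("digest") + "=")
    return match.group("base"), raw, match.group("ext")


base, raw, ext = split_staged_name(
    "beam-runners-flink-job-server-quH8FRP1-liJ7K9es-qBV9wguz4oBRlVegwqlW3OpqU.jar")
print(base, len(raw), ext)  # beam-runners-flink-job-server 32 .jar
```

If that guess holds, the name alone identifies the jar's contents, which is the kind of key a runner could use to avoid re-uploading something the server already has.
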
> How does the runner decide which artifacts to upload? Could this be caused
> by using a patched version of the Python SDK with an unpatched version of
> the job server jar, so that the Python version (2.35.0.dev2) doesn't match
> the JobServer version (2.35.0), and the portable runner therefore thinks
> it needs to upload the jar?
>
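To illustrate the mismatch hypothesis: the two version strings differ as exact strings but share the same numeric release segment. Below is a hypothetical helper, release_tuple, that keeps only that segment. I don't know how Beam actually compares versions, so this is only an illustration of the difference.

```python
import re
from typing import Tuple


def release_tuple(version: str) -> Tuple[int, ...]:
    """Return the leading numeric release segment, e.g. "2.35.0.dev2" -> (2, 35, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"no release segment in {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


sdk_version = "2.35.0.dev2"
job_server_version = "2.35.0"
print(sdk_version == job_server_version)  # False: exact strings differ
print(release_tuple(sdk_version) == release_tuple(job_server_version))  # True
```
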
> We build our own version of the Python SDK because we need a fix for
> https://issues.apache.org/jira/browse/BEAM-12244.
>
> When we were using 2.33, we were also building our own Flink jars in order
> to pull in Kafka patches that hadn't been released yet.
>
> Thanks
> J
