Thank you Roman. Yes, that's what I am going to do.

But I'm running into another issue... when I specify the *--pyArchives*
option on the command line, the job submission hangs forever and never
completes. And when I try to do the same programmatically by calling
*add_python_archive()*, the job gets submitted but fails because the target
directory is not found on the UDF node. In my case, Flink is deployed on a
K8S cluster and port 8081 is forwarded to localhost.

Here's the command line I use:

~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python
my_job.py  --pyArchives file:///path/to/schema.zip#schema

And within the UDF I access the schema file as:

read_schema('schema/my_schema.json')
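
For context, read_schema is just a thin helper that loads the JSON file,
roughly like this (simplified for illustration):

import json

def read_schema(path):
    # 'schema/...' is resolved relative to the Python worker's working
    # directory, where --pyArchives / add_python_archive should have
    # extracted the zip.
    with open(path) as f:
        return json.load(f)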

Or, if I use the API instead of the command line, the call looks like:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_archive('schema.zip', 'schema')
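
For completeness, here's a minimal sketch of the job side (the UDTF name
explode_fields and the 'fields' key are just placeholders for this
example):

import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import DataTypes, StreamTableEnvironment
from pyflink.table.udf import udtf

env = StreamExecutionEnvironment.get_execution_environment()
# schema.zip sits next to the job script; its contents should end up
# under ./schema in the Python worker's working directory.
env.add_python_archive('schema.zip', 'schema')
# Table environment, used later to apply the UDTF via join_lateral.
t_env = StreamTableEnvironment.create(env)

@udtf(result_types=[DataTypes.STRING()])
def explode_fields(line):
    # Path is relative to the worker's working directory.
    with open('schema/my_schema.json') as f:
        schema = json.load(f)
    for field in schema.get('fields', []):
        yield str(field)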

Initially, my_job.py itself had its own command-line options, and I
suspected those might interfere with the overall Flink command-line
options, but even after removing them I'm still unable to submit the job.
However, if I drop the --pyArchives option and manually transfer the
schema file to a location on the UDF node, the job gets submitted and
works as expected.
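
The manual workaround, for reference, looks roughly like this: the
pre-provisioned path is passed into the UDTF at construction time
(/opt/schemas is just an example path):

from pyflink.table import DataTypes
from pyflink.table.udf import TableFunction, udtf

class ParseWithSchema(TableFunction):
    def __init__(self, schema_path):
        # schema_path must already exist on every Task Manager
        # (baked into the image or copied there by hand).
        self.schema_path = schema_path
        self.schema = None

    def open(self, function_context):
        import json
        with open(self.schema_path) as f:
            self.schema = json.load(f)

    def eval(self, line):
        yield str(self.schema)

parse = udtf(ParseWithSchema('/opt/schemas/my_schema.json'),
             result_types=[DataTypes.STRING()])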

Any reason why this might happen?

Thanks,
Sumeet


On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> I think the second option is what you need. The documentation says
> only zip format is supported.
> Alternatively, you could upload the files to S3 or another DFS, access
> them from the TMs, and re-upload them when needed.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>
> Regards,
> Roman
>
> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
> <sumeet.malho...@gmail.com> wrote:
> >
> > Hi,
> >
> > I'm using UDTFs in PyFlink that depend on a few resource files (JSON
> schema files actually). The path of this file can be passed into the UDTF,
> but essentially this path needs to exist on the Task Manager node where the
> task executes. What's the best way to upload these resource files? As of
> now, my custom Flink image creates a fixed path with the required resource
> files, but I'd like it to be run time configurable.
> >
> > There are 2 APIs available to load files when submitting a PyFlink job...
> >
> > stream_execution_environment.add_python_file() - Recommended to upload
> files (.py etc) but doesn't let me configure the final path on the target
> node. The files are added to PYTHONPATH, but the UDTF function then has
> to look this file up itself. I'd like to pass the file location into the
> UDTF instead.
> >
> > stream_execution_environment.add_python_archive() - Appears to be more
> generic, in the sense that it allows a target directory to be specified.
> The documentation doesn't say anything about the contents of the archive,
> so I'm guessing it could be any type of file. Is this what is needed for my
> use case?
> >
> > Or is there any other recommended way to upload non-Python
> dependencies/resources?
> >
> > Thanks in advance,
> > Sumeet
>
