Thank you Roman. Yes, that's what I am going to do. But I'm running into another issue... When I specify the *--pyArchives* option on the command line, the job never gets submitted and is stuck forever. And when I try to do this programmatically by calling *add_python_archive()*, the job gets submitted but fails because the target directory is not found on the UDF node. Flink is deployed on a K8s cluster in my case, with port 8081 forwarded to localhost.
Here's the command line I use:

~/flink-1.13.0/bin/flink run --jobmanager localhost:8081 --python my_job.py --pyArchives file:///path/to/schema.zip#schema

And within the UDF I'm accessing the schema file as:

read_schema('schema/my_schema.json')

Or, if I use the API instead of the command line, the calls look like:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_archive('schema.zip', 'schema')

Initially, my_job.py itself had its own command-line options, and I thought those might interfere with the overall Flink command-line options, but even after removing them I'm still not able to submit the job. However, if I don't use the *--pyArchives* option and manually transfer the schema file to a location on the UDF node, the job gets submitted and works as expected.

Any reason why this might happen?

Thanks,
Sumeet

On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> I think the second option is what you need. The documentation [1] says
> only the zip format is supported.
> Alternatively, you could upload the files to S3 or another DFS, access
> them from the TMs, and re-upload when needed.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>
> Regards,
> Roman
>
> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra
> <sumeet.malho...@gmail.com> wrote:
> >
> > Hi,
> >
> > I'm using UDTFs in PyFlink that depend on a few resource files (JSON
> > schema files, actually). The path of these files can be passed into
> > the UDTF, but it needs to exist on the Task Manager node where the
> > task executes. What's the best way to upload these resource files?
> > As of now, my custom Flink image bakes the required resource files
> > into a fixed path, but I'd like this to be configurable at run time.
> >
> > There are 2 APIs available to load files when submitting a PyFlink
> > job:
> >
> > stream_execution_environment.add_python_file() - Recommended for
> > uploading files (.py etc.), but it doesn't let me configure the
> > final path on the target node. The files are added to PYTHONPATH,
> > and the UDTF has to look them up there. I'd like to pass the file
> > location into the UDTF instead.
> >
> > stream_execution_environment.add_python_archive() - Appears to be
> > more generic, in the sense that it allows a target directory to be
> > specified. The documentation doesn't say anything about the contents
> > of the archive, so I'm guessing it could be any type of file. Is
> > this what's needed for my use case?
> >
> > Or is there any other recommended way to upload non-Python
> > dependencies/resources?
> >
> > Thanks in advance,
> > Sumeet
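
P.S. For completeness, here's a minimal, self-contained sketch of what I'm attempting with the API route (assuming schema.zip contains my_schema.json at its root and the JSON has a top-level "title" field; the UDF below is illustrative, simplified from my actual read_schema helper):

    import json

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import DataTypes, StreamTableEnvironment
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    # Ship schema.zip with the job; Flink extracts it on each Python
    # worker under the relative directory "schema".
    env.add_python_archive('schema.zip', 'schema')
    t_env = StreamTableEnvironment.create(env)

    @udf(result_type=DataTypes.STRING())
    def schema_title(dummy):
        # Inside the UDF, the archive contents are visible relative to
        # the Python worker's working directory: schema/my_schema.json.
        with open('schema/my_schema.json') as f:
            return json.load(f).get('title', '')

    t_env.create_temporary_function('schema_title', schema_title)

When submitting via the CLI instead, the add_python_archive() call is dropped and --pyArchives file:///path/to/schema.zip#schema is passed on the command line; the relative lookup inside the UDF stays the same.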