Re: PyFlink: Upload resource files to Flink cluster

2021-06-14 Thread Dian Fu
Hi Sumeet,

The archive files will be uploaded to the blob server. This is the same no
matter whether you specify the archives via the command line option
`--pyArchives` or via `add_python_archive()`.
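
For example (the paths below are only placeholders), the following two ways of
specifying the archive behave the same way:

# CLI:
#   flink run --python my_job.py --pyArchives file:///path/to/schema.zip#schema
# API (equivalent):
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_archive("/path/to/schema.zip", "schema")

# In both cases the archive is shipped with the job and extracted into the
# relative directory "schema" on the worker side, so a UDF can open e.g.
# "schema/my_schema.json" via a relative path.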


> And when I try to programmatically do this by calling add_python_archive(), 
> the job gets submitted but fails because the target directory is not found on 
> the UDF node.

Could you share a code snippet, e.g. is this a Table API program or a
DataStream API program? Also, could you share the exception stack trace?

Regards,
Dian

> On Jun 11, 2021, at 7:25 PM, Sumeet Malhotra wrote:
> 
> I'm using a standalone deployment on Kubernetes for this use case. Does the 
> archive get uploaded to the cluster via the :8081 REST/WebUI port or via some 
> other port like 6123/RPC or 6124/BLOB-SERVER? I'm wondering if not exposing 
> those ports on the local machine might prevent the archive from getting 
> loaded? Although I would have expected an explicit error in that case.
> 
> NAMESPACE NAME   TYPE   PORTS 
> flink flink-jobmanager   ClusterIP  rpc:6123►0 blob-server:6124►0 
> webui:8081►0
> 
> Thanks,
> Sumeet
> 
> 
> On Fri, Jun 11, 2021 at 2:48 PM Roman Khachatryan wrote:
> Hi Sumeet,
> 
> Probably there is an issue with uploading the archive while submitting the 
> job.
> The commands and API usage look good to me.
> Dian could you please confirm that?
> 
> Regards,
> Roman
> 
> On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra wrote:
> >
> > Thank you Roman. Yes, that's what I am going to do.
> >
> > But I'm running into another issue... when I specify the --pyArchives 
> > option on the command line, the job never gets submitted and is stuck 
> > forever. And when I try to programmatically do this by calling 
> > add_python_archive(), the job gets submitted but fails because the target 
> > directory is not found on the UDF node. Flink is deployed on a K8S cluster 
> > in my case and the port 8081 is forwarded to the localhost.
> >
> > Here's the command line I use:
> >
> > ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python 
> > my_job.py  --pyArchives file:///path/to/schema.zip#schema
> >
> > And within the UDF I'm accessing the schema file as:
> >
> > read_schema('schema/my_schema.json')
> >
> > Or if I try using the API instead of the command line, the call looks like this:
> >
> > env = StreamExecutionEnvironment.get_execution_environment()
> > env.add_python_archive('schema.zip', 'schema')
> >
> > Initially, my_job.py itself had its own command line options, and I was 
> > thinking that might interfere with the overall Flink command line options, 
> > but even after removing that I'm not able to submit the job anymore. 
> > However, if I don't use the --pyArchives option and manually transfer the 
> > schema file to a location on the UDF node, the job gets submitted and works 
> > as expected.
> >
> > Any reason why this might happen?
> >
> > Thanks,
> > Sumeet
> >
> >
> > On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan wrote:
> >>
> >> Hi,
> >>
> >> I think the second option is what you need. The documentation says
> >> only zip format is supported.
> >> Alternatively, you could upload the files to S3 or another DFS, access
> >> them from the TMs, and re-upload them when needed.
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
> >>
> >>
> >> Regards,
> >> Roman
> >>
> >> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra wrote:
> >> >
> >> > Hi,
> >> >
> >> > I'm using UDTFs in PyFlink that depend on a few resource files (JSON
> >> > schema files, actually). The paths of these files can be passed into the
> >> > UDTF, but they need to exist on the Task Manager node where the task
> >> > executes. What's the best way to upload these resource files? As of now,
> >> > my custom Flink image creates a fixed path with the required resource
> >> > files, but I'd like it to be configurable at run time.
> >> >
> >> > There are 2 APIs available to load files when submitting a PyFlink job...
> >> >
> >> > stream_execution_environment.add_python_file() - Recommended for
> >> > uploading files (.py etc.), but it doesn't let me configure the final
> >> > path on the target node. The files are added to PYTHONPATH, but it
> >> > requires the UDTF function to look up the file itself. I'd like to pass
> >> > the file location into the UDTF instead.
> >> >
> >> > stream_execution_environment.add_python_archive() - Appears to be more
> >> > generic, in the sense that it allows a target directory to be specified.
> >> > The documentation doesn't say anything about the contents of the
> >> > archive, so I'm guessing it could be any type of file. Is this what is
> >> > needed for my use case?

Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
I'm using a standalone deployment on Kubernetes for this use case. Does the
archive get uploaded to the cluster via the :8081 REST/WebUI port or via
some other port like 6123/RPC or 6124/BLOB-SERVER? I'm wondering whether not
exposing those ports on the local machine might prevent the archive from
being uploaded, although I would have expected an explicit error in that
case.

NAMESPACE  NAME               TYPE       PORTS
flink      flink-jobmanager   ClusterIP  rpc:6123►0 blob-server:6124►0 webui:8081►0

Thanks,
Sumeet


On Fri, Jun 11, 2021 at 2:48 PM Roman Khachatryan  wrote:

> Hi Sumeet,
>
> Probably there is an issue with uploading the archive while submitting the
> job.
> The commands and API usage look good to me.
> Dian could you please confirm that?
>
> Regards,
> Roman
>
> On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra wrote:
> >
> > Thank you Roman. Yes, that's what I am going to do.
> >
> > But I'm running into another issue... when I specify the --pyArchives
> option on the command line, the job never gets submitted and is stuck
> forever. And when I try to programmatically do this by calling
> add_python_archive(), the job gets submitted but fails because the target
> directory is not found on the UDF node. Flink is deployed on a K8S cluster
> in my case and the port 8081 is forwarded to the localhost.
> >
> > Here's the command line I use:
> >
> > ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python
> my_job.py  --pyArchives file:///path/to/schema.zip#schema
> >
> > And within the UDF I'm accessing the schema file as:
> >
> > read_schema('schema/my_schema.json')
> >
> > Or if I try using the API instead of the command line, the call looks like this:
> >
> > env = StreamExecutionEnvironment.get_execution_environment()
> > env.add_python_archive('schema.zip', 'schema')
> >
> > Initially, my_job.py itself had its own command line options, and I was
> thinking that might interfere with the overall Flink command line options,
> but even after removing that I'm not able to submit the job anymore.
> However, if I don't use the --pyArchives option and manually transfer the
> schema file to a location on the UDF node, the job gets submitted and works
> as expected.
> >
> > Any reason why this might happen?
> >
> > Thanks,
> > Sumeet
> >
> >
> > On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan wrote:
> >>
> >> Hi,
> >>
> >> I think the second option is what you need. The documentation says
> >> only zip format is supported.
> >> Alternatively, you could upload the files to S3 or another DFS, access
> >> them from the TMs, and re-upload them when needed.
> >>
> >> [1]
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
> >>
> >> Regards,
> >> Roman
> >>
> >> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra wrote:
> >> >
> >> > Hi,
> >> >
> >> > I'm using UDTFs in PyFlink that depend on a few resource files (JSON
> schema files, actually). The paths of these files can be passed into the
> UDTF, but they need to exist on the Task Manager node where the task
> executes. What's the best way to upload these resource files? As of now, my
> custom Flink image creates a fixed path with the required resource files,
> but I'd like it to be configurable at run time.
> >> >
> >> > There are 2 APIs available to load files when submitting a PyFlink
> job...
> >> >
> >> > stream_execution_environment.add_python_file() - Recommended for
> uploading files (.py etc.), but it doesn't let me configure the final path
> on the target node. The files are added to PYTHONPATH, but it requires the
> UDTF function to look up the file itself. I'd like to pass the file
> location into the UDTF instead.
> >> >
> >> > stream_execution_environment.add_python_archive() - Appears to be
> more generic, in the sense that it allows a target directory to be
> specified. The documentation doesn't say anything about the contents of the
> archive, so I'm guessing it could be any type of file. Is this what is
> needed for my use case?
> >> >
> >> > Or is there any other recommended way to upload non-Python
> dependencies/resources?
> >> >
> >> > Thanks in advance,
> >> > Sumeet
>


Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Roman Khachatryan
Hi Sumeet,

Probably there is an issue with uploading the archive while submitting the job.
The commands and API usage look good to me.
Dian, could you please confirm that?

Regards,
Roman

On Fri, Jun 11, 2021 at 9:04 AM Sumeet Malhotra wrote:
>
> Thank you Roman. Yes, that's what I am going to do.
>
> But I'm running into another issue... when I specify the --pyArchives option 
> on the command line, the job never gets submitted and is stuck forever. And 
> when I try to programmatically do this by calling add_python_archive(), the 
> job gets submitted but fails because the target directory is not found on the 
> UDF node. Flink is deployed on a K8S cluster in my case and the port 8081 is 
> forwarded to the localhost.
>
> Here's the command line I use:
>
> ~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python my_job.py  
> --pyArchives file:///path/to/schema.zip#schema
>
> And within the UDF I'm accessing the schema file as:
>
> read_schema('schema/my_schema.json')
>
> Or if I try using the API instead of the command line, the call looks like this:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_python_archive('schema.zip', 'schema')
>
> Initially, my_job.py itself had its own command line options, and I was 
> thinking that might interfere with the overall Flink command line options, 
> but even after removing that I'm not able to submit the job anymore. However, 
> if I don't use the --pyArchives option and manually transfer the schema file 
> to a location on the UDF node, the job gets submitted and works as expected.
>
> Any reason why this might happen?
>
> Thanks,
> Sumeet
>
>
> On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> I think the second option is what you need. The documentation says
>> only zip format is supported.
>> Alternatively, you could upload the files to S3 or another DFS, access
>> them from the TMs, and re-upload them when needed.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>>
>> Regards,
>> Roman
>>
> >> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra wrote:
>> >
>> > Hi,
>> >
> >> > I'm using UDTFs in PyFlink that depend on a few resource files (JSON
> >> > schema files, actually). The paths of these files can be passed into the
> >> > UDTF, but they need to exist on the Task Manager node where the task
> >> > executes. What's the best way to upload these resource files? As of now,
> >> > my custom Flink image creates a fixed path with the required resource
> >> > files, but I'd like it to be configurable at run time.
>> >
>> > There are 2 APIs available to load files when submitting a PyFlink job...
>> >
> >> > stream_execution_environment.add_python_file() - Recommended for
> >> > uploading files (.py etc.), but it doesn't let me configure the final
> >> > path on the target node. The files are added to PYTHONPATH, but it
> >> > requires the UDTF function to look up the file itself. I'd like to pass
> >> > the file location into the UDTF instead.
>> >
>> > stream_execution_environment.add_python_archive() - Appears to be more 
>> > generic, in the sense that it allows a target directory to be specified. 
>> > The documentation doesn't say anything about the contents of the archive, 
>> > so I'm guessing it could be any type of file. Is this what is needed for 
>> > my use case?
>> >
>> > Or is there any other recommended way to upload non-Python 
>> > dependencies/resources?
>> >
>> > Thanks in advance,
>> > Sumeet


Re: PyFlink: Upload resource files to Flink cluster

2021-06-11 Thread Sumeet Malhotra
Thank you Roman. Yes, that's what I am going to do.

But I'm running into another issue... when I specify the *--pyArchives*
option on the command line, the job never gets submitted and is stuck
forever. And when I try to programmatically do this by calling
*add_python_archive()*, the job gets submitted but fails because the target
directory is not found on the UDF node. Flink is deployed on a K8S cluster
in my case, and port 8081 is forwarded to localhost.

Here's the command line I use:

~/flink-1.13.0/bin/flink run --jobmanager localhost:8081  --python
my_job.py  --pyArchives file:///path/to/schema.zip#schema

And within the UDF I'm accessing the schema file as:

read_schema('schema/my_schema.json')

Or if I try using the API instead of the command line, the call looks like this:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_python_archive('schema.zip', 'schema')
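
For completeness, the UDTF side looks roughly like this (read_schema here is
just a thin helper that loads the JSON file; the UDTF itself is simplified):

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udtf


def read_schema(path):
    # Load the JSON schema that was shipped inside schema.zip and extracted
    # under the relative directory "schema" on the worker.
    with open(path) as f:
        return json.load(f)


@udtf(result_types=[DataTypes.STRING()])
def explode_fields(raw):
    # Emit one row per field name declared in the schema (illustrative only).
    schema = read_schema('schema/my_schema.json')
    for field in schema.get('fields', []):
        yield field.get('name', '')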

Initially, my_job.py itself had its own command line options, and I was
thinking those might interfere with the overall Flink command line options,
but even after removing them I'm still not able to submit the job.
However, if I don't use the --pyArchives option and manually transfer the
schema file to a location on the UDF node, the job gets submitted and works
as expected.

Any reason why this might happen?

Thanks,
Sumeet


On Thu, Jun 10, 2021 at 11:09 PM Roman Khachatryan  wrote:

> Hi,
>
> I think the second option is what you need. The documentation says
> only zip format is supported.
> Alternatively, you could upload the files to S3 or another DFS, access
> them from the TMs, and re-upload them when needed.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
>
> Regards,
> Roman
>
> On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra wrote:
> >
> > Hi,
> >
> > I'm using UDTFs in PyFlink that depend on a few resource files (JSON
> schema files, actually). The paths of these files can be passed into the
> UDTF, but they need to exist on the Task Manager node where the task
> executes. What's the best way to upload these resource files? As of now, my
> custom Flink image creates a fixed path with the required resource files,
> but I'd like it to be configurable at run time.
> >
> > There are 2 APIs available to load files when submitting a PyFlink job...
> >
> > stream_execution_environment.add_python_file() - Recommended for
> uploading files (.py etc.), but it doesn't let me configure the final path
> on the target node. The files are added to PYTHONPATH, but it requires the
> UDTF function to look up the file itself. I'd like to pass the file
> location into the UDTF instead.
> >
> > stream_execution_environment.add_python_archive() - Appears to be more
> generic, in the sense that it allows a target directory to be specified.
> The documentation doesn't say anything about the contents of the archive,
> so I'm guessing it could be any type of file. Is this what is needed for my
> use case?
> >
> > Or is there any other recommended way to upload non-Python
> dependencies/resources?
> >
> > Thanks in advance,
> > Sumeet
>


Re: PyFlink: Upload resource files to Flink cluster

2021-06-10 Thread Roman Khachatryan
Hi,

I think the second option is what you need. The documentation [1] says
only the zip format is supported.
Alternatively, you could upload the files to S3 or another DFS, access
them from the TMs, and re-upload them when needed.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/#archives
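
If it helps, creating the zip on the client side could look roughly like this
(just a sketch; the local schema/ directory name is only an example):

import os
import zipfile

# Package the local schema/ directory into schema.zip so it can be passed to
# --pyArchives or add_python_archive() (only zip archives are supported).
with zipfile.ZipFile('schema.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    for root, _, files in os.walk('schema'):
        for name in files:
            path = os.path.join(root, name)
            zf.write(path, arcname=path)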

Regards,
Roman

On Wed, Jun 9, 2021 at 3:57 PM Sumeet Malhotra wrote:
>
> Hi,
>
> I'm using UDTFs in PyFlink that depend on a few resource files (JSON
> schema files, actually). The paths of these files can be passed into the
> UDTF, but they need to exist on the Task Manager node where the task
> executes. What's the best way to upload these resource files? As of now, my
> custom Flink image creates a fixed path with the required resource files,
> but I'd like it to be configurable at run time.
>
> There are 2 APIs available to load files when submitting a PyFlink job...
>
> stream_execution_environment.add_python_file() - Recommended for uploading
> files (.py etc.), but it doesn't let me configure the final path on the
> target node. The files are added to PYTHONPATH, but it requires the UDTF
> function to look up the file itself. I'd like to pass the file location
> into the UDTF instead.
>
> stream_execution_environment.add_python_archive() - Appears to be more 
> generic, in the sense that it allows a target directory to be specified. The 
> documentation doesn't say anything about the contents of the archive, so I'm 
> guessing it could be any type of file. Is this what is needed for my use case?
>
> Or is there any other recommended way to upload non-Python 
> dependencies/resources?
>
> Thanks in advance,
> Sumeet


PyFlink: Upload resource files to Flink cluster

2021-06-09 Thread Sumeet Malhotra
Hi,

I'm using UDTFs in PyFlink that depend on a few resource files (JSON
schema files, actually). The paths of these files can be passed into the
UDTF, but they need to exist on the Task Manager node where the task
executes. What's the best way to upload these resource files? As of now,
my custom Flink image creates a fixed path with the required resource
files, but I'd like it to be configurable at run time.

There are 2 APIs available to load files when submitting a PyFlink job...

*stream_execution_environment.add_python_file()* - Recommended for uploading
files (.py etc.), but it doesn't let me configure the final path on the
target node. The files are added to PYTHONPATH, but it requires the UDTF
function to look up the file itself. I'd like to pass the file location
into the UDTF instead.

*stream_execution_environment.add_python_archive()* - Appears to be more
generic, in the sense that it allows a target directory to be specified.
The documentation doesn't say anything about the contents of the archive,
so I'm guessing it could be any type of file. Is this what is needed for my
use case?
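
To make the comparison concrete, this is roughly how I'd expect to call the
two APIs (the local paths below are made up):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# add_python_file(): ships the file and puts it on PYTHONPATH; the UDTF has
# to locate the file itself and the target directory is not configurable.
env.add_python_file('/local/path/my_schema.json')

# add_python_archive(): ships a zip archive and extracts it into the given
# target directory on the worker, so the UDTF can use a relative path such as
# 'schema/my_schema.json'.
env.add_python_archive('/local/path/schema.zip', 'schema')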

Or is there any other recommended way to upload non-Python
dependencies/resources?

Thanks in advance,
Sumeet