[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334916=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334916
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 28/Oct/19 11:55
Start Date: 28/Oct/19 11:55
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9844: [BEAM-8372] 
Support flink_master parameter in job server
URL: https://github.com/apache/beam/pull/9844
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 334916)
Time Spent: 8.5h  (was: 8h 20m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334913=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334913
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 28/Oct/19 11:49
Start Date: 28/Oct/19 11:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9844: [BEAM-8372] Support both 
flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546911267
 
 
   Let's do the following: 
   
   1. Unify the two parameters `flink_master` and `flink_master_url` as fixed 
here for the job server and previously by #9803.
   2. Make the master address a url. Add `http://` to the `flink_master` in 
Python if no scheme is specified. Similarly, remove any `http://` in Java, 
since the Java rest client does not expect a scheme.
   3. Deprecate the `[auto]` and `[local]` properties; it should be sufficient to 
replace them with either an address string or an empty string. The empty string 
would either mean local execution or, in the context of the Flink CLI tool, 
loading the master address from the config.
   
   (2) and (3) should be follow-ups. I'll post a summary to the list.
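A minimal Python sketch of step (2), assuming the normalization is done by two small string helpers. `to_rest_url` and `to_host_port` are hypothetical names, not existing Beam functions, and the Java-side stripping is mirrored in Python here only for illustration.

```python
def to_rest_url(flink_master: str, default_scheme: str = "http") -> str:
    """Return flink_master as a URL, prepending a scheme if none is given.

    Hypothetical helper for the proposed Python-side behavior: the `requests`
    library needs an explicit scheme such as http://.
    """
    address = flink_master.strip().rstrip("/")
    if "://" in address:
        return address
    return "%s://%s" % (default_scheme, address)


def to_host_port(flink_master: str) -> str:
    """Return flink_master as host:port, dropping any http(s):// prefix.

    Hypothetical helper for the proposed Java-side behavior, since Flink's
    REST client expects only host:port.
    """
    address = flink_master.strip().rstrip("/")
    for scheme in ("http://", "https://"):
        if address.lower().startswith(scheme):
            return address[len(scheme):]
    return address


if __name__ == "__main__":
    print(to_rest_url("localhost:8081"))          # http://localhost:8081
    print(to_host_port("http://localhost:8081"))  # localhost:8081
```

Defaulting to `http://` rather than `https://` is itself an assumption; an SSL-enabled cluster would still need the scheme spelled out.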
   
   > If flink_master is the option, and has been for a long time, then we 
should just use that.
   
   +1
   
   > As for adapting #9775, REST does require specification of a protocol. 
Currently if we just pass --runner=FlinkRunner --flink_master=localhost:8081 we 
get
   
   Oh I see. I wasn't aware that we need to pass `http://` due to requests. In 
that regard the `_url` parameter makes sense. Still better to settle for a 
single parameter.
   
   > The use case here is a Flink REST client. In that context there is no 
Flink config, the REST API is fully described here: 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
   
   This is not true. The Flink Rest client loads the config in order to support 
use cases like SSL encryption (i.e. loading the trust store, initializing the 
SSL connection, then sending HTTP requests over it). However, the code in 
Python's `FlinkRunner` attempts to directly communicate with the Flink master 
via the `requests` library.
   
   >I don't see why flink_master has to serve as endpoint URL necessarily. Why 
can there not be a separate property just for the REST client?
   
   Why should there be multiple options for the same thing? `flink_master` is 
already the Rest endpoint address. No need to have another `flink_master_url` 
option. We can add URL support, and it is probably fair to add `http://` in case 
no scheme has been supplied.
   
   >I'm not sure what you mean by "a job run by Flink CLI." What does that mean
   for a Python job? (I was trying to read the docs and code for [auto] but
   wasn't able to figure it out.)
   
   I mean running a jar directly with the Flink command-line tool, i.e. 
`bin/flink`. This is the de facto standard way to run Flink jobs. It does not 
apply here but we want to make sure that generated Jars do not break when 
submitted through the CLI. The `[auto]` mode basically says, either (1) figure 
out the cluster address from the context (`bin/flink` reads it from a config 
file), or (2) execute locally if there is no such context or the jar is run 
directly.
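The `[auto]`/`[local]` semantics described above, together with the empty-string proposal from step (3), can be sketched as a small resolution function. This is an illustration under assumptions, not Beam's actual code: `resolve_flink_master` is a hypothetical name, and `context_address` stands in for whatever address the surrounding context provides (e.g. the config file read by `bin/flink`).

```python
from typing import Optional

LOCAL = "[local]"
AUTO = "[auto]"


def resolve_flink_master(flink_master: str,
                         context_address: Optional[str]) -> Optional[str]:
    """Return the cluster address to submit to, or None for local execution.

    Hypothetical sketch. `context_address` is None when the jar is run
    directly, i.e. when there is no cluster context to fall back on.
    """
    if flink_master == LOCAL:
        # Forces local execution, even when launched through bin/flink.
        return None
    if flink_master in (AUTO, ""):
        # Use the context's cluster if there is one, otherwise run locally.
        return context_address or None
    # Anything else is treated as an explicit host:port address.
    return flink_master
```

The `[local]` branch shows the surprise discussed later in this thread: under the Flink CLI it ignores the cluster address and runs locally.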
   
   
 



Issue Time Tracking
---

Worklog Id: (was: 334913)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334612=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334612
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 27/Oct/19 06:31
Start Date: 27/Oct/19 06:31
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9844: [BEAM-8372] Support 
both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-54058
 
 
   I'm not sure what you mean by "a job run by Flink CLI." What does that mean
   for a Python job? (I was trying to read the docs and code for [auto] but
   wasn't able to figure it out.)
   
   As for re-using flink master for the REST client, this is to have the same
   behavior as when using the automatically started job server (e.g. for py <
   3.6).
   
   +1 to bringing this to the list. It'd be good to just enumerate all the
   modes we have and how they should work.
   
   On Sat, Oct 26, 2019, 3:04 PM Thomas Weise  wrote:
   
   > [auto] is the correct default value, it will result in local execution
   > except when a job is run by Flink CLI or REST API. The use case here is a
   > Flink REST client. In that context there is no Flink config, the REST API
   > is fully described here:
   > 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
   >
   > I don't see why flink_master has to serve as endpoint URL necessarily.
   > Why can there not be a separate property just for the REST client?
   >
   > It would be good to separate the more general discussion from this PR and
   > surface it on the mailing list. That new job submission mode and related
   > changes need to become more visible to others interested in Flink.
   >
   
 



Issue Time Tracking
---

Worklog Id: (was: 334612)
Time Spent: 8h 10m  (was: 8h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334573=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334573
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 22:04
Start Date: 26/Oct/19 22:04
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #9844: [BEAM-8372] Support 
both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546643222
 
 
   `[auto]` is the correct default value, it will result in local execution 
except when a job is run by Flink CLI or REST API. The use case here is a Flink 
REST client. In that context there is no Flink config, the REST API is fully 
described here: 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
   
   I don't see why `flink_master` has to serve as endpoint URL necessarily. Why 
can there not be a separate property just for the REST client?
   
   It would be good to separate the more general discussion from this PR and 
surface it on the mailing list. That new job submission mode and related 
changes need to become more visible to others interested in Flink.
 



Issue Time Tracking
---

Worklog Id: (was: 334573)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334545=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334545
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 16:09
Start Date: 26/Oct/19 16:09
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9844: [BEAM-8372] Support 
both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546616164
 
 
   If `flink_master` is the option, and has been for a long time, then we 
should just use that. 
   
   As for adapting #9775, REST does require specification of a protocol. 
Currently if we just pass `--runner=FlinkRunner --flink_master=localhost:8081` 
we get
   
   ```
     File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/runners/portability/portable_runner.py", line 315, in run_pipeline
       timeout=portable_options.job_server_timeout)
     File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/runners/portability/abstract_job_service.py", line 53, in Prepare
       request.pipeline_options)
     File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py", line 77, in create_beam_job
       self.executable_jar(),
     File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py", line 66, in executable_jar
       'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
     File "/Users/robertwb/Work/beam/incubator-beam/sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py", line 70, in flink_version
       '%s/v1/config' % self._master_url).json()['flink-version']
     File "/Users/robertwb/Work/beam/venv3/lib/python3.6/site-packages/requests-2.22.0-py3.6.egg/requests/api.py", line 75, in get
       return request('get', url, params=params, **kwargs)
     File "/Users/robertwb/Work/beam/venv3/lib/python3.6/site-packages/requests-2.22.0-py3.6.egg/requests/api.py", line 60, in request
       return session.request(method=method, url=url, **kwargs)
     File "/Users/robertwb/Work/beam/venv3/lib/python3.6/site-packages/requests-2.22.0-py3.6.egg/requests/sessions.py", line 533, in request
       resp = self.send(prep, **send_kwargs)
     File "/Users/robertwb/Work/beam/venv3/lib/python3.6/site-packages/requests-2.22.0-py3.6.egg/requests/sessions.py", line 640, in send
       adapter = self.get_adapter(url=request.url)
     File "/Users/robertwb/Work/beam/venv3/lib/python3.6/site-packages/requests-2.22.0-py3.6.egg/requests/sessions.py", line 731, in get_adapter
       raise InvalidSchema("No connection adapters were found for '%s'" % url)
   requests.exceptions.InvalidSchema: No connection adapters were found for 'localhost:8081/v1/config'
   ```
   
   because we're using requests to attempt to communicate with the flink 
master. (Note that in this case there is no `FlinkJobServerDriver`, etc.) We 
could automatically append `http` or `https` if we could figure out which one, 
but it seems like a strange design to have to read a config file when the address is 
passed explicitly. 
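For comparison, a sketch of the `flink_version` lookup from the traceback that prepends `http://` to a bare `host:port` before querying `/v1/config`. It uses only the standard library (`urllib.request`, `json`) rather than `requests`, and the function signature is an assumption; the `urlopen` parameter exists only so the HTTP call can be stubbed out in tests. It does not settle the http-vs-https question raised above.

```python
import json
import urllib.request


def flink_version(flink_master, urlopen=urllib.request.urlopen):
    """Return the Flink version reported by the cluster's REST API.

    Hypothetical sketch. A bare host:port such as "localhost:8081" gets an
    http:// prefix first; without a scheme, `requests` raises InvalidSchema.
    """
    master_url = flink_master.rstrip("/")
    if "://" not in master_url:
        master_url = "http://" + master_url
    # The /v1/config response is a JSON object with a "flink-version" field.
    with urlopen("%s/v1/config" % master_url) as response:
        return json.load(response)["flink-version"]
```

Against a running cluster this would be called as `flink_version("localhost:8081")`.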
   
   Taking a step back, here's what I think the ideal experience should be. If 
one passes just `--runner=FlinkRunner` then it should start up a local runner 
and run against that, automatically enabling `LOOPBACK` mode if one hasn't been 
explicitly chosen for ease of use. If one passes `--runner=FlinkRunner 
--flink_master=address` it would submit the job to the cluster, without having 
to even download jars and invoke java. It's unclear how `[auto]` would get this 
behavior (or indeed what the behavior of `[auto]` even is). 
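The "ideal experience" described above comes down to one decision: is `flink_master` set or not. A hedged sketch: `plan_submission` and the mode strings are hypothetical, while `environment_type` and `LOOPBACK` are named after Beam's portable `--environment_type` option.

```python
from typing import Optional, Tuple


def plan_submission(flink_master: Optional[str],
                    environment_type: Optional[str]) -> Tuple[str, Optional[str]]:
    """Return (mode, environment_type) for the proposed FlinkRunner behavior.

    Hypothetical sketch:
    - no flink_master: start a local runner, defaulting to LOOPBACK unless an
      environment type was chosen explicitly;
    - flink_master given: submit directly to the cluster over REST, leaving
      the environment type as the user set it.
    """
    if not flink_master:
        return "local", environment_type or "LOOPBACK"
    return "submit_to_cluster", environment_type
```

Note that in this sketch `[auto]` has no place: the presence or absence of an address alone selects the mode.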
 



Issue Time Tracking
---

Worklog Id: (was: 334545)
Time Spent: 7h 50m  (was: 7h 40m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334513=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334513
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 10:18
Start Date: 26/Oct/19 10:18
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#discussion_r339295003
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -43,7 +43,13 @@ def default_job_server(self, options):
 class FlinkRunnerOptions(pipeline_options.PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--flink_master', default='[local]')
+    parser.add_argument('--flink_master',
+                        default='[auto]',
 
 Review comment:
   This looks like it is for testing? That looks ok.
 



Issue Time Tracking
---

Worklog Id: (was: 334513)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334514=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334514
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 10:18
Start Date: 26/Oct/19 10:18
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#discussion_r339295003
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -43,7 +43,13 @@ def default_job_server(self, options):
 class FlinkRunnerOptions(pipeline_options.PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--flink_master', default='[local]')
+    parser.add_argument('--flink_master',
+                        default='[auto]',
 
 Review comment:
   This looks like it is for testing? That is ok.
 



Issue Time Tracking
---

Worklog Id: (was: 334514)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334512=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334512
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 10:16
Start Date: 26/Oct/19 10:16
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#discussion_r339294967
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -43,7 +43,13 @@ def default_job_server(self, options):
 class FlinkRunnerOptions(pipeline_options.PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--flink_master', default='[local]')
+    parser.add_argument('--flink_master',
+                        default='[auto]',
+                        help='Flink master address (host:port) to submit the'
+                             ' job against. Use "[local]" to start a local'
+                             ' cluster for the execution. Use "[auto]" if you'
+                             ' plan to either execute locally or submit through'
 
 Review comment:
   I'm coming from the Java SDK, where we use `[auto]` as the default. I think 
it makes sense because `[local]` has unexpected consequences when you use it 
with the Flink CLI (now that we have executable jars that is possible). It will 
force a local execution instead of submitting the job to the cluster. We could 
even think about removing the local mode, since it is covered by the auto mode 
as well.
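A standalone approximation of the option under review, using plain `argparse` instead of Beam's `PipelineOptions` machinery. The help text is paraphrased from this discussion rather than copied from the diff (which is cut off above), and `add_flink_master_arg` is a hypothetical helper.

```python
import argparse


def add_flink_master_arg(parser: argparse.ArgumentParser) -> None:
    # Approximation only: in Beam this option is registered inside
    # FlinkRunnerOptions._add_argparse_args, as in the diff above.
    parser.add_argument(
        '--flink_master',
        default='[auto]',
        help='Flink master address (host:port). "[local]" always starts a '
             'local cluster; "[auto]" (the default) runs locally unless a '
             'cluster address is available from the context, e.g. bin/flink.')


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    add_flink_master_arg(parser)
    print(parser.parse_args([]).flink_master)  # [auto]
```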
 



Issue Time Tracking
---

Worklog Id: (was: 334512)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334510=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334510
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 10:07
Start Date: 26/Oct/19 10:07
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9844: [BEAM-8372] Support both 
flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546589217
 
 
   >I guess it's `--flink_master` that's new (in Python's FlinkRunner).
   
   Since the inception of the Flink Runner, this option has been `flinkMaster` (Java) 
or `flink_master` (Python): 
https://github.com/apache/beam/blob/8972e5e10050b67a4a9a0027c09d2d2eb00b45ed/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L66
   
   
https://github.com/apache/beam/blob/50528d024b83a5083afa24bd7c3e3694a6347c15/sdks/python/apache_beam/options/pipeline_options.py#L669
   
   In light of URLs not even being supported, it also makes sense to keep 
that name instead of renaming it to `flink_master_url`.
   
   > But if we don't have the http then we need to change the 
https://github.com/apache/beam/pull/9775 to add it. 
   
   Could there be a misunderstanding? REST does not mean that we need to 
prepend `http://`. The format is `host:port` where host is a hostname or an 
IPv4/IPv6 address, and port is the port number, i.e. of Flink's Rest service 
(typically running at port 8081). Please correct me, but I don't see that we 
need to change #9775.
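A sketch of parsing the `host:port` format described above. Assumption: IPv6 literals are written in square brackets (e.g. `[::1]:8081`), the usual URL convention, because an unbracketed IPv6 address with a port would be ambiguous. `split_host_port` is a hypothetical helper, not part of Flink or Beam.

```python
from typing import Tuple


def split_host_port(address: str) -> Tuple[str, int]:
    """Split a Flink master address of the form host:port.

    Hypothetical helper. IPv6 hosts are assumed to be bracketed, so for all
    other hosts the last colon separates the port.
    """
    if address.startswith("["):
        host, sep, rest = address[1:].partition("]")
        if not sep or not rest.startswith(":"):
            raise ValueError("expected [ipv6]:port, got %r" % address)
        port = rest[1:]
    else:
        host, sep, port = address.rpartition(":")
        if not sep:
            raise ValueError("expected host:port, got %r" % address)
    # int() raises ValueError for a non-numeric port.
    return host, int(port)
```

Note that this format carries no scheme, which is why a Python client talking REST directly still has to pick `http://` or `https://` on its own.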
   
   
   

   
 



Issue Time Tracking
---

Worklog Id: (was: 334510)
Time Spent: 7h 10m  (was: 7h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334495=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334495
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 05:24
Start Date: 26/Oct/19 05:24
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#discussion_r339286935
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -43,7 +43,13 @@ def default_job_server(self, options):
 class FlinkRunnerOptions(pipeline_options.PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--flink_master', default='[local]')
+    parser.add_argument('--flink_master',
+                        default='[auto]',
+                        help='Flink master address (host:port) to submit the'
+                             ' job against. Use "[local]" to start a local'
+                             ' cluster for the execution. Use "[auto]" if you'
+                             ' plan to either execute locally or submit through'
 
 Review comment:
   How is this choice made if `[auto]` is passed? Should it really be the 
default? 
 



Issue Time Tracking
---

Worklog Id: (was: 334495)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334493=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334493
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 05:24
Start Date: 26/Oct/19 05:24
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9844: [BEAM-8372] Support 
both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546571130
 
 
   I guess it's `--flink_master` that's new (in Python's FlinkRunner). But if 
we don't have the http then we need to change the 
https://github.com/apache/beam/pull/9775 to add it. 
 



Issue Time Tracking
---

Worklog Id: (was: 334493)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334494=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334494
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 26/Oct/19 05:24
Start Date: 26/Oct/19 05:24
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#discussion_r339286838
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -43,7 +43,13 @@ def default_job_server(self, options):
 class FlinkRunnerOptions(pipeline_options.PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--flink_master', default='[local]')
+    parser.add_argument('--flink_master',
+                        default='[auto]',
 
 Review comment:
   `[local]` is also referenced at 
https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/portability/flink_runner.py#L36
 



Issue Time Tracking
---

Worklog Id: (was: 334494)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334357=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334357
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 25/Oct/19 20:38
Start Date: 25/Oct/19 20:38
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9844: [BEAM-8372] Support both 
flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546504944
 
 
   >`flink_master_url` is very new, I think we can consolidate on one option 
for now. 
   
   The instance in `FlinkJobServerDriver` has been there for over a year. It 
might be sufficient to convert it to an alias only there.
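Turning `flink_master_url` into an alias can be illustrated with `argparse`, which accepts several option strings for a single argument. This is only an analogy in Python: `FlinkJobServerDriver` is Java code with its own option parsing, and `build_parser` is a hypothetical name.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    # Both spellings store into the same destination, so downstream code
    # only ever reads args.flink_master.
    parser.add_argument('--flink_master', '--flink_master_url',
                        dest='flink_master',
                        help='Flink master address; --flink_master_url is '
                             'accepted as an alias.')
    return parser


if __name__ == '__main__':
    args = build_parser().parse_args(['--flink_master_url', 'localhost:8081'])
    print(args.flink_master)  # localhost:8081
```

With a single destination there is no precedence question to resolve between the two names; if both are passed, the last one on the command line wins.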
   
   >I think it would be good to get this resolved in the current release, if 
possible.
   
   +1
   
   >So, if I understand correctly, if `flink_master` is just host:port, there 
is no way to (simultaneously) submit a job to two different clusters (one with 
ssl, one without) without editing the config file in the middle? This seems 
quite unfortunate. Also, the code here does not read the config file.
   
   The `FlinkJobServerDriver` reads the config file before it submits the Flink 
job to the cluster. So the Python code does not have to worry about it. Yes, 
there is no way to submit against two clusters without changing the 
configuration file. You can start a second job server though, which uses a 
different configuration file.
   
   >What if, instead, we made the `http[s]://` part optional, where it would be 
stripped in Java and assumed to be just `http://` in Python if not specified? 
(Or it could try https and then fall back to http on failure.)
   
   That would be a new feature. Flink does not accept any URL schemes like 
`http://`. It just expects host:port. The config file determines whether the 
Rest service running at the port uses SSL or not.
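The try-https-then-http fallback suggested in the quoted question could look roughly like the following. Everything here is an assumption (the function name, the 5-second timeout, probing `/v1/config`). As the reply points out, this would be new behavior, since Flink itself relies on its config file to decide whether the REST endpoint uses SSL. Costs include an extra round trip per submission, and a failed https attempt can mask a real TLS misconfiguration.

```python
import http.client
import urllib.request


def probe_rest_endpoint(host_port, urlopen=urllib.request.urlopen, timeout=5):
    """Return the first base URL (https first, then http) that answers.

    Hypothetical sketch. `host_port` is a bare "host:port" string. Raises
    ConnectionError if neither scheme reaches the REST endpoint.
    """
    last_error = None
    for scheme in ("https", "http"):
        base_url = "%s://%s" % (scheme, host_port)
        try:
            # urlopen raises for HTTP error statuses, so reaching the body
            # of the with-block means /v1/config answered successfully.
            with urlopen(base_url + "/v1/config", timeout=timeout):
                return base_url
        except (OSError, http.client.HTTPException) as error:
            # URLError (including wrapped TLS handshake failures) is an OSError.
            last_error = error
    raise ConnectionError("no Flink REST endpoint at %s" % host_port) from last_error
```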
   
 



Issue Time Tracking
---

Worklog Id: (was: 334357)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334328=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334328
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 25/Oct/19 19:01
Start Date: 25/Oct/19 19:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9844: [BEAM-8372] Support 
both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546473901
 
 
   I think it would be good to get this resolved in the current release, if 
possible. 
 



Issue Time Tracking
---

Worklog Id: (was: 334328)
Time Spent: 6.5h  (was: 6h 20m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=334327=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334327
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 25/Oct/19 19:01
Start Date: 25/Oct/19 19:01
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9844: [BEAM-8372] Support 
both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-546473642
 
 
   `flink_master_url` is very new; I think we can consolidate on one option for
now. 
   
   So, if I understand correctly, if `flink_master` is just host:port, there is 
no way to (simultaneously) submit a job to two different clusters (one with 
ssl, one without) without editing the config file in the middle? This seems 
quite unfortunate. Also, the code here does not read the config file. 
   
   What if, instead, we made the `http[s]://` part optional, where it would be 
stripped in Java and assumed to be just `http://` in Python if not specified? 
(Or it could try https and then fall back to http on failure.)
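A minimal sketch of this proposal: the `http[s]://` prefix becomes optional and `http` is assumed when it is absent. `split_flink_master` is a hypothetical helper, not Beam's implementation, and the "try https, then fall back to http" variant is not attempted here.

```python
import re
from typing import Tuple

# Optional "http://" or "https://" prefix followed by the rest of the address.
_SCHEME_RE = re.compile(r"^(https?)://(.*)$", re.IGNORECASE)


def split_flink_master(address: str, default_scheme: str = "http") -> Tuple[str, str]:
    """Split an optional scheme off a flink_master value.

    Returns (scheme, host_port). The host:port part is what Flink itself
    expects; the scheme only matters to a client that calls the REST
    endpoint directly.
    """
    address = address.strip().rstrip("/")
    match = _SCHEME_RE.match(address)
    if match:
        return match.group(1).lower(), match.group(2)
    return default_scheme, address


if __name__ == "__main__":
    for value in ["localhost:8081", "https://flink-jm:8081/", "HTTP://host:1234"]:
        print(value, "->", split_flink_master(value))
```

In this scheme the Java side would pass on only the `host_port` half, while the Python side would use the scheme when it talks to the REST endpoint itself.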
 



Issue Time Tracking
---

Worklog Id: (was: 334327)
Time Spent: 6h 20m  (was: 6h 10m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-22 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=332025=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-332025
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 22/Oct/19 14:21
Start Date: 22/Oct/19 14:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9844: [BEAM-8372] Support both 
flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-544986865
 
 
   The address was never meant to be anything other than `host:port`.
Somehow, over time, it came to be perceived as a URL. I tried to clarify
that in the parameter description.
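If `flink_master` is strictly `host:port`, a client can reject URL-style values up front. A minimal sketch of such a validator; `parse_host_port` is hypothetical (not part of Beam), and it assumes that special values such as `[local]` are handled before it is called.

```python
from typing import Tuple


def parse_host_port(address: str) -> Tuple[str, int]:
    """Validate a flink_master value of the form host:port.

    Rejects URL-style values such as 'http://host:8081', since the address
    is meant to be a bare host:port pair.
    """
    if "://" in address:
        raise ValueError("expected host:port without a scheme, got %r" % address)
    host, sep, port = address.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError("expected host:port, got %r" % address)
    port_num = int(port)
    if not 0 < port_num < 65536:
        raise ValueError("port out of range in %r" % address)
    return host, port_num


if __name__ == "__main__":
    print(parse_host_port("localhost:8081"))
```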
 



Issue Time Tracking
---

Worklog Id: (was: 332025)
Time Spent: 6h 10m  (was: 6h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=331664=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331664
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 21/Oct/19 21:44
Start Date: 21/Oct/19 21:44
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9844: [BEAM-8372] Support 
both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#issuecomment-544721281
 
 
   There's also the question of whether this should include the `http[s]://` 
portion. (The 's' part in particular is potentially ambiguous.)
 



Issue Time Tracking
---

Worklog Id: (was: 331664)
Time Spent: 6h  (was: 5h 50m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=331414=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331414
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 21/Oct/19 13:30
Start Date: 21/Oct/19 13:30
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#discussion_r337017786
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -43,7 +43,8 @@ def default_job_server(self, options):
 class FlinkRunnerOptions(pipeline_options.PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--flink_master', default='[local]')
+    parser.add_argument('--flink_master', '--flink_master_url',
+                        default='[local]')
 
 Review comment:
   Good point! We should.
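The diff above registers `--flink_master_url` as an alias of `--flink_master` in a single `add_argument` call. A standalone sketch using plain `argparse` (not Beam's `PipelineOptions` wrapper) shows how that behaves: `argparse` derives `dest` from the first long option string, so either spelling sets `flink_master`.

```python
import argparse

parser = argparse.ArgumentParser()
# Two option strings for one argument: dest comes from the first long option
# ('--flink_master'), so both spellings populate args.flink_master.
parser.add_argument('--flink_master', '--flink_master_url', default='[local]')

if __name__ == '__main__':
    print(parser.parse_args([]).flink_master)
    print(parser.parse_args(['--flink_master', 'localhost:8081']).flink_master)
    print(parser.parse_args(['--flink_master_url', 'localhost:8081']).flink_master)
```

Changing the default to `[auto]`, as suggested for parity with Java, would only touch the `default=` argument.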
 



Issue Time Tracking
---

Worklog Id: (was: 331414)
Time Spent: 5h 50m  (was: 5h 40m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=331370=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331370
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 21/Oct/19 11:46
Start Date: 21/Oct/19 11:46
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844#discussion_r336971171
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -43,7 +43,8 @@ def default_job_server(self, options):
 class FlinkRunnerOptions(pipeline_options.PipelineOptions):
   @classmethod
   def _add_argparse_args(cls, parser):
-    parser.add_argument('--flink_master', default='[local]')
+    parser.add_argument('--flink_master', '--flink_master_url',
+                        default='[local]')
 
 Review comment:
   The default for Java is `[auto]`. Should we change the Python SDK to match?
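The `default_job_server` diff quoted elsewhere in this thread branches on `'[local]'` and on `sys.version_info < (3, 6)`. If the Python default switched to Java's `[auto]`, that check would need to recognise both placeholder values. A hypothetical sketch: the names `MAGIC_HOST_NAMES` and `use_embedded_job_server` are illustrative, and treating `[auto]` exactly like `[local]` is an assumption.

```python
import sys

# Assumption: both placeholders mean "no remote cluster address was given".
MAGIC_HOST_NAMES = ('[local]', '[auto]')


def use_embedded_job_server(flink_master, version_info=sys.version_info):
    """Return True if a local job server should be started instead of
    submitting an uber jar to the cluster at flink_master (host:port).

    The Python-version condition mirrors the check in the quoted diff.
    """
    return flink_master in MAGIC_HOST_NAMES or version_info < (3, 6)


if __name__ == '__main__':
    for value in ['[local]', '[auto]', 'localhost:8081']:
        print(value, use_embedded_job_server(value))
```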
 



Issue Time Tracking
---

Worklog Id: (was: 331370)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=331368=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331368
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 21/Oct/19 11:40
Start Date: 21/Oct/19 11:40
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9844: [BEAM-8372] 
Support both flink_master and flink_master_url parameter
URL: https://github.com/apache/beam/pull/9844
 
 
   The original parameter was named flink_master (defined in 
FlinkPipelineOptions),
   but inconsistencies have crept in since then, so it is best to support both now.
   

[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=331369=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331369
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 21/Oct/19 11:40
Start Date: 21/Oct/19 11:40
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9803: [BEAM-8372] 
Follow-up to Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803#discussion_r336969270
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -32,18 +32,18 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
-    if flink_master_url == '[local]' or sys.version_info < (3, 6):
+    flink_master = options.view_as(FlinkRunnerOptions).flink_master
 
 Review comment:
   https://github.com/apache/beam/pull/9844
 



Issue Time Tracking
---

Worklog Id: (was: 331369)
Time Spent: 5.5h  (was: 5h 20m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=330799=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-330799
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 18/Oct/19 21:57
Start Date: 18/Oct/19 21:57
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9803: [BEAM-8372] 
Follow-up to Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803#discussion_r336690127
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -32,18 +32,18 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
-    if flink_master_url == '[local]' or sys.version_info < (3, 6):
+    flink_master = options.view_as(FlinkRunnerOptions).flink_master
 
 Review comment:
   Thanks, let me know when you have a PR. I'd like to get this in by 2.17 so 
we can advertise it as a release that's really easy to use with Flink. 
 



Issue Time Tracking
---

Worklog Id: (was: 330799)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=330083=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-330083
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 17/Oct/19 19:29
Start Date: 17/Oct/19 19:29
Worklog Time Spent: 10m 
  Work Description: tvalentyn commented on issue #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#issuecomment-543326484
 
 
   +1, I was also hit by the flakes 
(https://issues.apache.org/jira/browse/BEAM-8426). I didn't notice that the bug 
was already open; thanks, @udim.
 



Issue Time Tracking
---

Worklog Id: (was: 330083)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=329966=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329966
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 17/Oct/19 15:49
Start Date: 17/Oct/19 15:49
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9803: [BEAM-8372] 
Follow-up to Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 329966)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=329965=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329965
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 17/Oct/19 15:48
Start Date: 17/Oct/19 15:48
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9803: [BEAM-8372] 
Follow-up to Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803#discussion_r336086235
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -32,18 +32,18 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
-    if flink_master_url == '[local]' or sys.version_info < (3, 6):
+    flink_master = options.view_as(FlinkRunnerOptions).flink_master
 
 Review comment:
   Will do that in a follow-up.
 



Issue Time Tracking
---

Worklog Id: (was: 329965)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-17 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=329961=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329961
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 17/Oct/19 15:46
Start Date: 17/Oct/19 15:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9803: [BEAM-8372] 
Follow-up to Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803#discussion_r336081192
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -32,18 +32,18 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
-    if flink_master_url == '[local]' or sys.version_info < (3, 6):
+    flink_master = options.view_as(FlinkRunnerOptions).flink_master
 
 Review comment:
   Actually, I just realized that this differs between Python and Java. There 
are various inconsistencies, also in the job server. Pushing a commit to adjust 
this by allowing both arguments.
 



Issue Time Tracking
---

Worklog Id: (was: 329961)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=329497=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329497
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 17/Oct/19 00:09
Start Date: 17/Oct/19 00:09
Worklog Time Spent: 10m 
  Work Description: udim commented on issue #9775: [BEAM-8372] Job server 
submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#issuecomment-542941178
 
 
   I opened a bug for flakiness in test_concurrent_requests. Perhaps this PR 
caused it?
   https://issues.apache.org/jira/browse/BEAM-8416
 



Issue Time Tracking
---

Worklog Id: (was: 329497)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=329218=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329218
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 16/Oct/19 15:41
Start Date: 16/Oct/19 15:41
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9803: [BEAM-8372] Follow-up to 
Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803#issuecomment-542763924
 
 
   I think there are some issues related to the ZIP tests.
 



Issue Time Tracking
---

Worklog Id: (was: 329218)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=329136=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329136
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 16/Oct/19 12:10
Start Date: 16/Oct/19 12:10
Worklog Time Spent: 10m 
  Work Description: ibzib commented on issue #9803: [BEAM-8372] Follow-up 
to Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803#issuecomment-542669157
 
 
   Run PortableJar_Flink PostCommit
 



Issue Time Tracking
---

Worklog Id: (was: 329136)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=329011=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-329011
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 16/Oct/19 07:34
Start Date: 16/Oct/19 07:34
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9803: [BEAM-8372] Follow-up to 
Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803#issuecomment-542565428
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 329011)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328819=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328819
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 22:47
Start Date: 15/Oct/19 22:47
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r335210632
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##
 @@ -77,47 +88,131 @@ def PutArtifact(self, request_iterator, context=None):
         metadata = request.metadata.metadata
         retrieval_token = self.retrieval_token(
             request.metadata.staging_session_token)
-        self._mkdir(retrieval_token)
-        temp_path = filesystems.FileSystems.join(
-            self._root,
-            retrieval_token,
-            '%x.tmp' % random.getrandbits(128))
-        fout = filesystems.FileSystems.create(temp_path)
+        artifact_path = self._artifact_path(retrieval_token, metadata.name)
+        temp_path = self._temp_path(artifact_path)
+        fout = self._open(temp_path, 'w')
         hasher = hashlib.sha256()
       else:
         hasher.update(request.data.data)
         fout.write(request.data.data)
     fout.close()
     data_hash = hasher.hexdigest()
     if metadata.sha256 and metadata.sha256 != data_hash:
-      filesystems.FileSystems.delete([temp_path])
+      self._delete(temp_path)
       raise ValueError('Bad metadata hash: %s vs %s' % (
-          metadata.metadata.sha256, data_hash))
-    filesystems.FileSystems.rename(
-        [temp_path], [self._artifact_path(retrieval_token, metadata.name)])
+          metadata.sha256, data_hash))
+    self._rename(temp_path, artifact_path)
     return beam_artifact_api_pb2.PutArtifactResponse()
 
   def CommitManifest(self, request, context=None):
     retrieval_token = self.retrieval_token(request.staging_session_token)
-    with filesystems.FileSystems.create(
-        self._manifest_path(retrieval_token)) as fout:
-      fout.write(request.manifest.SerializeToString())
+    proxy_manifest = beam_artifact_api_pb2.ProxyManifest(
+        manifest=request.manifest,
+        location=[
+            beam_artifact_api_pb2.ProxyManifest.Location(
+                name=metadata.name,
+                uri=self._artifact_path(retrieval_token, metadata.name))
+            for metadata in request.manifest.artifact])
+    with self._open(self._manifest_path(retrieval_token), 'w') as fout:
+      fout.write(json_format.MessageToJson(proxy_manifest).encode('utf-8'))
     return beam_artifact_api_pb2.CommitManifestResponse(
         retrieval_token=retrieval_token)
 
   def GetManifest(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._manifest_path(request.retrieval_token)) as fin:
-      return beam_artifact_api_pb2.GetManifestResponse(
-          manifest=beam_artifact_api_pb2.Manifest.FromString(
-              fin.read()))
+    return beam_artifact_api_pb2.GetManifestResponse(
+        manifest=self._get_manifest_proxy(request.retrieval_token).manifest)
 
   def GetArtifact(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._artifact_path(request.retrieval_token, request.name)) as fin:
-      # This value is not emitted, but lets us yield a single empty
-      # chunk on an empty file.
-      chunk = True
-      while chunk:
-        chunk = fin.read(self._chunk_size)
-        yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+    for artifact in self._get_manifest_proxy(request.retrieval_token).location:
+      if artifact.name == request.name:
+        with self._open(artifact.uri, 'r') as fin:
+          # This value is not emitted, but lets us yield a single empty
+          # chunk on an empty file.
+          chunk = True
+          while chunk:
+            chunk = fin.read(self._chunk_size)
+            yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+        break
+    else:
+      raise ValueError('Unknown artifact: %s' % request.name)
+
+
+class ZipFileArtifactService(AbstractArtifactService):
+  """Stores artifacts in a zip file.
+
+  This is particularly useful for storing artifacts as part of an UberJar for
+  submitting to an upstream runner's cluster.
+
+  Writing to zip files requires Python 3.6+.
+  """
+
+  def __init__(self, path, chunk_size=None):
+    if sys.version_info < (3, 6):
+      raise RuntimeError(
+          'Writing to zip files requires Python 3.6+, '
+          'but current version is %s' % sys.version)
 
 Review comment:
   Yeah. 3.5 is pretty old though, and 2.7 is EOL, so not worth implementing 
our own backport. 
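
For context on the version check discussed above: `zipfile.ZipFile.open()` accepts mode `'w'` (streaming a new member into an archive) only from Python 3.6 onward, which is why `ZipFileArtifactService` refuses older interpreters. A minimal standard-library sketch of that streaming write; the function name, archive path, and member names are hypothetical:

```python
import os
import sys
import tempfile
import zipfile


def write_member_streaming(zip_path, member_name, chunks):
  """Streams byte chunks into a new member of a (possibly new) zip archive."""
  if sys.version_info < (3, 6):
    # ZipFile.open(name, 'w') does not exist before Python 3.6.
    raise RuntimeError(
        'Writing to zip files requires Python 3.6+, '
        'but current version is %s' % sys.version)
  # Mode 'a' appends to an existing archive, or creates one if absent.
  with zipfile.ZipFile(zip_path, 'a', compression=zipfile.ZIP_DEFLATED) as z:
    with z.open(member_name, 'w') as fout:
      for chunk in chunks:
        fout.write(chunk)


if __name__ == '__main__':
  path = os.path.join(tempfile.mkdtemp(), 'artifacts.zip')
  write_member_streaming(path, 'artifacts/example.txt', [b'hello, ', b'world'])
  with zipfile.ZipFile(path) as z:
    print(z.read('artifacts/example.txt'))  # b'hello, world'
```

Streaming each chunk avoids holding a whole artifact in memory, which is what a pre-3.6 fallback built on `ZipFile.writestr()` would require.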
 


[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328818=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328818
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 22:44
Start Date: 15/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r335209883
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -29,7 +32,12 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
+    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
 
 Review comment:
   Sorry, I merged before refreshing and seeing your comments. 
https://github.com/apache/beam/pull/9803
 



Issue Time Tracking
---

Worklog Id: (was: 328818)
Time Spent: 3.5h  (was: 3h 20m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 3.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328817=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328817
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 22:44
Start Date: 15/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9803: [BEAM-8372] 
Follow-up to Flink UberJar submission.
URL: https://github.com/apache/beam/pull/9803
 
 
   
   
   

[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328676=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328676
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 16:50
Start Date: 15/Oct/19 16:50
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 328676)
Time Spent: 3h 10m  (was: 3h)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328542=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328542
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 12:46
Start Date: 15/Oct/19 12:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334926043
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import json
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+from concurrent import futures
+
+import grpc
+import requests
+from google.protobuf import json_format
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
+
+  # These must agree with those defined in PortablePipelineJarUtils.java.
+  PIPELINE_FOLDER = 'BEAM-PIPELINE'
+  PIPELINE_MANIFEST = PIPELINE_FOLDER + '/pipeline-manifest.json'
+
+  # We only stage a single pipeline in the jar.
+  PIPELINE_NAME = 'pipeline'
+  PIPELINE_PATH = '/'.join(
+      [PIPELINE_FOLDER, PIPELINE_NAME, "pipeline.json"])
+  PIPELINE_OPTIONS_PATH = '/'.join(
+      [PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json'])
+  ARTIFACT_MANIFEST_PATH = '/'.join(
+      [PIPELINE_FOLDER, PIPELINE_NAME, 'artifact-manifest.json'])
+
+  def __init__(
+      self, master_url, executable_jar, job_id, job_name, pipeline, options):
+    super(FlinkBeamJob, self).__init__(job_id, job_name, pipeline, options)
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._jar_uploaded = False
+
+  def prepare(self):
+    # Copy the executable jar, injecting the pipeline and options as resources.
+    with tempfile.NamedTemporaryFile(suffix='.jar') as tout:
+      self._jar = tout.name
+    shutil.copy(self._executable_jar, self._jar)
+    with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z:
+      with z.open(self.PIPELINE_PATH, 'w') as fout:
+        fout.write(json_format.MessageToJson(
+            self._pipeline_proto).encode('utf-8'))
+      with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout:
+        fout.write(json_format.MessageToJson(
+            self._pipeline_options).encode('utf-8'))
+      with z.open(self.PIPELINE_MANIFEST, 'w') as fout:
+        fout.write(json.dumps(
+ 
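
The `prepare()` step in the diff above copies the executable jar and reopens the copy in append mode to add the pipeline and its options as JSON entries under `BEAM-PIPELINE/`. A stripped-down standard-library sketch of that idea, under these assumptions: plain dicts stand in for the protobuf messages (the real code serializes them with `json_format.MessageToJson`), `inject_pipeline` is a hypothetical name, `tempfile.mkstemp` replaces the diff's `NamedTemporaryFile` so the path stays reserved during the copy, and the pipeline-manifest entry is omitted because the diff is cut off at that point:

```python
import json
import os
import shutil
import tempfile
import zipfile

# Entry names mirror the constants defined on FlinkBeamJob in the diff.
PIPELINE_FOLDER = 'BEAM-PIPELINE'
PIPELINE_NAME = 'pipeline'
PIPELINE_PATH = '/'.join([PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline.json'])
PIPELINE_OPTIONS_PATH = '/'.join(
    [PIPELINE_FOLDER, PIPELINE_NAME, 'pipeline-options.json'])


def inject_pipeline(executable_jar, pipeline, options):
  """Returns the path of a copy of executable_jar with the pipeline added."""
  fd, jar = tempfile.mkstemp(suffix='.jar')
  os.close(fd)
  shutil.copy(executable_jar, jar)
  # Mode 'a' keeps every existing jar entry and appends the new ones.
  with zipfile.ZipFile(jar, 'a', compression=zipfile.ZIP_DEFLATED) as z:
    z.writestr(PIPELINE_PATH, json.dumps(pipeline))
    z.writestr(PIPELINE_OPTIONS_PATH, json.dumps(options))
  return jar
```

Because the original jar entries are preserved, the resulting file remains a runnable jar that also carries the pipeline as resources.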

[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328540=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328540
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 12:46
Start Date: 15/Oct/19 12:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334923329
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import json
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+from concurrent import futures
+
+import grpc
+import requests
+from google.protobuf import json_format
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
 
 Review comment:
   Would be nice to have a docstring here.
   ```suggestion
   class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
     """Runs a Beam job on Flink by staging all contents into a Jar
     and uploading it via the Flink Rest API."""
   ```
 



Issue Time Tracking
---

Worklog Id: (was: 328540)
Time Spent: 2h 50m  (was: 2h 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328539=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328539
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 12:46
Start Date: 15/Oct/19 12:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334918124
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,238 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import json
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+from concurrent import futures
+
+import grpc
+import requests
+from google.protobuf import json_format
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
 
 Review comment:
   Would be nice to have a docstring here.
   ```suggestion
   class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
     """A Job server which submits a self-contained Jar to a Flink cluster.
     The jar contains the Beam dependencies and the Python artifacts."""
   ```
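
`FlinkUberJarJobServer.flink_version()` in the diff queries the cluster's REST endpoint `<master_url>/v1/config`, reads its `flink-version` field, and keeps only `major.minor` so that `executable_jar()` can pick the matching `runners:flink:<version>:job-server:shadowJar` build. The string handling can be sketched without a live cluster; the response dict below is a hypothetical example and both function names are illustrative:

```python
def flink_minor_version(config):
  """Extracts 'major.minor' from a Flink REST /v1/config response dict."""
  full_version = config['flink-version']
  # Only keep up to the minor version, e.g. '1.8.2' -> '1.8'.
  return '.'.join(full_version.split('.')[:2])


def job_server_jar_target(config):
  """Builds the Gradle target name used to locate the matching job server."""
  return 'runners:flink:%s:job-server:shadowJar' % flink_minor_version(config)


if __name__ == '__main__':
  # In the job server this dict would come from
  # requests.get('%s/v1/config' % master_url).json().
  print(job_server_jar_target({'flink-version': '1.8.2'}))
  # runners:flink:1.8:job-server:shadowJar
```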
 



Issue Time Tracking
---

Worklog Id: (was: 328539)
Time Spent: 2h 40m  (was: 2.5h)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328538=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328538
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 12:46
Start Date: 15/Oct/19 12:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334914432
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -29,7 +32,12 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
+    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
 
 Review comment:
   This should be `flink_master` to be compliant with 
https://github.com/apache/beam/blob/14b5ef8b3e63d77f5f34730ce10b9ed581380ffa/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L66
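
The naming the reviewer asks for can be illustrated with a plain `argparse` sketch that treats `flink_master` as the canonical option while still accepting the older `flink_master_url` spelling. This is a hypothetical illustration, not Beam's `FlinkRunnerOptions` code, and the `[auto]` default is an assumption meant to mirror the Java option:

```python
import argparse


def parse_flink_args(argv):
  """Parses the Flink master address, accepting a legacy alias.

  Hypothetical sketch: '--flink_master' matches the Java FlinkPipelineOptions
  name; '--flink_master_url' is accepted only as an alias for the old name.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--flink_master', '--flink_master_url',
      dest='flink_master',
      default='[auto]',
      help='Flink master address (host:port).')
  # Unrelated pipeline options are ignored rather than rejected.
  known, _ = parser.parse_known_args(argv)
  return known.flink_master


if __name__ == '__main__':
  print(parse_flink_args(['--flink_master_url=localhost:8081']))
  # localhost:8081
```

Both spellings write to the same `dest`, so downstream code reads a single attribute regardless of which flag the user passed.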
 



Issue Time Tracking
---

Worklog Id: (was: 328538)
Time Spent: 2.5h  (was: 2h 20m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328541=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328541
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 12:46
Start Date: 15/Oct/19 12:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334916756
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##
 @@ -77,47 +88,131 @@ def PutArtifact(self, request_iterator, context=None):
         metadata = request.metadata.metadata
         retrieval_token = self.retrieval_token(
             request.metadata.staging_session_token)
-        self._mkdir(retrieval_token)
-        temp_path = filesystems.FileSystems.join(
-            self._root,
-            retrieval_token,
-            '%x.tmp' % random.getrandbits(128))
-        fout = filesystems.FileSystems.create(temp_path)
+        artifact_path = self._artifact_path(retrieval_token, metadata.name)
+        temp_path = self._temp_path(artifact_path)
+        fout = self._open(temp_path, 'w')
         hasher = hashlib.sha256()
       else:
         hasher.update(request.data.data)
         fout.write(request.data.data)
     fout.close()
     data_hash = hasher.hexdigest()
     if metadata.sha256 and metadata.sha256 != data_hash:
-      filesystems.FileSystems.delete([temp_path])
+      self._delete(temp_path)
       raise ValueError('Bad metadata hash: %s vs %s' % (
-          metadata.metadata.sha256, data_hash))
-    filesystems.FileSystems.rename(
-        [temp_path], [self._artifact_path(retrieval_token, metadata.name)])
+          metadata.sha256, data_hash))
+    self._rename(temp_path, artifact_path)
     return beam_artifact_api_pb2.PutArtifactResponse()
 
   def CommitManifest(self, request, context=None):
     retrieval_token = self.retrieval_token(request.staging_session_token)
-    with filesystems.FileSystems.create(
-        self._manifest_path(retrieval_token)) as fout:
-      fout.write(request.manifest.SerializeToString())
+    proxy_manifest = beam_artifact_api_pb2.ProxyManifest(
+        manifest=request.manifest,
+        location=[
+            beam_artifact_api_pb2.ProxyManifest.Location(
+                name=metadata.name,
+                uri=self._artifact_path(retrieval_token, metadata.name))
+            for metadata in request.manifest.artifact])
+    with self._open(self._manifest_path(retrieval_token), 'w') as fout:
+      fout.write(json_format.MessageToJson(proxy_manifest).encode('utf-8'))
     return beam_artifact_api_pb2.CommitManifestResponse(
         retrieval_token=retrieval_token)
 
   def GetManifest(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._manifest_path(request.retrieval_token)) as fin:
-      return beam_artifact_api_pb2.GetManifestResponse(
-          manifest=beam_artifact_api_pb2.Manifest.FromString(
-              fin.read()))
+    return beam_artifact_api_pb2.GetManifestResponse(
+        manifest=self._get_manifest_proxy(request.retrieval_token).manifest)
 
   def GetArtifact(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._artifact_path(request.retrieval_token, request.name)) as fin:
-      # This value is not emitted, but lets us yield a single empty
-      # chunk on an empty file.
-      chunk = True
-      while chunk:
-        chunk = fin.read(self._chunk_size)
-        yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+    for artifact in self._get_manifest_proxy(request.retrieval_token).location:
+      if artifact.name == request.name:
+        with self._open(artifact.uri, 'r') as fin:
+          # This value is not emitted, but lets us yield a single empty
+          # chunk on an empty file.
+          chunk = True
+          while chunk:
+            chunk = fin.read(self._chunk_size)
+            yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+        break
+    else:
+      raise ValueError('Unknown artifact: %s' % request.name)
+
+
+class ZipFileArtifactService(AbstractArtifactService):
+  """Stores artifacts in a zip file.
+
+  This is particularly useful for storing artifacts as part of an UberJar for
+  submitting to an upstream runner's cluster.
+
+  Writing to zip files requires Python 3.6+.
+  """
+
+  def __init__(self, path, chunk_size=None):
+    if sys.version_info < (3, 6):
+      raise RuntimeError(
+          'Writing to zip files requires Python 3.6+, '
+          'but current version is %s' % sys.version)
 
 Review comment:
   That is a bit unfortunate for portability, but ok in terms of code 
simplicity.
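
The `PutArtifact`/`GetArtifact` changes in the diff above follow a write-to-temp, verify-hash, rename pattern on the way in and a chunked read on the way out. A local-filesystem sketch of both halves, with hypothetical function names; a plain dict from artifact name to path stands in for the `ProxyManifest` locations, and `tempfile.mkstemp` stands in for the diff's `_temp_path` helper, whose implementation is not shown:

```python
import hashlib
import os
import tempfile


def put_artifact(root, name, chunks, expected_sha256=None):
  """Writes chunks to root/name via a temp file, verifying the SHA-256."""
  artifact_path = os.path.join(root, name)
  fd, temp_path = tempfile.mkstemp(dir=root, suffix='.tmp')
  hasher = hashlib.sha256()
  with os.fdopen(fd, 'wb') as fout:
    for chunk in chunks:
      hasher.update(chunk)
      fout.write(chunk)
  data_hash = hasher.hexdigest()
  if expected_sha256 and expected_sha256 != data_hash:
    os.remove(temp_path)  # Never leave a corrupt artifact behind.
    raise ValueError(
        'Bad metadata hash: %s vs %s' % (expected_sha256, data_hash))
  # Only a fully written, verified file is moved to its final name.
  os.replace(temp_path, artifact_path)
  return artifact_path


def get_artifact(locations, name, chunk_size=1 << 20):
  """Yields the named artifact in chunks.

  As in the diff, every stream ends with an empty chunk, so an empty
  file still yields exactly one chunk.
  """
  if name not in locations:
    raise ValueError('Unknown artifact: %s' % name)
  with open(locations[name], 'rb') as fin:
    chunk = True  # Not emitted; forces at least one read.
    while chunk:
      chunk = fin.read(chunk_size)
      yield chunk
```

Since `get_artifact` is a generator, the unknown-artifact error surfaces on the first iteration rather than at call time.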
 


[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328528=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328528
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 12:17
Start Date: 15/Oct/19 12:17
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #9775: [BEAM-8372] Job server 
submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#issuecomment-542182866
 
 
   Thanks @robertwb. This looks great. Having a look now.
 



Issue Time Tracking
---

Worklog Id: (was: 328528)
Time Spent: 2h 20m  (was: 2h 10m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328235=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328235
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 15/Oct/19 00:27
Start Date: 15/Oct/19 00:27
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#issuecomment-541984452
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 328235)
Time Spent: 2h 10m  (was: 2h)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328131=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328131
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 14/Oct/19 20:47
Start Date: 14/Oct/19 20:47
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334652926
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##
 @@ -23,51 +23,61 @@
 from __future__ import print_function
 
 import hashlib
-import random
-import re
+import threading
+import zipfile
+
+from google.protobuf import json_format
 
 from apache_beam.io import filesystems
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 
 
-class BeamFilesystemArtifactService(
+class AbstractArtifactService(
     beam_artifact_api_pb2_grpc.ArtifactStagingServiceServicer,
     beam_artifact_api_pb2_grpc.ArtifactRetrievalServiceServicer):
 
   _DEFAULT_CHUNK_SIZE = 2 << 20  # 2mb
 
-  def __init__(self, root, chunk_size=_DEFAULT_CHUNK_SIZE):
+  def __init__(self, root, chunk_size=None):
 
 Review comment:
   Missed that, makes sense.
 



Issue Time Tracking
---

Worklog Id: (was: 328131)
Time Spent: 2h  (was: 1h 50m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328103=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328103
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 14/Oct/19 20:14
Start Date: 14/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334623991
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+
+from google.protobuf import json_format
+import grpc
+import requests
+from concurrent import futures
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
+
+  # These must agree with those defined in PortablePipelineJarUtils.java.
+  PIPELINE_FOLDER_PATH = "BEAM-PIPELINE"
+  PIPELINE_PATH = PIPELINE_FOLDER_PATH + "/pipeline.json"
+  PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + "/pipeline-options.json"
+  ARTIFACT_STAGING_FOLDER_PATH = "BEAM-ARTIFACT-STAGING"
+  ARTIFACT_MANIFEST_PATH = (
+      ARTIFACT_STAGING_FOLDER_PATH + "/artifact-manifest.json")
+
+  def __init__(
+      self, master_url, executable_jar, job_id, job_name, pipeline, options):
+    super(FlinkBeamJob, self).__init__(job_id, job_name, pipeline, options)
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._jar_uploaded = False
+
+  def prepare(self):
+    # Copy the executable jar, injecting the pipeline and options as resources.
+    with tempfile.NamedTemporaryFile(suffix='.jar') as tout:
+      self._jar = tout.name
+    shutil.copy(self._executable_jar, self._jar)
+    with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z:
+      with z.open(self.PIPELINE_PATH, 'w') as fout:
+        fout.write(json_format.MessageToJson(
+            self._pipeline_proto).encode('utf-8'))
+      with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout:
+        fout.write(json_format.MessageToJson(
+            self._pipeline_options).encode('utf-8'))
+    self._start_artifact_service(self._jar)
+
+  def _start_artifact_service(self, jar):
+    self._artifact_staging_service = artifact_service.ZipFileArtifactService(
+        jar)
+    self._artifact_staging_server = 

[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328101=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328101
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 14/Oct/19 20:14
Start Date: 14/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334632076
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -29,7 +32,13 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
+    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
+    flink_master_url = 'http://localhost:8081'
 
 Review comment:
   Oops. Fixed. 
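The slip acknowledged here is that the second assignment unconditionally replaces whatever the user passed as `flink_master_url` with `http://localhost:8081`. A minimal sketch of the intended selection logic, assuming the condition from the revised hunk (`flink_master_url == '[local]' or sys.version_info < (3, 6)`); `choose_job_server`, its return values, and the `'[local]'` fallback for an unset option are hypothetical.

```python
import sys


def choose_job_server(flink_master_url, version_info=sys.version_info):
    """Hypothetical helper: pick a job server without discarding the option.

    Returns a (kind, master_url) pair, where kind is 'jar' for the existing
    FlinkJarJobServer path and 'uber_jar' for direct UberJar submission.
    """
    # Respect the user's --flink_master_url; '[local]' is an assumed fallback.
    master_url = flink_master_url or '[local]'
    if master_url == '[local]' or version_info < (3, 6):
        # Embedded Flink, or an interpreter that cannot write into zip files.
        return ('jar', master_url)
    return ('uber_jar', master_url)
```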
 



Issue Time Tracking
---

Worklog Id: (was: 328101)
Time Spent: 1h 50m  (was: 1h 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328098=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328098
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 14/Oct/19 20:14
Start Date: 14/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334624917
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -29,7 +32,13 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
+    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
+    flink_master_url = 'http://localhost:8081'
+    if flink_master_url == '[local]' or sys.version_info < (3, 6):
+      # TOOD: Also default to LOOPBACK?
 
 Review comment:
   Done.
 



Issue Time Tracking
---

Worklog Id: (was: 328098)
Time Spent: 1.5h  (was: 1h 20m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328102=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328102
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 14/Oct/19 20:14
Start Date: 14/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334644554
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+
+from google.protobuf import json_format
+import grpc
+import requests
+from concurrent import futures
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
+
+  # These must agree with those defined in PortablePipelineJarUtils.java.
 
 Review comment:
   Thanks for the heads up.
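Because `FlinkBeamJob` writes its resources under fixed entry names that must agree with `PortablePipelineJarUtils.java`, a cheap sanity check is to confirm those entries exist in the prepared jar. This is a sketch assuming only the path constants quoted in the diff; `missing_beam_entries` is a hypothetical helper, and it checks only the two entries that `prepare()` writes.

```python
import zipfile

# Entry names as quoted from FlinkBeamJob; the diff notes they must agree
# with those defined in PortablePipelineJarUtils.java.
PIPELINE_FOLDER_PATH = 'BEAM-PIPELINE'
PIPELINE_PATH = PIPELINE_FOLDER_PATH + '/pipeline.json'
PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + '/pipeline-options.json'


def missing_beam_entries(jar_path):
    """Hypothetical check: list the pipeline resources absent from jar_path."""
    required = [PIPELINE_PATH, PIPELINE_OPTIONS_PATH]
    with zipfile.ZipFile(jar_path) as z:
        present = set(z.namelist())
    return [name for name in required if name not in present]
```

An empty result means both resources were injected; anything else points at a naming mismatch between the Python and Java sides.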
 



Issue Time Tracking
---

Worklog Id: (was: 328102)
Time Spent: 1h 50m  (was: 1h 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328100=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328100
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 14/Oct/19 20:14
Start Date: 14/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334625678
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/abstract_job_service.py
 ##
 @@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import logging
+import uuid
+from builtins import object
+
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+
+TERMINAL_STATES = [
+    beam_job_api_pb2.JobState.DONE,
+    beam_job_api_pb2.JobState.STOPPED,
+    beam_job_api_pb2.JobState.FAILED,
+    beam_job_api_pb2.JobState.CANCELLED,
+]
+
+
+class AbstractJobServiceServicer(beam_job_api_pb2_grpc.JobServiceServicer):
+  """Manages one or more pipelines, possibly concurrently.
+  Experimental: No backward compatibility guaranteed.
+  Servicer for the Beam Job API.
+  """
+  def __init__(self):
+    self._jobs = {}
+
+  def create_beam_job(self, preparation_id, job_name, pipeline, options):
+    """Returns an instance of AbstractBeamJob specific to this servicer."""
+    raise NotImplementedError(type(self))
+
+  def Prepare(self, request, context=None, timeout=None):
+    # For now, just use the job name as the job id.
 
 Review comment:
   Yeah. This comment is out of date. Removed.
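With the "job name as job id" comment gone, one way to keep ids distinct when two submissions share a name is to append a random UUID (the module already imports `uuid`). This is a sketch under that assumption; `make_preparation_id` and the `name-uuid` format are hypothetical, not necessarily what `AbstractJobServiceServicer.Prepare` does.

```python
import uuid


def make_preparation_id(job_name):
    """Hypothetical helper: derive a unique preparation id from a job name.

    Two jobs with the same name must not collide in the servicer's
    self._jobs table, so a random UUID4 suffix is appended.
    """
    return '%s-%s' % (job_name, uuid.uuid4())
```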
 



Issue Time Tracking
---

Worklog Id: (was: 328100)
Time Spent: 1h 40m  (was: 1.5h)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=328099=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-328099
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 14/Oct/19 20:14
Start Date: 14/Oct/19 20:14
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334625937
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##
 @@ -23,51 +23,61 @@
 from __future__ import print_function
 
 import hashlib
-import random
-import re
+import threading
+import zipfile
+
+from google.protobuf import json_format
 
 from apache_beam.io import filesystems
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 
 
-class BeamFilesystemArtifactService(
+class AbstractArtifactService(
     beam_artifact_api_pb2_grpc.ArtifactStagingServiceServicer,
     beam_artifact_api_pb2_grpc.ArtifactRetrievalServiceServicer):
 
   _DEFAULT_CHUNK_SIZE = 2 << 20  # 2mb
 
-  def __init__(self, root, chunk_size=_DEFAULT_CHUNK_SIZE):
+  def __init__(self, root, chunk_size=None):
 
 Review comment:
   One can now explicitly pass None to get the default (e.g. in a superclass constructor call).
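A minimal sketch of the `chunk_size=None` pattern, assuming the constructor resolves the default with `or`; the class names are hypothetical stand-ins for `AbstractArtifactService` and a subclass. Note that `2 << 20` is 2 * 2**20 = 2,097,152 bytes, matching the `# 2mb` comment.

```python
class ArtifactServiceSketch(object):
    """Hypothetical stand-in for AbstractArtifactService's constructor."""

    _DEFAULT_CHUNK_SIZE = 2 << 20  # 2 MiB = 2097152 bytes

    def __init__(self, root, chunk_size=None):
        self._root = root
        # None (passed explicitly or omitted) selects the class default;
        # with `or`, a chunk_size of 0 also falls back to the default.
        self._chunk_size = chunk_size or self._DEFAULT_CHUNK_SIZE

    def chunks(self, data):
        """Split data into pieces of at most self._chunk_size bytes."""
        for start in range(0, len(data), self._chunk_size):
            yield data[start:start + self._chunk_size]


class ZipArtifactServiceSketch(ArtifactServiceSketch):

    def __init__(self, path, chunk_size=None):
        # Forwarding chunk_size unchanged works because None means "default",
        # so the subclass never has to know the default value itself.
        super(ZipArtifactServiceSketch, self).__init__(path, chunk_size)
```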
 



Issue Time Tracking
---

Worklog Id: (was: 328099)
Time Spent: 1h 40m  (was: 1.5h)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327119=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327119
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:50
Start Date: 11/Oct/19 22:50
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334198101
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+
+from google.protobuf import json_format
+import grpc
+import requests
+from concurrent import futures
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
+
+  # These must agree with those defined in PortablePipelineJarUtils.java.
 
 Review comment:
   s/open/merged so you'll have to rebase, sorry about that
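The job server quoted in this hunk picks its fallback jar by asking the cluster for its Flink version (the `/v1/config` response, key `flink-version`) and keeping only `major.minor`. The string handling can be sketched offline with no HTTP call; `flink_minor_version` and `job_server_target` are hypothetical names that mirror `flink_version()` and the Gradle target string built in `executable_jar()`.

```python
def flink_minor_version(full_version):
    """Keep only major.minor of a Flink version string, like flink_version()."""
    return '.'.join(full_version.split('.')[:2])


def job_server_target(full_version):
    """Hypothetical: the shadowJar target executable_jar() would resolve."""
    return 'runners:flink:%s:job-server:shadowJar' % flink_minor_version(
        full_version)
```

Truncating to the minor version means any patch release of a cluster maps onto the single job-server jar built for that Flink minor line.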
 



Issue Time Tracking
---

Worklog Id: (was: 327119)
Time Spent: 1h 20m  (was: 1h 10m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327118=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327118
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:49
Start Date: 11/Oct/19 22:49
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334198101
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+
+from google.protobuf import json_format
+import grpc
+import requests
+from concurrent import futures
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
+
+  # These must agree with those defined in PortablePipelineJarUtils.java.
 
 Review comment:
   s/open/merged so you'll have to rebase, sorry about that
 



Issue Time Tracking
---

Worklog Id: (was: 327118)
Time Spent: 1h 10m  (was: 1h)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327111=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327111
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334193773
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -29,7 +32,13 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
+    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
+    flink_master_url = 'http://localhost:8081'
+    if flink_master_url == '[local]' or sys.version_info < (3, 6):
+      # TOOD: Also default to LOOPBACK?
 
 Review comment:
   Seems reasonable. Maybe file a JIRA for it to link here.
 



Issue Time Tracking
---

Worklog Id: (was: 327111)
Time Spent: 50m  (was: 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327106=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327106
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334164163
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+
+from google.protobuf import json_format
+import grpc
+import requests
+from concurrent import futures
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
+
+  # These must agree with those defined in PortablePipelineJarUtils.java.
+  PIPELINE_FOLDER_PATH = "BEAM-PIPELINE"
+  PIPELINE_PATH = PIPELINE_FOLDER_PATH + "/pipeline.json"
+  PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + "/pipeline-options.json"
+  ARTIFACT_STAGING_FOLDER_PATH = "BEAM-ARTIFACT-STAGING"
+  ARTIFACT_MANIFEST_PATH = (
+      ARTIFACT_STAGING_FOLDER_PATH + "/artifact-manifest.json")
+
+  def __init__(
+      self, master_url, executable_jar, job_id, job_name, pipeline, options):
+    super(FlinkBeamJob, self).__init__(job_id, job_name, pipeline, options)
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._jar_uploaded = False
+
+  def prepare(self):
+    # Copy the executable jar, injecting the pipeline and options as resources.
+    with tempfile.NamedTemporaryFile(suffix='.jar') as tout:
+      self._jar = tout.name
+    shutil.copy(self._executable_jar, self._jar)
+    with zipfile.ZipFile(self._jar, 'a', compression=zipfile.ZIP_DEFLATED) as z:
+      with z.open(self.PIPELINE_PATH, 'w') as fout:
+        fout.write(json_format.MessageToJson(
+            self._pipeline_proto).encode('utf-8'))
+      with z.open(self.PIPELINE_OPTIONS_PATH, 'w') as fout:
+        fout.write(json_format.MessageToJson(
+            self._pipeline_options).encode('utf-8'))
+    self._start_artifact_service(self._jar)
+
+  def _start_artifact_service(self, jar):
+    self._artifact_staging_service = artifact_service.ZipFileArtifactService(
+        jar)
+    self._artifact_staging_server = 

[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327112=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327112
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334165761
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/abstract_job_service.py
 ##
 @@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import logging
+import uuid
+from builtins import object
+
+
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+
+TERMINAL_STATES = [
+    beam_job_api_pb2.JobState.DONE,
+    beam_job_api_pb2.JobState.STOPPED,
+    beam_job_api_pb2.JobState.FAILED,
+    beam_job_api_pb2.JobState.CANCELLED,
+]
+
+
+class AbstractJobServiceServicer(beam_job_api_pb2_grpc.JobServiceServicer):
+  """Manages one or more pipelines, possibly concurrently.
+  Experimental: No backward compatibility guaranteed.
+  Servicer for the Beam Job API.
+  """
+  def __init__(self):
+    self._jobs = {}
+
+  def create_beam_job(self, preparation_id, job_name, pipeline, options):
+    """Returns an instance of AbstractBeamJob specific to this servicer."""
+    raise NotImplementedError(type(self))
+
+  def Prepare(self, request, context=None, timeout=None):
+    # For now, just use the job name as the job id.
+    logging.debug('Got Prepare request.')
+    preparation_id = '%s-%s' % (request.job_name, uuid.uuid4())
+    self._jobs[preparation_id] = self.create_beam_job(
+        preparation_id,
+        request.job_name,
+        request.pipeline,
+        request.pipeline_options)
+    self._jobs[preparation_id].prepare()
+    logging.debug("Prepared job '%s' as '%s'", request.job_name, preparation_id)
+    return beam_job_api_pb2.PrepareJobResponse(
+        preparation_id=preparation_id,
+        artifact_staging_endpoint=self._jobs[
+            preparation_id].artifact_staging_endpoint(),
+        staging_session_token=preparation_id)
+
+  def Run(self, request, context=None, timeout=None):
+    job_id = request.preparation_id
+    logging.info("Runing job '%s'", job_id)
 
 Review comment:
   typo "Running"
 



Issue Time Tracking
---

Worklog Id: (was: 327112)
Time Spent: 50m  (was: 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327113=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327113
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334189131
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/abstract_job_service.py
 ##
 @@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import logging
+import uuid
+from builtins import object
+
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import beam_job_api_pb2_grpc
+
+TERMINAL_STATES = [
+    beam_job_api_pb2.JobState.DONE,
+    beam_job_api_pb2.JobState.STOPPED,
+    beam_job_api_pb2.JobState.FAILED,
+    beam_job_api_pb2.JobState.CANCELLED,
+]
+
+
+class AbstractJobServiceServicer(beam_job_api_pb2_grpc.JobServiceServicer):
+  """Manages one or more pipelines, possibly concurrently.
+  Experimental: No backward compatibility guaranteed.
+  Servicer for the Beam Job API.
+  """
+  def __init__(self):
+    self._jobs = {}
+
+  def create_beam_job(self, preparation_id, job_name, pipeline, options):
+    """Returns an instance of AbstractBeamJob specific to this servicer."""
+    raise NotImplementedError(type(self))
+
+  def Prepare(self, request, context=None, timeout=None):
+    # For now, just use the job name as the job id.
 
 Review comment:
   Strictly speaking, the job name would just be a prefix of the job id.
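   To make that concrete: Prepare() builds the id as '%s-%s' % (job_name, uuid.uuid4()), so the job name is only the id's prefix. A minimal sketch of that format; make_preparation_id and job_name_of are hypothetical helpers, not Beam APIs. Because str(uuid.uuid4()) is always 36 characters, the name can be recovered even when it contains dashes:

```python
import uuid

# Length of str(uuid.uuid4()), e.g. '1b4e28ba-2fa1-11d2-883f-bc46d2e9a8f1'.
_UUID_STR_LEN = 36


def make_preparation_id(job_name):
  """Builds an id the same way Prepare() does: '<job_name>-<uuid4>'."""
  return '%s-%s' % (job_name, uuid.uuid4())


def job_name_of(preparation_id):
  """Strips the '-<uuid4>' suffix, leaving the job name (the id's prefix)."""
  return preparation_id[:-(_UUID_STR_LEN + 1)]
```

   With this scheme two Prepare calls with the same job name still get distinct ids, which is presumably why the comment "use the job name as the job id" is not quite accurate.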
 



Issue Time Tracking
---

Worklog Id: (was: 327113)
Time Spent: 1h  (was: 50m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327109=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327109
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334195194
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##
 @@ -77,47 +87,120 @@ def PutArtifact(self, request_iterator, context=None):
         metadata = request.metadata.metadata
         retrieval_token = self.retrieval_token(
             request.metadata.staging_session_token)
-        self._mkdir(retrieval_token)
-        temp_path = filesystems.FileSystems.join(
-            self._root,
-            retrieval_token,
-            '%x.tmp' % random.getrandbits(128))
-        fout = filesystems.FileSystems.create(temp_path)
+        artifact_path = self._artifact_path(retrieval_token, metadata.name)
+        temp_path = self._temp_path(artifact_path)
+        fout = self._open(temp_path, 'w')
         hasher = hashlib.sha256()
       else:
         hasher.update(request.data.data)
         fout.write(request.data.data)
     fout.close()
     data_hash = hasher.hexdigest()
     if metadata.sha256 and metadata.sha256 != data_hash:
-      filesystems.FileSystems.delete([temp_path])
+      self._delete(temp_path)
       raise ValueError('Bad metadata hash: %s vs %s' % (
-          metadata.metadata.sha256, data_hash))
-    filesystems.FileSystems.rename(
-        [temp_path], [self._artifact_path(retrieval_token, metadata.name)])
+          metadata.sha256, data_hash))
+    self._rename(temp_path, artifact_path)
     return beam_artifact_api_pb2.PutArtifactResponse()
 
   def CommitManifest(self, request, context=None):
     retrieval_token = self.retrieval_token(request.staging_session_token)
-    with filesystems.FileSystems.create(
-        self._manifest_path(retrieval_token)) as fout:
-      fout.write(request.manifest.SerializeToString())
+    proxy_manifest = beam_artifact_api_pb2.ProxyManifest(
+        manifest=request.manifest,
+        location=[
+            beam_artifact_api_pb2.ProxyManifest.Location(
+                name=metadata.name,
+                uri=self._artifact_path(retrieval_token, metadata.name))
+            for metadata in request.manifest.artifact])
+    with self._open(self._manifest_path(retrieval_token), 'w') as fout:
+      fout.write(json_format.MessageToJson(proxy_manifest).encode('utf-8'))
     return beam_artifact_api_pb2.CommitManifestResponse(
         retrieval_token=retrieval_token)
 
   def GetManifest(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._manifest_path(request.retrieval_token)) as fin:
-      return beam_artifact_api_pb2.GetManifestResponse(
-          manifest=beam_artifact_api_pb2.Manifest.FromString(
-              fin.read()))
+    return beam_artifact_api_pb2.GetManifestResponse(
+        manifest=self._get_manifest_proxy(request.retrieval_token).manifest)
 
   def GetArtifact(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._artifact_path(request.retrieval_token, request.name)) as fin:
-      # This value is not emitted, but lets us yield a single empty
-      # chunk on an empty file.
-      chunk = True
-      while chunk:
-        chunk = fin.read(self._chunk_size)
-        yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+    for artifact in self._get_manifest_proxy(request.retrieval_token).location:
+      if artifact.name == request.name:
+        with self._open(artifact.uri, 'r') as fin:
+          # This value is not emitted, but lets us yield a single empty
+          # chunk on an empty file.
+          chunk = True
+          while chunk:
+            chunk = fin.read(self._chunk_size)
+            yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+        break
+    else:
+      raise ValueError('Unknown artifact: %s' % request.name)
+
+
+class ZipFileArtifactService(AbstractArtifactService):
 
 Review comment:
   If this is the piece that depends on Python 3.6, could you document that 
here? And maybe add a check if you think it's needed?
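   For reference: in the Python standard library, ZipFile.open() only accepts mode 'w' (streaming a new entry into an archive) from Python 3.6 onward, and prepare() in flink_uber_jar_job_server.py relies on z.open(..., 'w'). If that call is the 3.6 dependency, a guard like the following would surface it early. This is a sketch under that assumption; add_entry is a hypothetical helper, not part of Beam:

```python
import sys
import zipfile


def add_entry(zip_path, name, data):
  """Appends entry `name` containing bytes `data` to an existing zip/jar.

  Streams the bytes through ZipFile.open(name, 'w'), which the standard
  library only supports on Python 3.6+, so check the version up front.
  """
  if sys.version_info < (3, 6):
    raise RuntimeError(
        "ZipFile.open(name, 'w') requires Python 3.6 or later; found %s.%s"
        % sys.version_info[:2])
  # Mode 'a' appends to the existing archive instead of truncating it.
  with zipfile.ZipFile(zip_path, 'a', compression=zipfile.ZIP_DEFLATED) as z:
    with z.open(name, 'w') as fout:
      fout.write(data)
```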
 



Issue Time Tracking
---

Worklog Id: (was: 327109)
Time Spent: 50m  (was: 40m)

> Allow submission of Flink UberJar directly to flink 

[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327105=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327105
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334156913
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/flink_uber_jar_job_server.py
 ##
 @@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A job server submitting portable pipelines as uber jars to Flink."""
+
+from __future__ import absolute_import
+from __future__ import print_function
+
+import logging
+import os
+import shutil
+import tempfile
+import time
+import zipfile
+
+from google.protobuf import json_format
+import grpc
+import requests
+from concurrent import futures
+
+from apache_beam.portability.api import beam_artifact_api_pb2_grpc
+from apache_beam.portability.api import beam_job_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.portability import abstract_job_service
+from apache_beam.runners.portability import artifact_service
+from apache_beam.runners.portability import job_server
+
+
+class FlinkUberJarJobServer(abstract_job_service.AbstractJobServiceServicer):
+
+  def __init__(self, master_url, executable_jar=None):
+    super(FlinkUberJarJobServer, self).__init__()
+    self._master_url = master_url
+    self._executable_jar = executable_jar
+    self._temp_dir = tempfile.mkdtemp(prefix='apache-beam-flink')
+
+  def start(self):
+    return self
+
+  def stop(self):
+    pass
+
+  def executable_jar(self):
+    return self._executable_jar or job_server.JavaJarJobServer.local_jar(
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            'runners:flink:%s:job-server:shadowJar' % self.flink_version()))
+
+  def flink_version(self):
+    full_version = requests.get(
+        '%s/v1/config' % self._master_url).json()['flink-version']
+    # Only return up to minor version.
+    return '.'.join(full_version.split('.')[:2])
+
+  def create_beam_job(self, job_id, job_name, pipeline, options):
+    return FlinkBeamJob(
+        self._master_url,
+        self.executable_jar(),
+        job_id,
+        job_name,
+        pipeline,
+        options)
+
+
+class FlinkBeamJob(abstract_job_service.AbstractBeamJob):
+
+  # These must agree with those defined in PortablePipelineJarUtils.java.
 
 Review comment:
   FYI, I have an open PR #9752 that would change the jar structure.
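   Since the layout may change with PR #9752, it may help to keep the resource paths in one place. The sketch below mirrors what prepare() does in the diff: copy the executable jar and append the pipeline and its options under BEAM-PIPELINE/. It assumes plain dicts serialized with json.dumps in place of the protos that the real code serializes with json_format.MessageToJson; build_uber_jar is a hypothetical name:

```python
import json
import os
import shutil
import tempfile
import zipfile

# Layout constants copied from FlinkBeamJob; per the original comment they
# must agree with PortablePipelineJarUtils.java.
PIPELINE_FOLDER_PATH = 'BEAM-PIPELINE'
PIPELINE_PATH = PIPELINE_FOLDER_PATH + '/pipeline.json'
PIPELINE_OPTIONS_PATH = PIPELINE_FOLDER_PATH + '/pipeline-options.json'


def build_uber_jar(executable_jar, pipeline, options, out_dir=None):
  """Copies executable_jar and injects pipeline/options JSON as resources.

  `pipeline` and `options` are plain dicts here (an assumption made so the
  sketch runs without Beam). Returns the path of the new jar.
  """
  out_dir = out_dir or tempfile.mkdtemp(prefix='apache-beam-flink')
  jar = os.path.join(out_dir, 'pipeline.jar')
  # Work on a copy so the cached job-server jar is never modified.
  shutil.copy(executable_jar, jar)
  with zipfile.ZipFile(jar, 'a', compression=zipfile.ZIP_DEFLATED) as z:
    # writestr() takes the whole payload at once and, unlike
    # z.open(name, 'w'), is not limited to Python 3.6+.
    z.writestr(PIPELINE_PATH, json.dumps(pipeline))
    z.writestr(PIPELINE_OPTIONS_PATH, json.dumps(options))
  return jar
```

   Writing into a fresh temporary directory also avoids reusing the name of a NamedTemporaryFile that has already been deleted when its with-block exits, as prepare() currently does.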
 



Issue Time Tracking
---

Worklog Id: (was: 327105)
Time Spent: 0.5h  (was: 20m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327110=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327110
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334190129
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##
 @@ -23,51 +23,61 @@
 from __future__ import print_function
 
 import hashlib
-import random
-import re
+import threading
+import zipfile
+
+from google.protobuf import json_format
 
 from apache_beam.io import filesystems
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 
 
-class BeamFilesystemArtifactService(
+class AbstractArtifactService(
     beam_artifact_api_pb2_grpc.ArtifactStagingServiceServicer,
     beam_artifact_api_pb2_grpc.ArtifactRetrievalServiceServicer):
 
   _DEFAULT_CHUNK_SIZE = 2 << 20  # 2mb
 
-  def __init__(self, root, chunk_size=_DEFAULT_CHUNK_SIZE):
+  def __init__(self, root, chunk_size=None):
 
 Review comment:
   I don't see the difference here.
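   One behavioral difference, which may or may not be what the author intended (the new __init__ body is not shown in this hunk): a default such as chunk_size=_DEFAULT_CHUNK_SIZE is evaluated once, when the class body runs, so a subclass that overrides _DEFAULT_CHUNK_SIZE does not change it. A None default resolved inside __init__ does pick up the override. A minimal sketch with hypothetical class names:

```python
class EagerDefault(object):
  _DEFAULT_CHUNK_SIZE = 2 << 20  # 2 MiB

  # The default value is fixed when this class body is executed.
  def __init__(self, root, chunk_size=_DEFAULT_CHUNK_SIZE):
    self._root = root
    self._chunk_size = chunk_size


class LazyDefault(object):
  _DEFAULT_CHUNK_SIZE = 2 << 20  # 2 MiB

  # None means "use _DEFAULT_CHUNK_SIZE of the instance's actual class".
  def __init__(self, root, chunk_size=None):
    self._root = root
    self._chunk_size = (
        chunk_size if chunk_size is not None else self._DEFAULT_CHUNK_SIZE)


class SmallEager(EagerDefault):
  _DEFAULT_CHUNK_SIZE = 1024  # Ignored: the inherited default is already 2 MiB.


class SmallLazy(LazyDefault):
  _DEFAULT_CHUNK_SIZE = 1024  # Honored by LazyDefault.__init__.
```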
 



Issue Time Tracking
---

Worklog Id: (was: 327110)
Time Spent: 50m  (was: 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327107=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327107
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334192456
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner.py
 ##
 @@ -29,7 +32,13 @@
 
 class FlinkRunner(portable_runner.PortableRunner):
   def default_job_server(self, options):
-    return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
+    flink_master_url = options.view_as(FlinkRunnerOptions).flink_master_url
+    flink_master_url = 'http://localhost:8081'
 
 Review comment:
   This looks wrong.
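   The second assignment discards whatever --flink_master_url the user passed, so every run would target http://localhost:8081; it looks like a leftover from local testing. A sketch of the branch without the override. select_job_server is hypothetical and returns class names instead of constructing servers so it runs without Beam; it also assumes that the else-branch (not visible in this hunk) uses FlinkUberJarJobServer:

```python
import sys


def select_job_server(flink_master_url, version_info=None):
  """Hypothetical stand-in for the branch in FlinkRunner.default_job_server().

  The user's flink_master_url is used as given and never overwritten with a
  hard-coded address.
  """
  if version_info is None:
    version_info = sys.version_info
  # '[local]' and Python < 3.6 keep the existing jar-based job server.
  if flink_master_url == '[local]' or tuple(version_info) < (3, 6):
    return 'FlinkJarJobServer'
  # Assumption: any other master URL gets the UberJar-submitting server.
  return 'FlinkUberJarJobServer'
```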
 



Issue Time Tracking
---

Worklog Id: (was: 327107)
Time Spent: 40m  (was: 0.5h)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327108=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327108
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 22:44
Start Date: 11/Oct/19 22:44
Worklog Time Spent: 10m 
  Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334194808
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##
 @@ -77,47 +87,120 @@ def PutArtifact(self, request_iterator, context=None):
         metadata = request.metadata.metadata
         retrieval_token = self.retrieval_token(
             request.metadata.staging_session_token)
-        self._mkdir(retrieval_token)
-        temp_path = filesystems.FileSystems.join(
-            self._root,
-            retrieval_token,
-            '%x.tmp' % random.getrandbits(128))
-        fout = filesystems.FileSystems.create(temp_path)
+        artifact_path = self._artifact_path(retrieval_token, metadata.name)
+        temp_path = self._temp_path(artifact_path)
+        fout = self._open(temp_path, 'w')
         hasher = hashlib.sha256()
       else:
         hasher.update(request.data.data)
         fout.write(request.data.data)
     fout.close()
     data_hash = hasher.hexdigest()
     if metadata.sha256 and metadata.sha256 != data_hash:
-      filesystems.FileSystems.delete([temp_path])
+      self._delete(temp_path)
       raise ValueError('Bad metadata hash: %s vs %s' % (
-          metadata.metadata.sha256, data_hash))
-    filesystems.FileSystems.rename(
-        [temp_path], [self._artifact_path(retrieval_token, metadata.name)])
+          metadata.sha256, data_hash))
+    self._rename(temp_path, artifact_path)
     return beam_artifact_api_pb2.PutArtifactResponse()
 
   def CommitManifest(self, request, context=None):
     retrieval_token = self.retrieval_token(request.staging_session_token)
-    with filesystems.FileSystems.create(
-        self._manifest_path(retrieval_token)) as fout:
-      fout.write(request.manifest.SerializeToString())
+    proxy_manifest = beam_artifact_api_pb2.ProxyManifest(
+        manifest=request.manifest,
+        location=[
+            beam_artifact_api_pb2.ProxyManifest.Location(
+                name=metadata.name,
+                uri=self._artifact_path(retrieval_token, metadata.name))
+            for metadata in request.manifest.artifact])
+    with self._open(self._manifest_path(retrieval_token), 'w') as fout:
+      fout.write(json_format.MessageToJson(proxy_manifest).encode('utf-8'))
     return beam_artifact_api_pb2.CommitManifestResponse(
         retrieval_token=retrieval_token)
 
   def GetManifest(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._manifest_path(request.retrieval_token)) as fin:
-      return beam_artifact_api_pb2.GetManifestResponse(
-          manifest=beam_artifact_api_pb2.Manifest.FromString(
-              fin.read()))
+    return beam_artifact_api_pb2.GetManifestResponse(
+        manifest=self._get_manifest_proxy(request.retrieval_token).manifest)
 
   def GetArtifact(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._artifact_path(request.retrieval_token, request.name)) as fin:
-      # This value is not emitted, but lets us yield a single empty
-      # chunk on an empty file.
-      chunk = True
-      while chunk:
-        chunk = fin.read(self._chunk_size)
-        yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+    for artifact in self._get_manifest_proxy(request.retrieval_token).location:
+      if artifact.name == request.name:
+        with self._open(artifact.uri, 'r') as fin:
+          # This value is not emitted, but lets us yield a single empty
+          # chunk on an empty file.
+          chunk = True
+          while chunk:
+            chunk = fin.read(self._chunk_size)
+            yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+        break
+    else:
+      raise ValueError('Unknown artifact: %s' % request.name)
+
+
+class ZipFileArtifactService(AbstractArtifactService):
 
 Review comment:
   Can you add a docstring?
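   As raw material for that docstring: the retrieval side of the diff boils down to a name lookup over the proxy manifest's locations, using for/else to report a missing artifact, followed by chunked reads that always finish with one empty chunk. A standalone sketch of that pattern; the (name, uri) pairs and open_fn are stand-ins (assumptions) for ProxyManifest.location and the service's _open():

```python
def get_artifact_chunks(locations, name, open_fn, chunk_size):
  """Yields the bytes of artifact `name` in chunks of at most chunk_size.

  `locations` is a list of (name, uri) pairs and `open_fn(uri)` returns a
  readable binary file object. Every stream ends with an empty chunk, so an
  empty artifact yields exactly one (empty) chunk.
  """
  for artifact_name, uri in locations:
    if artifact_name == name:
      with open_fn(uri) as fin:
        # Start truthy so the loop body runs at least once.
        chunk = True
        while chunk:
          chunk = fin.read(chunk_size)
          yield chunk
      break
  else:
    # Reached only if the loop finished without hitting `break`.
    raise ValueError('Unknown artifact: %s' % name)
```

   Because this is a generator, the ValueError for an unknown name is raised when the caller starts iterating, not when the function is called.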
 



Issue Time Tracking
---

Worklog Id: (was: 327108)
Time Spent: 50m  (was: 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
>  

[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327032=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327032
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 18:35
Start Date: 11/Oct/19 18:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #9775: [BEAM-8372] Job 
server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#issuecomment-541176830
 
 
   CC: @mxm 
 



Issue Time Tracking
---

Worklog Id: (was: 327032)
Time Spent: 20m  (was: 10m)

> Allow submission of Flink UberJar directly to flink cluster.
> 
>
> Key: BEAM-8372
> URL: https://issues.apache.org/jira/browse/BEAM-8372
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-8372) Allow submission of Flink UberJar directly to flink cluster.

2019-10-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327031=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327031
 ]

ASF GitHub Bot logged work on BEAM-8372:


Author: ASF GitHub Bot
Created on: 11/Oct/19 18:35
Start Date: 11/Oct/19 18:35
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #9775: [BEAM-8372] 
Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775
 
 
   One can now use the FlinkRunner with the --flink_master_url=flink-master-url
   and an UberJar will automatically be created with the artifacts and pipeline
   definition, uploaded to the master, and started.
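   In the job server code under review, the jar that becomes the base of the UberJar is picked from the Flink version the master reports: flink_version() reads 'flink-version' from the master's /v1/config REST endpoint, keeps only major.minor, and that selects the runners:flink:<version>:job-server:shadowJar target. A sketch of that mapping with hypothetical helper names; the HTTP request is left out so it runs offline, and the config dict shape is taken from the .json()['flink-version'] lookup in the code:

```python
def flink_minor_version(full_version):
  """Keeps only major.minor, e.g. '1.8.2' -> '1.8' (as flink_version() does)."""
  return '.'.join(full_version.split('.')[:2])


def version_from_config(config):
  """Extracts the version from a parsed /v1/config response (shape assumed)."""
  return flink_minor_version(config['flink-version'])


def job_server_target(minor_version):
  """Gradle target whose shadow jar serves as the base of the UberJar."""
  return 'runners:flink:%s:job-server:shadowJar' % minor_version
```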
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build