[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=153903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153903
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 12/Oct/18 15:01
Start Date: 12/Oct/18 15:01
Worklog Time Spent: 10m 
  Work Description: mxm opened a new pull request #6672: [BEAM-4130] Add tests for FlinkJobServerDriver
URL: https://github.com/apache/beam/pull/6672
 
 
   This adds a few test cases for FlinkJobServerDriver. It also changes the
   default host from an empty string to `localhost`.
   
   CC @angoenka @tweise 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 153903)
Time Spent: 11h 40m  (was: 11.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 2.7.0
>
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152635&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152635
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 09/Oct/18 11:37
Start Date: 09/Oct/18 11:37
Worklog Time Spent: 10m 
  Work Description: mxm closed pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 2096cb98550..34f2edb5abb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.flink;
 
-import com.google.common.base.Strings;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -201,7 +200,7 @@ public void stop() {
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
       jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, jobServerFactory);
     } else {
       Endpoints.ApiServiceDescriptor descriptor =
@@ -239,7 +238,7 @@ private InMemoryJobService createJobService() throws IOException {
       throws IOException {
     BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
     final GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.artifactPort == 0) {
       artifactStagingService =
           GrpcFnServer.allocatePortAndCreateFor(service, artifactServerFactory);
     } else {


 




Issue Time Tracking
---

Worklog Id: (was: 152635)
Time Spent: 11.5h  (was: 11h 20m)




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152632
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 09/Oct/18 11:36
Start Date: 09/Oct/18 11:36
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223663160
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
 
 Review comment:
   Ok, let's do that in a follow-up.




Issue Time Tracking
---

Worklog Id: (was: 152632)
Time Spent: 11h 20m  (was: 11h 10m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152413&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152413
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 21:29
Start Date: 08/Oct/18 21:29
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223502676
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
 
 Review comment:
   Yes, I think we should, but there is currently no method for doing so. I
   think we should rethink and refactor GrpcFnServer.




Issue Time Tracking
---

Worklog Id: (was: 152413)
Time Spent: 11h 10m  (was: 11h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152390&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152390
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 20:29
Start Date: 08/Oct/18 20:29
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223488123
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
 
 Review comment:
   Yes, that is better. Do you think we could also respect the host name when 
the port is 0? As of now, it would get ignored.
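
   As a rough illustration of the question above, binding to a chosen host
   while letting the OS pick the port might look like the sketch below. It
   uses plain `java.net.ServerSocket` rather than `GrpcFnServer`, whose API
   is not shown in this thread; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical helper, not part of Beam: shows that port 0 and an explicit
// bind address can be combined with the standard library.
final class EphemeralPortSketch {

    /** Binds to the given host on an OS-chosen port and returns that port. */
    static int bindOnFreePort(String host) {
        try {
            InetAddress address = InetAddress.getByName(host);
            // Port 0 asks the OS for any free port; the bind address is still honored.
            try (ServerSocket socket = new ServerSocket(0, 50, address)) {
                return socket.getLocalPort();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int port = bindOnFreePort("localhost");
        System.out.println("Bound to localhost:" + port);
    }
}
```

   The socket is closed again before the method returns, so a real server
   would have to keep it open (or hand the bound address on) instead of only
   reporting the port.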




Issue Time Tracking
---

Worklog Id: (was: 152390)
Time Spent: 11h  (was: 10h 50m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152378&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152378
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 19:50
Start Date: 08/Oct/18 19:50
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223478440
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
 
 Review comment:
   I think this makes the behavior less consistent.
   Example scenarios:
   - host=null: JobPort dynamic, ArtifactPort dynamic
   - host=localhost: JobPort 8099, ArtifactPort 8098
   - host=localhost, jobPort=0: JobPort dynamic, ArtifactPort 8098
   
   Just checking if the JobPort is 0 will remove this confusion
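
   The scenarios above can be reproduced with a small sketch that compares
   the combined check suggested in review (empty host or port 0) with the
   port-only check. This is an illustration only: the class and method names
   are hypothetical, and it models the decision rule, not the actual
   `FlinkJobServerDriver` code.

```java
// Hypothetical model of the two rules discussed in this review thread.
final class PortRuleSketch {

    // Combined rule: an empty host OR port 0 selects a dynamic port.
    static boolean dynamicUnderCombinedRule(String host, int port) {
        return host == null || host.isEmpty() || port == 0;
    }

    // Port-only rule: only port 0 selects a dynamic port.
    static boolean dynamicUnderPortOnlyRule(int port) {
        return port == 0;
    }

    static String describe(boolean dynamic, int port) {
        return dynamic ? "dynamic" : Integer.toString(port);
    }

    public static void main(String[] args) {
        // The three scenarios from the comment; 8099 and 8098 are the defaults.
        String[] hosts = {null, "localhost", "localhost"};
        int[] jobPorts = {8099, 8099, 0};
        int artifactPort = 8098;

        for (int i = 0; i < hosts.length; i++) {
            String host = hosts[i];
            int jobPort = jobPorts[i];
            System.out.printf(
                "host=%s jobPort=%d | combined: job=%s artifact=%s | port-only: job=%s artifact=%s%n",
                host,
                jobPort,
                describe(dynamicUnderCombinedRule(host, jobPort), jobPort),
                describe(dynamicUnderCombinedRule(host, artifactPort), artifactPort),
                describe(dynamicUnderPortOnlyRule(jobPort), jobPort),
                describe(dynamicUnderPortOnlyRule(artifactPort), artifactPort));
        }
    }
}
```

   Under the port-only rule each port is decided by its own value, so the
   host setting no longer changes which ports are dynamic.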




Issue Time Tracking
---

Worklog Id: (was: 152378)
Time Spent: 10h 50m  (was: 10h 40m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152377&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152377
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 19:49
Start Date: 08/Oct/18 19:49
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223478440
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
 
 Review comment:
   This makes the behavior less consistent.
   Example scenarios:
   - host=null: JobPort dynamic, ArtifactPort dynamic
   - host=localhost: JobPort 8099, ArtifactPort 8098
   - host=localhost, jobPort=0: JobPort dynamic, ArtifactPort 8098
   
   Just checking if the JobPort is 0 will remove this confusion




Issue Time Tracking
---

Worklog Id: (was: 152377)
Time Spent: 10h 40m  (was: 10.5h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152376&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152376
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 19:45
Start Date: 08/Oct/18 19:45
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223477264
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -239,7 +239,7 @@ private InMemoryJobService createJobService() throws IOException {
       throws IOException {
     BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
     final GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.artifactPort == 0) {
 
 Review comment:
   Should this be `Strings.isNullOrEmpty(configuration.host) || configuration.port == 0`?




Issue Time Tracking
---

Worklog Id: (was: 152376)
Time Spent: 10.5h  (was: 10h 20m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152375
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 19:45
Start Date: 08/Oct/18 19:45
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223477139
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
 
 Review comment:
   Should this be `Strings.isNullOrEmpty(configuration.host) || configuration.port == 0`?




Issue Time Tracking
---

Worklog Id: (was: 152375)
Time Spent: 10h 20m  (was: 10h 10m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152368&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152368
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 19:41
Start Date: 08/Oct/18 19:41
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#issuecomment-427955480
 
 
   R: @mxm 




Issue Time Tracking
---

Worklog Id: (was: 152368)
Time Spent: 10h 10m  (was: 10h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152367&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152367
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 08/Oct/18 19:40
Start Date: 08/Oct/18 19:40
Worklog Time Spent: 10m 
  Work Description: angoenka opened a new pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607
 
 




Issue Time Tracking
---

Worklog Id: (was: 152367)
Time Spent: 10h  (was: 9h 50m)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-10-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=150334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150334
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 02/Oct/18 10:28
Start Date: 02/Oct/18 10:28
Worklog Time Spent: 10m 
  Work Description: mxm closed pull request #6341: [BEAM-4130] Updating FlinkJobServerDriver for port 0 usage
URL: https://github.com/apache/beam/pull/6341
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 0f567f6c6d5..7b0e55f70e6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -55,10 +55,16 @@
 @Option(name = "--job-host", usage = "The job server host name")
 private String host = "";
 
-@Option(name = "--job-port", usage = "The job service port. (Default: 8099)")
+@Option(
+  name = "--job-port",
+  usage = "The job service port. 0 to use a dynamic port. (Default: 8099)"
+)
 private int port = 8099;
 
-@Option(name = "--artifact-port", usage = "The artifact service port. (Default: 8098)")
+@Option(
+  name = "--artifact-port",
+  usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)"
+)
 private int artifactPort = 8098;
 
 @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
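
As a rough illustration of what "0 to use a dynamic port" means at the socket level: binding to port 0 asks the operating system to choose a free port, which can then be read back from the socket. The sketch below uses plain `java.net.ServerSocket` rather than Beam's `ServerFactory`, and `DynamicPortSketch` and `bindAndReportPort` are made-up names for illustration, not part of the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical helper for illustration only; not part of Beam.
class DynamicPortSketch {

  // Binds a server socket to the requested port and returns the port actually bound.
  // Requesting port 0 asks the operating system to pick a free port.
  static int bindAndReportPort(int requestedPort) {
    try (ServerSocket socket = new ServerSocket(requestedPort)) {
      return socket.getLocalPort();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("Port chosen for --job-port 0: " + bindAndReportPort(0));
  }
}
```

The port printed will generally differ from run to run, which is why a caller that needs the address has to read it back after binding.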


 




Issue Time Tracking
---

Worklog Id: (was: 150334)
Time Spent: 9h 50m  (was: 9h 40m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 2.7.0
>
>  Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=146762&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146762
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 22/Sep/18 11:30
Start Date: 22/Sep/18 11:30
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r219669077
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
 new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
 ListeningExecutorService executor =
 MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+ServerFactory jobServerFactory =
+ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
 ServerFactory artifactServerFactory =
-ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+ServerFactory.createWithPortSupplier(
+() -> getPort(configuration.host, configuration.artifactPort));
 return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
 
+  private static int getPort(String host, int port) {
+// If host is empty then use dynamic port
+return Strings.isNullOrEmpty(host) ? 0 : port;
 
 Review comment:
   Here is the PR #6469.




Issue Time Tracking
---

Worklog Id: (was: 146762)
Time Spent: 9h 40m  (was: 9.5h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=146754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146754
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 22/Sep/18 10:26
Start Date: 22/Sep/18 10:26
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r219667637
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
 new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
 ListeningExecutorService executor =
 MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+ServerFactory jobServerFactory =
+ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
 ServerFactory artifactServerFactory =
-ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+ServerFactory.createWithPortSupplier(
+() -> getPort(configuration.host, configuration.artifactPort));
 return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
 
+  private static int getPort(String host, int port) {
+// If host is empty then use dynamic port
+return Strings.isNullOrEmpty(host) ? 0 : port;
 
 Review comment:
   Does that work for you? 
   
   I'll open a PR to test the port supplier methods.




Issue Time Tracking
---

Worklog Id: (was: 146754)
Time Spent: 9.5h  (was: 9h 20m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=146753&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146753
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 22/Sep/18 08:01
Start Date: 22/Sep/18 08:01
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #6341: [BEAM-4130] Fixing 
bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#issuecomment-423725762
 
 
   Run Go PostCommit




Issue Time Tracking
---

Worklog Id: (was: 146753)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144662&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144662
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 16/Sep/18 10:32
Start Date: 16/Sep/18 10:32
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r217909434
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
 new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
 ListeningExecutorService executor =
 MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+ServerFactory jobServerFactory =
+ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
 ServerFactory artifactServerFactory =
-ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+ServerFactory.createWithPortSupplier(
+() -> getPort(configuration.host, configuration.artifactPort));
 return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
 
+  private static int getPort(String host, int port) {
+// If host is empty then use dynamic port
+return Strings.isNullOrEmpty(host) ? 0 : port;
 
 Review comment:
   Imagine the user started the JobServer with `--job-port 1234`: it would then 
bind to a different port because `--job-host` is empty. We should simply pass 
in `0` as the port in tests, which would then bind to an arbitrary port. 
Additionally, we should clarify the semantics through `ServerFactoryTest` (can 
be a follow-up).
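
To make the concern concrete, here is a small sketch that contrasts the `getPort` logic from the diff with simply passing the configured port through. `PortSelectionSketch`, `getPortFromDiff` and `passThrough` are illustrative names only; the body of `getPortFromDiff` mirrors the helper in the PR, with a plain null/empty check standing in for Guava's `Strings.isNullOrEmpty`.

```java
// Hypothetical sketch for illustration; not the actual FlinkJobServerDriver code.
class PortSelectionSketch {

  // Mirrors the helper in the diff: an empty host silently forces a dynamic port.
  static int getPortFromDiff(String host, int port) {
    return (host == null || host.isEmpty()) ? 0 : port;
  }

  // Alternative discussed in the review: honor the configured port as given,
  // and let callers (for example tests) pass 0 explicitly for a dynamic port.
  static int passThrough(int port) {
    return port;
  }

  public static void main(String[] args) {
    // The user ran with `--job-port 1234` and left `--job-host` empty.
    System.out.println(getPortFromDiff("", 1234)); // prints 0: the requested port is ignored
    System.out.println(passThrough(1234));         // prints 1234
    System.out.println(passThrough(0));            // prints 0: a test asks for a dynamic port
  }
}
```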




Issue Time Tracking
---

Worklog Id: (was: 144662)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144519&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144519
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 15/Sep/18 00:57
Start Date: 15/Sep/18 00:57
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #6341: [BEAM-4130] Fixing 
bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#issuecomment-421518808
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 144519)
Time Spent: 9h  (was: 8h 50m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144517&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144517
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 15/Sep/18 00:56
Start Date: 15/Sep/18 00:56
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r217869549
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -130,7 +137,7 @@ private FlinkJobServerDriver(
 this.configuration = configuration;
 this.executor = executor;
 this.jobServerFactory = jobServerFactory;
-this.artifactServerFactory = jobServerFactory;
+this.artifactServerFactory = artifactServerFactory;
 
 Review comment:
   Thanks!




Issue Time Tracking
---

Worklog Id: (was: 144517)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144516&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144516
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 15/Sep/18 00:56
Start Date: 15/Sep/18 00:56
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r217870561
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
 new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
 ListeningExecutorService executor =
 MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+ServerFactory jobServerFactory =
+ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
 ServerFactory artifactServerFactory =
-ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+ServerFactory.createWithPortSupplier(
+() -> getPort(configuration.host, configuration.artifactPort));
 return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
 
+  private static int getPort(String host, int port) {
+// If host is empty then use dynamic port
+return Strings.isNullOrEmpty(host) ? 0 : port;
 
 Review comment:
   We need to be able to use a dynamic port when creating the server in tests.
   The port supplier is used by allocateAndCreate, which is expected to use 
a dynamic port. Because the port was fixed in the port supplier, this 
functionality was broken.
   I am using the host as the marker for choosing between a dynamic and a 
static port, as we do in 
[L181](https://github.com/apache/beam/pull/6341/files#diff-e52a25378b181af8cecbf3d274946fd0L181)
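
One way to picture why a fixed value in the port supplier can break dynamic allocation is the sketch below. It is only an analogy built on `java.net.ServerSocket` and `java.util.function.Supplier`; `PortSupplierSketch`, `allocate`, `canAllocateTwice` and `findFreePort` are hypothetical names and do not reflect how Beam's `ServerFactory` is actually implemented.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.util.function.Supplier;

// Hypothetical sketch; the names below are not Beam APIs.
class PortSupplierSketch {

  // Opens a server socket on whatever port the supplier returns.
  static ServerSocket allocate(Supplier<Integer> portSupplier) {
    try {
      return new ServerSocket(portSupplier.get());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Returns true if two servers can be held open at once using the same supplier.
  static boolean canAllocateTwice(Supplier<Integer> portSupplier) {
    try (ServerSocket first = allocate(portSupplier);
        ServerSocket second = allocate(portSupplier)) {
      return first.getLocalPort() != second.getLocalPort();
    } catch (IOException | UncheckedIOException e) {
      return false; // for example, the second bind failed because the port was taken
    }
  }

  // Finds a port that is free right now, to stand in for a fixed configured port.
  static int findFreePort() {
    try (ServerSocket probe = new ServerSocket(0)) {
      return probe.getLocalPort();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int fixedPort = findFreePort();
    System.out.println("fixed supplier: " + canAllocateTwice(() -> fixedPort));
    System.out.println("dynamic supplier: " + canAllocateTwice(() -> 0));
  }
}
```

With a supplier that always returns the same port, the second allocation is expected to fail while the first socket is still open, whereas a supplier returning 0 lets each allocation receive its own port.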
 




Issue Time Tracking
---

Worklog Id: (was: 144516)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141711
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 06/Sep/18 10:38
Start Date: 06/Sep/18 10:38
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r215577945
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -130,7 +137,7 @@ private FlinkJobServerDriver(
 this.configuration = configuration;
 this.executor = executor;
 this.jobServerFactory = jobServerFactory;
-this.artifactServerFactory = jobServerFactory;
+this.artifactServerFactory = artifactServerFactory;
 
 Review comment:
   I've merged this fix to master with fb23c7f2eacee5374cfd85a58d5d6746e18fabc7.




Issue Time Tracking
---

Worklog Id: (was: 141711)
Time Spent: 8h 40m  (was: 8.5h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141673&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141673
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 06/Sep/18 08:46
Start Date: 06/Sep/18 08:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r215541028
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -130,7 +137,7 @@ private FlinkJobServerDriver(
 this.configuration = configuration;
 this.executor = executor;
 this.jobServerFactory = jobServerFactory;
-this.artifactServerFactory = jobServerFactory;
+this.artifactServerFactory = artifactServerFactory;
 
 Review comment:
   Thanks for spotting this.




Issue Time Tracking
---

Worklog Id: (was: 141673)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-06 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141674
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 06/Sep/18 08:46
Start Date: 06/Sep/18 08:46
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6341: 
[BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r215541807
 
 

 ##
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
 new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
 ListeningExecutorService executor =
 MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+ServerFactory jobServerFactory =
+ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
 ServerFactory artifactServerFactory =
-ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+ServerFactory.createWithPortSupplier(
+() -> getPort(configuration.host, configuration.artifactPort));
 return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
 
+  private static int getPort(String host, int port) {
+// If host is empty then use dynamic port
+return Strings.isNullOrEmpty(host) ? 0 : port;
 
 Review comment:
   Why do we not use the provided port when the host is null? I think it would 
make sense that the user doesn't care about the host name but wants to bind to 
a specific port.




Issue Time Tracking
---

Worklog Id: (was: 141674)
Time Spent: 8.5h  (was: 8h 20m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141597
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 06/Sep/18 01:49
Start Date: 06/Sep/18 01:49
Worklog Time Spent: 10m 
  Work Description: angoenka opened a new pull request #6341: [BEAM-4130] 
Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341
 
 




Issue Time Tracking
---

Worklog Id: (was: 141597)
Time Spent: 8h  (was: 7h 50m)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-09-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141598
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 06/Sep/18 01:49
Start Date: 06/Sep/18 01:49
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #6341: [BEAM-4130] Fixing 
bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#issuecomment-418937279
 
 
   cc: @mxm 




Issue Time Tracking
---

Worklog Id: (was: 141598)
Time Spent: 8h 10m  (was: 8h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
> Fix For: 2.7.0
>
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.
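
As a rough illustration of such an entry point, the sketch below parses the three options that appear in the PR #6265 diff (`--job-host`, `--job-port`, `--artifact-port`, with the port defaults 8099 and 8098 taken from that diff) and binds one listening socket per service. It is not the actual `FlinkJobServerDriver`: the class `JobServerEntryPointSketch`, its `Config` holder and the hand-rolled parser are hypothetical, the `localhost` host default is an assumption, and plain `ServerSocket`s merely stand in for the gRPC job and artifact staging services.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.function.Supplier;

/** Hypothetical sketch; not the real FlinkJobServerDriver. */
final class JobServerEntryPointSketch {

  /** Option holder; the port defaults mirror the values shown in the PR #6265 diff. */
  static final class Config {
    String host = "localhost"; // assumption: the diff itself defaults to ""
    int port = 8099;
    int artifactPort = 8098;
  }

  /** Parses "--name=value" arguments; unknown options are rejected. */
  static Config parse(String... args) {
    Config config = new Config();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      if (!arg.startsWith("--") || eq < 0) {
        throw new IllegalArgumentException("Expected --name=value, got: " + arg);
      }
      String name = arg.substring(2, eq);
      String value = arg.substring(eq + 1);
      switch (name) {
        case "job-host":
          config.host = value;
          break;
        case "job-port":
          config.port = Integer.parseInt(value);
          break;
        case "artifact-port":
          config.artifactPort = Integer.parseInt(value);
          break;
        default:
          throw new IllegalArgumentException("Unknown option: --" + name);
      }
    }
    return config;
  }

  /** Binds a listening socket on the port the supplier returns (0 means any free port). */
  static ServerSocket listen(String host, Supplier<Integer> port) throws IOException {
    return new ServerSocket(port.get(), 50, InetAddress.getByName(host));
  }

  public static void main(String[] args) throws IOException {
    Config config = parse(args);
    // One socket per service, each with its own port supplier.
    try (ServerSocket job = listen(config.host, () -> config.port);
        ServerSocket artifacts = listen(config.host, () -> config.artifactPort)) {
      System.out.println("job service on port " + job.getLocalPort());
      System.out.println("artifact service on port " + artifacts.getLocalPort());
    }
  }
}
```

Giving each service its own port supplier keeps the two ports independently configurable, which is the design the diff appears to move towards.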



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137920
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 18:23
Start Date: 24/Aug/18 18:23
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #6265: [BEAM-4130] Bring up 
Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/runners/flink/job-server-container/Dockerfile b/runners/flink/job-server-container/Dockerfile
index a9aff21b6d6..569c2ab0406 100644
--- a/runners/flink/job-server-container/Dockerfile
+++ b/runners/flink/job-server-container/Dockerfile
@@ -19,7 +19,7 @@
 FROM openjdk:8
 MAINTAINER "Apache Beam "
 
-ADD target/beam-runners-flink_2.11-job-server.jar /opt/apache/beam/jars/
+ADD target/beam-runners-flink-job-server.jar /opt/apache/beam/jars/
 ADD target/flink-job-server.sh /opt/apache/beam/
 
 WORKDIR /opt/apache/beam
diff --git a/runners/flink/job-server-container/build.gradle b/runners/flink/job-server-container/build.gradle
index 4d5f53316e3..d2b026804d7 100644
--- a/runners/flink/job-server-container/build.gradle
+++ b/runners/flink/job-server-container/build.gradle
@@ -38,7 +38,7 @@ dependencies {
 task copyDockerfileDependencies(type: Copy) {
   // Required Jars
   from configurations.dockerDependency
-  rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink_2.11-job-server.jar'
+  rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink-job-server.jar'
   into "build/target"
   // Entry script
   from file("./flink-job-server.sh")
diff --git a/runners/flink/job-server/build.gradle b/runners/flink/job-server/build.gradle
index 5e1e01ae74d..ee6b70f1f37 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -55,7 +55,7 @@ dependencies {
 // task will not work because the flink runner classes only exist in the shadow
 // jar.
 runShadow {
-  def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost:8099"
+  def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost"
   def artifactsDir = project.hasProperty("artifactsDir") ? project.property("artifactsDir") : "/tmp/flink-artifacts"
   def cleanArtifactsPerJob = project.hasProperty("cleanArtifactsPerJob")
   args = ["--job-host=${jobHost}", "--artifacts-dir=${artifactsDir}"]
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 7e9b14a2acd..13f48c53018 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -45,15 +45,22 @@
 
   private final ListeningExecutorService executor;
   private final ServerConfiguration configuration;
-  private final ServerFactory serverFactory;
+  private final ServerFactory jobServerFactory;
+  private final ServerFactory artifactServerFactory;
   private GrpcFnServer jobServer;
   private GrpcFnServer artifactStagingServer;
 
   /** Configuration for the jobServer. */
   public static class ServerConfiguration {
-    @Option(name = "--job-host", usage = "The job server host string")
+    @Option(name = "--job-host", usage = "The job server host name")
     private String host = "";
 
+    @Option(name = "--job-port", usage = "The job service port. (Default: 8099)")
+    private int port = 8099;
+
+    @Option(name = "--artifact-port", usage = "The artifact service port. (Default: 8098)")
+    private int artifactPort = 8098;
+
     @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
     private String artifactStagingPath = "/tmp/beam-artifact-staging";
 
@@ -100,24 +107,30 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
         new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
     ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    ServerFactory serverFactory = ServerFactory.createDefault();
-    return create(configuration, executor, serverFactory);
+    ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+    ServerFactory artifactServerFactory =
+        ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+retu

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137898&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137898
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 17:10
Start Date: 24/Aug/18 17:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212694411
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
 
 Review comment:
   Should be `./gradlew :beam-runners-flink_2.11-job-server-container:docker`
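
The difference between the two spellings can be illustrated with a small sketch. On the Gradle command line, space-separated arguments are separate task requests, whereas a colon-separated path names one task inside one project. The hypothetical helper below (an illustration only, not Gradle's own parser) splits a qualified task path at its last colon:

```java
/** Hypothetical illustration of how a qualified Gradle task path decomposes. */
final class TaskPathSketch {

  /** Returns {projectPath, taskName}; a bare name yields an empty project path. */
  static String[] split(String taskPath) {
    int lastColon = taskPath.lastIndexOf(':');
    if (lastColon < 0) {
      return new String[] {"", taskPath};
    }
    // ":docker" refers to a task in the root project, whose path is ":".
    String project = lastColon == 0 ? ":" : taskPath.substring(0, lastColon);
    return new String[] {project, taskPath.substring(lastColon + 1)};
  }

  public static void main(String[] args) {
    for (String arg : args) {
      String[] parts = split(arg);
      System.out.println("project='" + parts[0] + "' task='" + parts[1] + "'");
    }
  }
}
```

For `:beam-runners-flink_2.11-job-server-container:docker` this gives the project `:beam-runners-flink_2.11-job-server-container` and the task `docker`. For the space-separated form, the first argument alone resolves to a task named `beam-runners-flink_2.11-job-server-container` in the root project, which is consistent with the "not found in root project 'beam'" error reported in this review.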




Issue Time Tracking
---

Worklog Id: (was: 137898)
Time Spent: 7h 40m  (was: 7.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137897
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 17:08
Start Date: 24/Aug/18 17:08
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212693774
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
 
 Review comment:
   while this does: `./gradlew -p runners/flink/job-server-container docker`




Issue Time Tracking
---

Worklog Id: (was: 137897)
Time Spent: 7.5h  (was: 7h 20m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137896&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137896
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 17:07
Start Date: 24/Aug/18 17:07
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212693575
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
 
 Review comment:
   Still does not work: `Task 'beam-runners-flink_2.11-job-server-container' not found in root project 'beam'.`




Issue Time Tracking
---

Worklog Id: (was: 137896)
Time Spent: 7h 20m  (was: 7h 10m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137893
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:54
Start Date: 24/Aug/18 16:54
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212690050
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost
 
 Review comment:
   (this is more related to PortableOptions on the Python side, not what 
FlinkJobServerDriver does)




Issue Time Tracking
---

Worklog Id: (was: 137893)
Time Spent: 7h 10m  (was: 7h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137889&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137889
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:37
Start Date: 24/Aug/18 16:37
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212685952
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost
 
 Review comment:
   Why not support `--job-endpoint` in the previous form? That seems more intuitive.




Issue Time Tracking
---

Worklog Id: (was: 137889)
Time Spent: 7h  (was: 6h 50m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137885&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137885
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:35
Start Date: 24/Aug/18 16:35
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212685321
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost
 
 Review comment:
   Will change to lowerCamelCase.
   
   You can still specify the port via `--job_port` but it is not exposed as a 
property yet. 




Issue Time Tracking
---

Worklog Id: (was: 137885)
Time Spent: 6h 40m  (was: 6.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137887&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137887
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:35
Start Date: 24/Aug/18 16:35
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212685454
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
 
 Review comment:
   Ah, there is a Unicode character in beam `®`. No idea how it ended up there.
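
   A stray character like this is easy to miss by eye. As a sketch (the class and method names are made up for illustration), a few lines of Java can report the position of the first non-ASCII character in a command string:

```java
/** Hypothetical helper for spotting stray non-ASCII characters in a command line. */
final class NonAsciiFinder {

  /** Returns the index of the first char above U+007F, or -1 if the text is pure ASCII. */
  static int firstNonAscii(String text) {
    for (int i = 0; i < text.length(); i++) {
      if (text.charAt(i) > 0x7F) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // \u00AE is the registered-sign character that slipped into the task path.
    String command = "./gradlew :b\u00AEeam-runners-flink_2.11-job-server-container docker";
    int index = firstNonAscii(command);
    System.out.println(index < 0 ? "ASCII only" : "non-ASCII character at index " + index);
  }
}
```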




Issue Time Tracking
---

Worklog Id: (was: 137887)
Time Spent: 6h 50m  (was: 6h 40m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137882
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:30
Start Date: 24/Aug/18 16:30
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212684030
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost
 
 Review comment:
   nit: the existing properties are usually lowerCamelCase
   
   What if I want to specify a port, is that no longer supported? 




Issue Time Tracking
---

Worklog Id: (was: 137882)
Time Spent: 6.5h  (was: 6h 20m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137880&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137880
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:21
Start Date: 24/Aug/18 16:21
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212681815
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
 
 Review comment:
   This command doesn't work (I'm not sure why), but this one does: `./gradlew -p sdks/python/container docker`
   
   (see python container build above)




Issue Time Tracking
---

Worklog Id: (was: 137880)
Time Spent: 6h 20m  (was: 6h 10m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137876
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:14
Start Date: 24/Aug/18 16:14
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212679944
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099
 
 Review comment:
   Updated to include a property.




Issue Time Tracking
---

Worklog Id: (was: 137876)
Time Spent: 6h 10m  (was: 6h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137874&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137874
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:05
Start Date: 24/Aug/18 16:05
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212677632
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099
 
 Review comment:
   like this
   ``` 
   if (project.hasProperty("flinkMasterUrl"))
   args += ["--flink-master-url=${project.property('flinkMasterUrl')}"]
   ```




Issue Time Tracking
---

Worklog Id: (was: 137874)
Time Spent: 6h  (was: 5h 50m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137872&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137872
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 16:01
Start Date: 24/Aug/18 16:01
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212676518
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099
 
 Review comment:
   `Unknown command-line option '--job_endpoint'`




Issue Time Tracking
---

Worklog Id: (was: 137872)
Time Spent: 5h 50m  (was: 5h 40m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137870&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137870
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:59
Start Date: 24/Aug/18 15:59
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212675908
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099
 
 Review comment:
   Ah yes.




Issue Time Tracking
---

Worklog Id: (was: 137870)
Time Spent: 5h 40m  (was: 5.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137858&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137858
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 15:51
Start Date: 24/Aug/18 15:51
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212673728
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+//  a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
+//
+//  b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+//  Docker (2a): 
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+//  Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099
 
 Review comment:
 Doesn't this require a project property?




Issue Time Tracking
---

Worklog Id: (was: 137858)
Time Spent: 5.5h  (was: 5h 20m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137824
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 13:58
Start Date: 24/Aug/18 13:58
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212636822
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,14 +204,10 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
 
 Review comment:
   +1




Issue Time Tracking
---

Worklog Id: (was: 137824)
Time Spent: 5h 20m  (was: 5h 10m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137633
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 24/Aug/18 00:54
Start Date: 24/Aug/18 00:54
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6265: 
[BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212496888
 
 

 ##
 File path: sdks/python/build.gradle
 ##
 @@ -204,14 +204,10 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 
 // Before running this, you need to:
 //
-// 1. build the SDK container:
 
 Review comment:
   Perhaps we should keep the alternative documented here, since running
   against an existing job server is useful, for example when working on or
   debugging the runner. In the side input PR I already made some changes to
   the options here, to utilize Gradle properties.




Issue Time Tracking
---

Worklog Id: (was: 137633)
Time Spent: 5h 10m  (was: 5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137111&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137111
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 22/Aug/18 18:47
Start Date: 22/Aug/18 18:47
Worklog Time Spent: 10m 
  Work Description: mxm opened a new pull request #6265: [BEAM-4130] Bring 
up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265
 
 
   This starts the Job Server in a container when Python pipelines are run and no
   Job Server endpoint has been specified. The Job Server has the Docker binaries
   and socket mounted from the host to bring up SDK harness containers directly on
   the host. This feature is currently only available for Flink.
   
   Because host networking is not available on macOS, we need to explicitly map
   ports from the host system to the container. On Linux this is not necessary
   because host networking works as expected. Generally, getting Docker support to
   work on macOS proved to be a bit tricky.
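
The networking difference described above can be sketched as follows. This is a minimal illustration, not the code from this PR: the image tag, the function name and the choice to mount only the Docker socket are assumptions made for the example, while ports 8099 and 8100 to 8200 are the values mentioned elsewhere in this thread.

```python
import platform
from typing import List, Optional

# Illustrative assumptions, not values taken from the PR.
JOB_SERVER_IMAGE = "flink-job-server:latest"  # hypothetical image tag
JOB_PORT = 8099                               # job endpoint port from the Gradle example
HARNESS_PORTS = (8100, 8200)                  # SDK harness port range mentioned for macOS


def job_server_docker_args(system: Optional[str] = None) -> List[str]:
    """Build a `docker run` command line for a Job Server container."""
    system = system or platform.system()
    args = [
        "docker", "run", "--rm",
        # Mount the host's Docker socket so the Job Server can start
        # SDK harness containers directly on the host.
        "-v", "/var/run/docker.sock:/var/run/docker.sock",
    ]
    if system == "Linux":
        # Host networking works on Linux, so no explicit port mapping is needed.
        args.append("--network=host")
    else:
        # Without host networking, publish the job endpoint port and the
        # SDK harness port range explicitly.
        low, high = HARNESS_PORTS
        args += ["-p", f"{JOB_PORT}:{JOB_PORT}"]
        args += ["-p", f"{low}-{high}:{low}-{high}"]
    args.append(JOB_SERVER_IMAGE)
    return args
```

Calling `job_server_docker_args("Darwin")` yields the explicit `-p` mappings, while `job_server_docker_args("Linux")` yields `--network=host`. Mounting the Docker binary itself is left out of the sketch because its path depends on the host.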
   
   You can test this via:
   
   ```
gradle :beam-sdks-python:portableWordCount
   ```
   
   Some rough edges on macOS:
   
   - At times, the JobService socket is not available, even though `grpc.channel_ready_future(channel).result()` in the portable_runner returns.
   - The port range for the SDK harness is restricted to 8100 to 8200 (configurable) because of the necessary explicit port forwarding.
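
One possible way to guard against the first rough edge is to poll the endpoint before submitting a job. The helper below is a hypothetical sketch that uses only the Python standard library. It is not part of this PR, and a plain TCP probe may succeed against Docker's port forwarder before the service inside the container is listening, so treat it as a first-line check only.

```python
import socket
import time


def wait_for_port(host: str, port: int,
                  timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. connection refused)
            # while nothing accepts connections on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

A caller could run `wait_for_port("localhost", 8099)` after starting the container and retry or fail early if it returns `False`.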
   
   Potentially problematic on Linux:
   
   - The Docker binary might not be statically compiled and might depend on shared libraries, though this needs to be tested.
   
   CC @angoenka 
   




Issue Time Tracking
---

Worklog Id: (was: 137111)
Time Spent: 5h  (was: 4h 50m)

> Port

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=135710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135710
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 17/Aug/18 14:38
Start Date: 17/Aug/18 14:38
Worklog Time Spent: 10m 
  Work Description: asfgit closed pull request #6238: [BEAM-4130] Build 
Docker image for Flink's JobServer
URL: https://github.com/apache/beam/pull/6238
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/job-server-container/Dockerfile 
b/runners/flink/job-server-container/Dockerfile
new file mode 100644
index 000..a9aff21b6d6
--- /dev/null
+++ b/runners/flink/job-server-container/Dockerfile
@@ -0,0 +1,26 @@
+###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+FROM openjdk:8
+MAINTAINER "Apache Beam "
+
+ADD target/beam-runners-flink_2.11-job-server.jar /opt/apache/beam/jars/
+ADD target/flink-job-server.sh /opt/apache/beam/
+
+WORKDIR /opt/apache/beam
+ENTRYPOINT ["./flink-job-server.sh"]
diff --git a/runners/flink/job-server-container/build.gradle 
b/runners/flink/job-server-container/build.gradle
new file mode 100644
index 000..4d5f53316e3
--- /dev/null
+++ b/runners/flink/job-server-container/build.gradle
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Build a Docker image to bootstrap FlinkJobServerDriver which requires a Java environment.
+ * Alternatively, it can also be bootstrapped through :beam-runners-flink_2.11-job-server:runShadow
+ * or by directly running the generated JAR file.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyDockerNature()
+
+description = "Apache Beam :: Runners :: Flink :: Job Server :: Container"
+
+configurations {
+  dockerDependency
+}
+
+dependencies {
+  dockerDependency project(path: ":beam-runners-flink_2.11-job-server", configuration: "shadow")
+}
+
+task copyDockerfileDependencies(type: Copy) {
+  // Required Jars
+  from configurations.dockerDependency
+  rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink_2.11-job-server.jar'
+  into "build/target"
+  // Entry script
+  from file("./flink-job-server.sh")
+  into "build/target"
+}
+
+docker {
+  name containerImageName(name: "flink-job-server")
+  files "./build/"
+}
+
+// Ensure that we build the required resources and copy and file dependencies from related projects
+dockerPrepare.dependsOn copyDockerfileDependencies
diff --git a/runners/flink/job-server-container/flink-job-server.sh 
b/runners/flink/job-server-container/flink-job-server.sh
new file mode 100755
index 000..399e5e4c64a
--- /dev/null
+++ b/runners/flink/job-server-container/flink-job-server.sh
@@ -0,0 +1,29 @@
+#!/bin/sh
+###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed w

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=135562&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135562
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 17/Aug/18 01:26
Start Date: 17/Aug/18 01:26
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6238: 
[BEAM-4130] Build Docker image for Flink's JobServer
URL: https://github.com/apache/beam/pull/6238#discussion_r210784330
 
 

 ##
 File path: runners/flink/job-server-container/flink-job-server.sh
 ##
 @@ -0,0 +1,29 @@
+#!/bin/sh
+###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+### Just a simple script to bootstrap the FlinkJobServerDriver
+### For the environment, see the Dockerfile
+
+# The following (forking to the background, then waiting) enables the use of CTRL+C to kill the container.
+# We're PID 1 which doesn't handle signals. By forking the Java process to the background,
+# a PID > 1 is created which handles signals. After the command shuts down, the script and
+# thus the container will also exit.
+
+java -cp "jars/*" org.apache.beam.runners.flink.FlinkJobServerDriver "$@" &
+wait
 
 Review comment:
   please add a new line.




Issue Time Tracking
---

Worklog Id: (was: 135562)
Time Spent: 4h 40m  (was: 4.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-08-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=135394&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135394
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 16/Aug/18 15:02
Start Date: 16/Aug/18 15:02
Worklog Time Spent: 10m 
  Work Description: mxm opened a new pull request #6238: [BEAM-4130] Build 
Docker image for Flink's JobServer
URL: https://github.com/apache/beam/pull/6238
 
 
   This adds a new Gradle module flink-job-server-container which builds a Docker
   image during the `docker` task. The image contains the FlinkJobServerDriver,
   which is the entry point for submitting Beam pipelines to the cluster.
   
   The image can then be used to spawn a JobServer container when executing a Beam
   pipeline, i.e. `p.run()`. The SDKs (Java/Python/Go) need to be updated to either
   spin up a container or use the address of a remote JobServer.
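
The choice an SDK would have to make can be sketched as below. This is only an illustration of the decision described above. `resolve_job_endpoint` and its `start_container` callback are hypothetical names, not existing Beam APIs.

```python
from typing import Callable, Optional


def resolve_job_endpoint(job_endpoint: Optional[str],
                         start_container: Callable[[], str]) -> str:
    """Pick the JobServer address to submit a pipeline to.

    If the user supplied an endpoint, use it unchanged. Otherwise call
    `start_container`, which is assumed to bring up a JobServer container
    and return its "host:port" address.
    """
    if job_endpoint:
        return job_endpoint
    return start_container()
```

Passing the launcher in as a callback keeps the decision separate from the Docker details, so the same logic could be reused or tested without Docker installed.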
   
   CC @angoenka 
   




Issue Time Tracking
---

Worklog Id: (was: 135394)
Time Spent: 4.5h  (was: 4h 20m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112078&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112078
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 14/Jun/18 22:11
Start Date: 14/Jun/18 22:11
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #5493: [BEAM-4130] Add job 
submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#issuecomment-397454040
 
 
   In that case we can go ahead with the merge.




Issue Time Tracking
---

Worklog Id: (was: 112078)
Time Spent: 4h 10m  (was: 4h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.
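
The entry point described above can be sketched roughly as follows. This is a minimal illustration, not the driver from the pull request: the class name `JobServerEntryPointSketch`, the `--job-host` and `--job-port` flags, and the default port are all assumptions made for the example, and the `localhost` default only mirrors the default host mentioned in this thread. A real driver would start the job service and the artifact staging service where the sketch just prints.

```java
import java.util.Objects;

/** Hypothetical sketch of a job-server entry point; not the actual Beam driver. */
final class JobServerEntryPointSketch {

  /** Parsed command-line configuration. Field names are illustrative. */
  static final class Config {
    final String host;
    final int port;

    Config(String host, int port) {
      this.host = Objects.requireNonNull(host);
      this.port = port;
    }
  }

  /** Parses "--job-host <host>" and "--job-port <port>"; both flags are assumptions. */
  static Config parse(String[] args) {
    String host = "localhost"; // assumed default host
    int port = 8099; // assumed default port
    for (int i = 0; i < args.length; i++) {
      switch (args[i]) {
        case "--job-host":
          host = requireValue(args, ++i, "--job-host");
          break;
        case "--job-port":
          port = Integer.parseInt(requireValue(args, ++i, "--job-port"));
          break;
        default:
          throw new IllegalArgumentException("Unknown argument: " + args[i]);
      }
    }
    return new Config(host, port);
  }

  /** Returns the value following a flag, or fails if the flag is the last argument. */
  private static String requireValue(String[] args, int index, String flag) {
    if (index >= args.length) {
      throw new IllegalArgumentException("Missing value for " + flag);
    }
    return args[index];
  }

  public static void main(String[] args) {
    Config config = parse(args);
    // A real driver would start the job service and artifact staging service here.
    System.out.println("Would start job service on " + config.host + ":" + config.port);
  }
}
```

Keeping argument parsing in a static method separate from `main` means the same code path can be exercised from a unit test and from an uberjar launched locally or through `flink run`.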



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112079&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112079
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 14/Jun/18 22:11
Start Date: 14/Jun/18 22:11
Worklog Time Spent: 10m 
  Work Description: robertwb closed pull request #5493: [BEAM-4130] Add job 
submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index ebd607c0e21..4eec38962c6 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -59,6 +59,7 @@ dependencies {
   shadow library.java.slf4j_api
   shadow library.java.joda_time
   shadow library.java.commons_compress
+  shadow library.java.args4j
   shadow "org.apache.flink:flink-clients_2.11:$flink_version"
   shadow "org.apache.flink:flink-core:$flink_version"
   shadow "org.apache.flink:flink-metrics-core:$flink_version"
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index b7ee08378dc..60e9be798f7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -98,7 +98,7 @@
  *   FlinkBatchPortablePipelineTranslator translator =
  *   FlinkBatchPortablePipelineTranslator.createTranslator();
  *   BatchTranslationContext context =
- *   FlinkBatchPortablePipelineTranslator.createTranslationContext(options);
+ *   FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
  *   translator.translate(context, pipeline);
  *   ExecutionEnvironment executionEnvironment = context.getExecutionEnvironment();
  *   // Do something with executionEnvironment...
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
new file mode 100644
index 000..cfc6b3898fe
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LO

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112076&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112076
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 14/Jun/18 22:09
Start Date: 14/Jun/18 22:09
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #5493: [BEAM-4130] Add job 
submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#issuecomment-397453738
 
 
   It's just hitting 
org.apache.beam.runners.direct.portable.ReferenceRunnerTest.pipelineExecution, 
which is unrelated and flaky at HEAD too. 
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 112076)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112038&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112038
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 14/Jun/18 20:21
Start Date: 14/Jun/18 20:21
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #5493: [BEAM-4130] Add job 
submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#issuecomment-397426050
 
 
   @robertwb Based on the discussion in the chat thread, should we remove 
BeamFileSystemArtifactSource, since ArtifactRetrievalService is meant to do the 
same job?
   If you want, we can keep it as it is, and I can clean it up when I remove the 
ArtifactSource interface.




Issue Time Tracking
---

Worklog Id: (was: 112038)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111986&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111986
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 14/Jun/18 18:59
Start Date: 14/Jun/18 18:59
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #5493: [BEAM-4130] Add job 
submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#issuecomment-397403414
 
 
   LGTM
   Please make sure that the checks are green.




Issue Time Tracking
---

Worklog Id: (was: 111986)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111718&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111718
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:28
Start Date: 13/Jun/18 23:28
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195264829
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService executorService) {
+    return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+      RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken)
+      throws IOException {
+    // TODO: How to make Java/Python agree on names of keys and their values?
+    LOG.trace("Parsing pipeline options");
+    FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options)
+        .as(FlinkPipelineOptions.class);
+
+    String invocationId = String.format(
+        "%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+    LOG.debug("Invoking job {}", invocationId);
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    flinkOptions.setRunner(null);
 
 Review comment:
   @axelmagn ?
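
For readers following the quoted diff, the two small pieces of logic in `invoke` (building the invocation id and falling back to `[auto]` for the Flink master) can be tried in isolation. The sketch below uses only the JDK; the class `InvocationSketch` and its method names are hypothetical stand-ins and do not exist in Beam.

```java
import java.util.UUID;

/** Hypothetical stand-in for the id and default-master logic in the quoted diff. */
final class InvocationSketch {
  /** The placeholder value the quoted diff assigns when no master is configured. */
  static final String AUTO_MASTER = "[auto]";

  /** Builds "<jobName>_<random UUID>", mirroring the String.format call in the diff. */
  static String invocationId(String jobName) {
    return String.format("%s_%s", jobName, UUID.randomUUID());
  }

  /** Returns the configured master, or "[auto]" when none was specified. */
  static String resolveFlinkMaster(String configuredMaster) {
    return configuredMaster == null ? AUTO_MASTER : configuredMaster;
  }

  public static void main(String[] args) {
    System.out.println(invocationId("wordcount"));
    System.out.println(resolveFlinkMaster(null));
    System.out.println(resolveFlinkMaster("localhost:8081"));
  }
}
```

The random UUID suffix keeps invocation ids unique even when two submissions share a job name.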




Issue Time Tracking
---

Worklog Id: (was: 111718)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111719&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111719
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:29
Start Date: 13/Jun/18 23:29
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195264045
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+      tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111720&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111720
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:29
Start Date: 13/Jun/18 23:29
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195263412
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+      tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111717&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111717
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 23:28
Start Date: 13/Jun/18 23:28
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195263336
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+      tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111711
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:42
Start Date: 13/Jun/18 22:42
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195258050
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
 
 Review comment:
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111711)
Time Spent: 2h 50m  (was: 2h 40m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ben Sidhom
> Priority: Minor
> Time Spent: 2h 50m
>

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111701
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195211177
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111688&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111688
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195215990
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Driver program that starts a job server. */
+public class FlinkJobServerDriver implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
+
+  private static class ServerConfiguration {
+    @Option(
+        name = "--job-host",
+        required = true,
+        usage = "The job server host string"
+    )
+    private String host = "";
+
+    @Option(
+        name = "--artifacts-dir",
+        usage = "The location to store staged artifact files"
+    )
+    private String artifactStagingPath = "/tmp/beam-artifact-staging";
+  }
+
+  public static void main(String[] args) {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+    } catch (CmdLineException e) {
+      e.printStackTrace(System.err);
 
 Review comment:
   Logger?
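
The suggestion above, sketched in isolation: route the parse failure through the class logger instead of `e.printStackTrace(System.err)`. This is a minimal, self-contained illustration and not the PR's code. It uses `java.util.logging` as a stand-in so it runs without extra dependencies, and `parseHost` and `parseOrLog` are hypothetical helpers standing in for `CmdLineParser.parseArgument`. With the SLF4J `LOG` already declared in `FlinkJobServerDriver`, the equivalent call would be `LOG.error("...", e)`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class ArgParseLoggingSketch {
  private static final Logger LOG = Logger.getLogger(ArgParseLoggingSketch.class.getName());

  /** Hypothetical stand-in for argument parsing: fails if --job-host is missing. */
  static String parseHost(String[] args) {
    for (int i = 0; i + 1 < args.length; i++) {
      if ("--job-host".equals(args[i])) {
        return args[i + 1];
      }
    }
    throw new IllegalArgumentException("Option \"--job-host\" is required");
  }

  /** Returns the parsed host, or null after logging the failure with its stack trace. */
  static String parseOrLog(String[] args) {
    try {
      return parseHost(args);
    } catch (IllegalArgumentException e) {
      // The logger records the message and the exception together,
      // so the output follows the configured log format and destination.
      LOG.log(Level.SEVERE, "Unable to parse command line arguments.", e);
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseOrLog(new String[] {"--job-host", "localhost:8099"}));
    System.out.println(parseOrLog(new String[] {}));
  }
}
```

Usage text for the operator can still be printed to `System.err` after the log call; only the exception report moves to the logger.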




Issue Time Tracking
---

Worklog Id: (was: 111688)
Time Spent: 50m  (was: 40m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ben Sidhom
> Priority: Minor
> Time Spent: 50m
> Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111694
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195206698
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111691
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195213110
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService executorService) {
+    return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+      RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken)
+      throws IOException {
+    // TODO: How to make Java/Python agree on names of keys and their values?
+    LOG.trace("Parsing pipeline options");
+    FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options)
+        .as(FlinkPipelineOptions.class);
+
+    String invocationId = String.format(
+        "%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+    LOG.debug("Invoking job {}", invocationId);
 
 Review comment:
   Info?




Issue Time Tracking
---

Worklog Id: (was: 111691)
Time Spent: 1h 20m  (was: 1h 10m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ben Sidhom
> Priority: Minor
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111704&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111704
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195251040
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
 ##
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Driver program that starts a job server. */
+public class FlinkJobServerDriver implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
+
+  private static class ServerConfiguration {
+    @Option(
+        name = "--job-host",
+        required = true,
+        usage = "The job server host string"
+    )
+    private String host = "";
+
+    @Option(
+        name = "--artifacts-dir",
+        usage = "The location to store staged artifact files"
+    )
+    private String artifactStagingPath = "/tmp/beam-artifact-staging";
+  }
+
+  public static void main(String[] args) {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+    } catch (CmdLineException e) {
+      e.printStackTrace(System.err);
+      printUsage(parser);
+      return;
+    }
+    FlinkJobServerDriver driver = fromConfig(configuration);
+    driver.run();
+  }
+
+  private static void printUsage(CmdLineParser parser) {
+    System.err.println(
+        String.format(
+            "Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName()));
+    parser.printUsage(System.err);
+    System.err.println();
+  }
+
+  public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration) {
+    ThreadFactory threadFactory =
+        new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
+    ListeningExecutorService executor =
+        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
+    ServerFactory serverFactory = ServerFactory.createDefault();
+    return create(configuration, executor, serverFactory);
+  }
+
+  public static FlinkJobServerDriver create(
+      ServerConfiguration configuration,
+      ListeningExecutorService executor,
+      ServerFactory serverFactory) {
+    return new FlinkJobServerDriver(configuration, executor, serverFactory);
+  }
+
+  private final ListeningExecutorService executor;
 
 Review comment:
   We should move these variables to the top.




Issue T

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111695&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111695
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195254596
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for BeamFileSystemArtifactSource.
+ */
+@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest {
+
+  BeamFileSystemArtifactStagingService stagingService = new BeamFileSystemArtifactStagingService();
+
+  @Rule public TemporaryFolder stagingDir = new TemporaryFolder();
+
+  @Test public void testStagingService() throws Exception {
+    String stagingSession = "stagingSession";
+    String stagingSessionToken = BeamFileSystemArtifactStagingService
+        .generateStagingSessionToken(stagingSession, stagingDir.newFolder().getPath());
+    List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
+
+    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build());
+    putArtifactContents(stagingSessionToken, "first", "file1");
+
+    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build());
+    putArtifactContents(stagingSessionToken, "second", "file2");
+
+    String stagingToken = commitManifest(stagingSessionToken, metadata);
+
+    BeamFileSystemArtifactSource artifactSource = new BeamFileSystemArtifactSource(stagingToken);
+    Assert.assertEquals("first", getArtifactContents(artifactSource, "file1"));
+    Assert.assertEquals("second", getArtifactContents(artifactSource, "file2"));
+    Assert.assertThat(artifactSource.getManifest().getArtifactList(),
+        containsInAnyOrder(metadata.toArray(new ArtifactApi.ArtifactMetadata[0])));
+  }
+
+  private String commitManifest(String stagingSessionToken,
+      List<ArtifactApi.ArtifactMetadata> artifacts) {
+    String[] stagingTokenHolder = new String[1];
+    stagingService.commitManifest(
+        ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken)
+            .setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(),
+        new StreamObserver<ArtifactApi.CommitManifestResponse>() {
+
+          @Override public void onNext(ArtifactApi.CommitManifestResponse commitManifestResponse) {
+            stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken();
+          }
+
+          @Override public void onError(Throwable throwable) {
+            throw new RuntimeException(throwable);
+          }
+
+          @Override public void onCompleted() {
+          }
+        });
+
+    return stagingTokenHolder[0];
 
 Review comment:
   We should wait for the commit call to finish before returning.
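
One way to address this is to block on a latch that the observer releases in `onCompleted` or `onError`. The following is a minimal sketch under stated assumptions, not the change made in the pull request: `ResponseObserver` and `commitAsync` are hypothetical stand-ins for the gRPC `StreamObserver` and for `stagingService.commitManifest`, so that the example runs without Beam or gRPC on the classpath. The 5-second timeout is also an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AwaitCommitSketch {

  /** Hypothetical stand-in for a callback-style response observer. */
  interface ResponseObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Hypothetical asynchronous commit: responds on another thread. */
  static void commitAsync(String sessionToken, ResponseObserver<String> observer) {
    new Thread(() -> {
      observer.onNext("retrieval-token-for-" + sessionToken);
      observer.onCompleted();
    }).start();
  }

  /** Issues the commit and returns only after the call has completed or failed. */
  static String commitAndWait(String sessionToken) {
    AtomicReference<String> token = new AtomicReference<>();
    AtomicReference<Throwable> failure = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);

    commitAsync(sessionToken, new ResponseObserver<String>() {
      @Override public void onNext(String value) {
        token.set(value);
      }

      @Override public void onError(Throwable t) {
        // Record the failure and release the waiting caller.
        failure.set(t);
        done.countDown();
      }

      @Override public void onCompleted() {
        done.countDown();
      }
    });

    try {
      // Block until the observer signals completion, with an upper bound.
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("commit did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    if (failure.get() != null) {
      throw new RuntimeException(failure.get());
    }
    return token.get();
  }

  public static void main(String[] args) {
    System.out.println(commitAndWait("stagingSession"));
  }
}
```

Recording the error and rethrowing it on the calling thread also means a failed commit fails the test itself, rather than throwing inside the callback.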


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111695)
Time Spent: 1h 50m  (was: 1h 40m)

> Portable Flink runner JobSe

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111705&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111705
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195213503
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService executorService) {
+    return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+      RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken)
+      throws IOException {
+    // TODO: How to make Java/Python agree on names of keys and their values?
+    LOG.trace("Parsing pipeline options");
 
 Review comment:
   Should we log some info here? If possible, include a pipeline identifier.
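
A rough sketch of what such a log line could look like follows. It rests on assumptions: the identifier format (job name plus a random UUID) and the helper names `newInvocationId` and `describe` are hypothetical, and `java.util.logging` is used only so the example runs without dependencies, whereas the class under review logs through SLF4J.

```java
import java.util.UUID;
import java.util.logging.Logger;

public class InvocationLoggingSketch {
  private static final Logger LOG =
      Logger.getLogger(InvocationLoggingSketch.class.getName());

  /** Hypothetical identifier: the job name followed by a random UUID. */
  static String newInvocationId(String jobName) {
    return String.format("%s_%s", jobName, UUID.randomUUID());
  }

  /** Builds the log message so that it always carries the identifier. */
  static String describe(String invocationId) {
    return "Parsing pipeline options for invocation " + invocationId;
  }

  public static void main(String[] args) {
    String invocationId = newInvocationId("wordcount");
    // Logged at info level so the line shows up with default settings.
    LOG.info(describe(invocationId));
  }
}
```

Creating the identifier before the first log statement lets every later message for the same invocation reuse it.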




Issue Time Tracking
---

Worklog Id: (was: 111705)
Time Spent: 2h 40m  (was: 2.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111693
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195205949
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
 
 Review comment:
   Should we only create the context in the if-else block and do the remain

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111703
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209989
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+      tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111699&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111699
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195207823
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111707
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195210651
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111692
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195208514
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111696
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209626
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111708
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195214125
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService executorService) {
+    return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+      RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken)
+      throws IOException {
+    // TODO: How to make Java/Python agree on names of keys and their values?
+    LOG.trace("Parsing pipeline options");
+    FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options)
+        .as(FlinkPipelineOptions.class);
+
+    String invocationId = String.format(
+        "%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+    LOG.debug("Invoking job {}", invocationId);
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    flinkOptions.setRunner(null);
 
 Review comment:
   Document why we need to set the runner to null.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111708)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111697
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209114
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111702
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195254764
 
 

 ##
 File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for BeamFileSystemArtifactSource.
+ */
+@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest {
+
+  BeamFileSystemArtifactStagingService stagingService = new BeamFileSystemArtifactStagingService();
+
+  @Rule public TemporaryFolder stagingDir = new TemporaryFolder();
+
+  @Test public void testStagingService() throws Exception {
+    String stagingSession = "stagingSession";
+    String stagingSessionToken = BeamFileSystemArtifactStagingService
+        .generateStagingSessionToken(stagingSession, stagingDir.newFolder().getPath());
+    List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>();
+
+    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build());
+    putArtifactContents(stagingSessionToken, "first", "file1");
+
+    metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build());
+    putArtifactContents(stagingSessionToken, "second", "file2");
+
+    String stagingToken = commitManifest(stagingSessionToken, metadata);
+
+    BeamFileSystemArtifactSource artifactSource = new BeamFileSystemArtifactSource(stagingToken);
+    Assert.assertEquals("first", getArtifactContents(artifactSource, "file1"));
+    Assert.assertEquals("second", getArtifactContents(artifactSource, "file2"));
+    Assert.assertThat(artifactSource.getManifest().getArtifactList(),
+        containsInAnyOrder(metadata.toArray(new ArtifactApi.ArtifactMetadata[0])));
+  }
+
+  private String commitManifest(String stagingSessionToken,
+      List<ArtifactApi.ArtifactMetadata> artifacts) {
+    String[] stagingTokenHolder = new String[1];
+    stagingService.commitManifest(
+        ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken)
+            .setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(),
+        new StreamObserver<ArtifactApi.CommitManifestResponse>() {
+
+          @Override public void onNext(ArtifactApi.CommitManifestResponse commitManifestResponse) {
+            stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken();
+          }
+
+          @Override public void onError(Throwable throwable) {
+            throw new RuntimeException(throwable);
+          }
+
+          @Override public void onCompleted() {
+          }
+        });
+
+    return stagingTokenHolder[0];
+  }
+
+  private void putArtifactContents(String stagingSessionToken, String contents, String name) {
+    StreamObserver<ArtifactApi.PutArtifactRequest> outputStreamObserver = stagingService
+        .putArtifact(new StreamObserver<ArtifactApi.PutArtifactResponse>() {
+
+          @Override public void onNext(ArtifactApi.PutArtifactResponse putArtifactResponse) {
+          }
+
+          @Override public void onError(Throwable throwable) {
+            throw new RuntimeException(throwable);
+          }
+
+          @Override public void onCompleted() {
+          }
+        });
+
+    outputStrea

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111706
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195253607
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 ##
 @@ -60,17 +60,36 @@
 
   public static InMemoryJobService create(
 
 Review comment:
   Can we remove this method?
   
   




Issue Time Tracking
---

Worklog Id: (was: 111706)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111698&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111698
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195208381
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111700
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209379
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+  tra

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111690
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195199604
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService 
executorService,
+  RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+return new FlinkJobInvocation(id, executorService, pipeline, 
pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List> stateObservers;
+
+  @Nullable
+  private ListenableFuture invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
 
 Review comment:
   Should we pass the fuser function in as a constructor parameter?
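
   A minimal sketch of what this suggestion could look like, using stand-in types so that it compiles without Beam on the classpath. The class name FuserInjectionSketch and the type parameter P (standing in for RunnerApi.Pipeline) are hypothetical and not part of the PR; only the GreedyPipelineFuser call mentioned in the comment comes from the diff above.

```java
import java.util.function.Function;

/**
 * Hypothetical stand-in for FlinkJobInvocation. The type parameter P plays the
 * role of RunnerApi.Pipeline so the sketch has no Beam dependency.
 */
final class FuserInjectionSketch<P> {
  private final P pipeline;
  // Injected fusion step. Production code could pass
  // p -> GreedyPipelineFuser.fuse(p).toPipeline() here (assumption).
  private final Function<P, P> fuser;

  FuserInjectionSketch(P pipeline, Function<P, P> fuser) {
    this.pipeline = pipeline;
    this.fuser = fuser;
  }

  /** Applies whichever fuser the caller supplied. */
  P fusedPipeline() {
    return fuser.apply(pipeline);
  }

  public static void main(String[] args) {
    // A test could inject an identity fuser to skip fusion entirely.
    FuserInjectionSketch<String> unfused =
        new FuserInjectionSketch<String>("read|map|write", Function.identity());
    System.out.println(unfused.fusedPipeline());
  }
}
```

   Injecting the function would let tests substitute a no-op or recording fuser, at the cost of one more constructor argument.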


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 111690)
Time Spent: 1h 10m  (was: 1h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
>  

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111689
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195252523
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java
 ##
 @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * An ArtifactSource suitable for retrieving artifacts uploaded via
+ * {@link BeamFileSystemArtifactStagingService}.
+ */
+public class BeamFileSystemArtifactSource implements ArtifactSource {
+
+  private static final int CHUNK_SIZE = 2 * 1024 * 1024;
+
+  private final String retrievalToken;
+  private ArtifactApi.ProxyManifest proxyManifest;
+
+  public BeamFileSystemArtifactSource(String retrievalToken) {
+    this.retrievalToken = retrievalToken;
+  }
+
+  public static BeamFileSystemArtifactSource create(String artifactToken) {
+    return new BeamFileSystemArtifactSource(artifactToken);
+  }
+
+  @Override
+  public ArtifactApi.Manifest getManifest() throws IOException {
+    return getProxyManifest().getManifest();
+  }
+
+  @Override
+  public void getArtifact(String name,
+      StreamObserver<ArtifactApi.ArtifactChunk> responseObserver) throws IOException {
+    ReadableByteChannel artifact = FileSystems
+        .open(FileSystems.matchNewResource(lookupUri(name), false));
+    ByteBuffer buffer = ByteBuffer.allocate(CHUNK_SIZE);
+    while (artifact.read(buffer) > -1) {
+      buffer.flip();
+      responseObserver.onNext(
+          ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer)).build());
+      buffer.clear();
+    }
+  }
+
+  private String lookupUri(String name) throws IOException {
+    for (ArtifactApi.ProxyManifest.Location location : getProxyManifest().getLocationList()) {
+      if (location.getName().equals(name)) {
+        return location.getUri();
+      }
+    }
+    throw new IllegalArgumentException("No such artifact: " + name);
+  }
+
+  private ArtifactApi.ProxyManifest getProxyManifest() throws IOException {
+    if (proxyManifest == null) {
+      ArtifactApi.ProxyManifest.Builder builder = ArtifactApi.ProxyManifest.newBuilder();
+      JsonFormat.parser().merge(Channels.newReader(
+          FileSystems.open(FileSystems.matchNewResource(retrievalToken, false /* is directory */)),
+          StandardCharsets.UTF_8.name()), builder);
 
 Review comment:
   Shall we standardize the character set across all BeamFileSystemArtifact* classes?
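
   One possible way to standardize this, sketched with the JDK only: keep the charset in a single shared constant and route both the write path and the read path through it. The class ArtifactCharsets and its method names are hypothetical and do not exist in the PR; the diff above only shows the reader side using StandardCharsets.UTF_8.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical holder for the one charset shared by manifest writers and readers. */
final class ArtifactCharsets {
  /** Single source of truth; UTF-8 matches what the reader in the diff uses. */
  static final Charset MANIFEST_CHARSET = StandardCharsets.UTF_8;

  private ArtifactCharsets() {}

  /** Write side: encodes manifest JSON with the shared charset. */
  static byte[] encodeManifest(String json) {
    return json.getBytes(MANIFEST_CHARSET);
  }

  /** Read side: decodes a channel's full contents with the same charset. */
  static String readManifest(ReadableByteChannel channel) {
    StringBuilder out = new StringBuilder();
    try (Reader reader = Channels.newReader(channel, MANIFEST_CHARSET.name())) {
      char[] buf = new char[1024];
      int n;
      while ((n = reader.read(buf)) != -1) {
        out.append(buf, 0, n);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toString();
  }
}
```

   With a shared constant, the staging service and the artifact source cannot drift apart on encoding, which matters as soon as artifact names contain non-ASCII characters.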




Issue Time Tracking
---

Worklog Id: (was: 111689)
Time Spent: 1h  (was: 50m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
> 

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111687
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 22:24
Start Date: 13/Jun/18 22:24
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195199350
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
 ##
 @@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
 
 Review comment:
   This trace is unnecessary.




Issue Time Tracking
---

Worklog Id: (was: 111687)
Time Spent: 40m  (was: 0.5h)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Tim

[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111663
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 21:16
Start Date: 13/Jun/18 21:16
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195238847
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 ##
 @@ -60,17 +60,28 @@
 
   public static InMemoryJobService create(
       Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
-    return new InMemoryJobService(stagingServiceDescriptor, invoker);
+    return new InMemoryJobService(stagingServiceDescriptor, (String session) -> "token", invoker);
+  }
+
+  public static InMemoryJobService create(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
+      Function<String, String> stagingServiceTokenProvider,
 
 Review comment:
   Good point. 




Issue Time Tracking
---

Worklog Id: (was: 111663)
Time Spent: 0.5h  (was: 20m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We 
> need a main entry point that itself spins up the job service (and artifact 
> staging service). The main program itself should be packaged into an uberjar 
> such that it can be run locally or submitted to a Flink deployment via `flink 
> run`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111633
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 20:20
Start Date: 13/Jun/18 20:20
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5493: 
[BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195221468
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
 ##
 @@ -60,17 +60,28 @@
 
   public static InMemoryJobService create(
       Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) {
-    return new InMemoryJobService(stagingServiceDescriptor, invoker);
+    return new InMemoryJobService(stagingServiceDescriptor, (String session) -> "token", invoker);
+  }
+
+  public static InMemoryJobService create(
+      Endpoints.ApiServiceDescriptor stagingServiceDescriptor,
+      Function<String, String> stagingServiceTokenProvider,
 
 Review comment:
   Maybe document that this function expects a preparation ID as input.
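
   A sketch of how such documentation might read, assuming the provider is a function from String to String as the default lambda in the diff suggests. StagingTokenSketch and tokenFor are hypothetical stand-ins rather than the actual InMemoryJobService API, and the description of the returned token is an assumption drawn from the parameter name.

```java
import java.util.function.Function;

/** Hypothetical stand-in for InMemoryJobService, reduced to the token lookup. */
final class StagingTokenSketch {
  private final Function<String, String> stagingServiceTokenProvider;

  /**
   * @param stagingServiceTokenProvider function that receives a preparation ID
   *     (not a session name) and returns the staging service token to use for
   *     that preparation; the meaning of the return value is an assumption
   */
  StagingTokenSketch(Function<String, String> stagingServiceTokenProvider) {
    this.stagingServiceTokenProvider = stagingServiceTokenProvider;
  }

  /** Looks up the token for one preparation. */
  String tokenFor(String preparationId) {
    return stagingServiceTokenProvider.apply(preparationId);
  }
}
```

   Naming the lambda parameter after the documented input, for example `(String preparationId) -> "token"` instead of `(String session) -> "token"`, would keep the default provider consistent with the Javadoc.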




Issue Time Tracking
---

Worklog Id: (was: 111633)
Time Spent: 20m  (was: 10m)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>





[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container

2018-06-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111558
 ]

ASF GitHub Bot logged work on BEAM-4130:


Author: ASF GitHub Bot
Created on: 13/Jun/18 16:57
Start Date: 13/Jun/18 16:57
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #5493: [BEAM-4130] Add job 
submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#issuecomment-397010704
 
 
   R: @axelmagn, @angoenka Please take a look.




Issue Time Tracking
---

Worklog Id: (was: 111558)
Time Spent: 10m
Remaining Estimate: 0h

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-flink
>Reporter: Ben Sidhom
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>


