[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=153903&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153903 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 12/Oct/18 15:01
Start Date: 12/Oct/18 15:01
Worklog Time Spent: 10m
Work Description: mxm opened a new pull request #6672: [BEAM-4130] Add tests for FlinkJobServerDriver
URL: https://github.com/apache/beam/pull/6672

This adds a few test cases for FlinkJobServerDriver. It also changes the default host from empty host to `localhost`.

CC @angoenka @tweise

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
Worklog Id: (was: 153903)
Time Spent: 11h 40m (was: 11.5h)

> Portable Flink runner JobService entry point in a Docker container
> ------------------------------------------------------------------
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Maximilian Michels
> Priority: Minor
> Fix For: 2.7.0
>
> Time Spent: 11h 40m
> Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We
> need a main entry point that itself spins up the job service (and artifact
> staging service). The main program itself should be packaged into an uberjar
> such that it can be run locally or submitted to a Flink deployment via
> `flink run`.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152635&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152635 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 09/Oct/18 11:37
Start Date: 09/Oct/18 11:37
Worklog Time Spent: 10m
Work Description: mxm closed pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 2096cb98550..34f2edb5abb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.flink;
 
-import com.google.common.base.Strings;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -201,7 +200,7 @@ public void stop() {
   private GrpcFnServer createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {
       jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, jobServerFactory);
     } else {
       Endpoints.ApiServiceDescriptor descriptor =
@@ -239,7 +238,7 @@ private InMemoryJobService createJobService() throws IOException {
       throws IOException {
     BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
     final GrpcFnServer artifactStagingService;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.artifactPort == 0) {
       artifactStagingService =
           GrpcFnServer.allocatePortAndCreateFor(service, artifactServerFactory);
     } else {

Issue Time Tracking
-------------------
Worklog Id: (was: 152635)
Time Spent: 11.5h (was: 11h 20m)
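The merged change relies on a common convention: a configured port of 0 means "let the OS pick a free port". The sketch below is a minimal illustration of that convention. It uses plain `java.net.ServerSocket` rather than Beam's `GrpcFnServer` or `ServerFactory`. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class DynamicPortSketch {
  /** Binds a throwaway socket to port 0 and returns the port the OS actually assigned. */
  static int bindDynamicPort() {
    // Port 0 asks the OS for any free ephemeral port. The real number is only
    // known after the bind, so it has to be read back from the bound socket.
    try (ServerSocket socket = new ServerSocket(0)) {
      return socket.getLocalPort();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("OS-assigned port: " + bindDynamicPort());
  }
}
```

The socket is closed on return, so the port is released again. A long-running server would instead keep its socket open and advertise the port it reads back, not the configured 0.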
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152632&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152632 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 09/Oct/18 11:36
Start Date: 09/Oct/18 11:36
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223663160

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {

Review comment:
Ok, let's do that in a follow-up.

Issue Time Tracking
-------------------
Worklog Id: (was: 152632)
Time Spent: 11h 20m (was: 11h 10m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152413&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152413 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 21:29
Start Date: 08/Oct/18 21:29
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223502676

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {

Review comment:
Yes, I think we should, but there is no method as of now to do so. I think we should rethink and refactor GrpcFnServer.

Issue Time Tracking
-------------------
Worklog Id: (was: 152413)
Time Spent: 11h 10m (was: 11h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152390&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152390 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 20:29
Start Date: 08/Oct/18 20:29
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223488123

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {

Review comment:
Yes, that is better. Do you think we could also respect the host name when the port is 0? As of now, it would get ignored.

Issue Time Tracking
-------------------
Worklog Id: (was: 152390)
Time Spent: 11h (was: 10h 50m)
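At the plain socket level, a requested host and a dynamic port are independent: a server can bind to a specific host while still leaving the port choice to the OS. The sketch below shows this with `java.net.ServerSocket` and `InetSocketAddress` only. It does not claim that `GrpcFnServer` offers an equivalent method. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class HostWithDynamicPortSketch {
  /** Binds to the requested host with port 0 and returns the address that was actually bound. */
  static InetSocketAddress bindOnHost(String host) {
    try (ServerSocket socket = new ServerSocket()) {
      // The host is honored; only the port is left for the OS to choose.
      socket.bind(new InetSocketAddress(host, 0));
      return new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    InetSocketAddress bound = bindOnHost("localhost");
    System.out.println("Bound to " + bound.getAddress().getHostAddress() + ":" + bound.getPort());
  }
}
```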
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152378&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152378 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 19:50
Start Date: 08/Oct/18 19:50
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223478440

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {

Review comment:
I think this makes the behavior less consistent. Example scenarios:
 - When host=null: JobPort dynamic, ArtifactPort dynamic
 - When host=localhost: JobPort 8099, ArtifactPort 8098
 - When host=localhost, jobPort=0: JobPort dynamic, ArtifactPort 8098

Just checking if the JobPort is 0 will remove this confusion.

Issue Time Tracking
-------------------
Worklog Id: (was: 152378)
Time Spent: 10h 50m (was: 10h 40m)
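The three scenarios above can be reproduced by modeling the two rules as plain functions:

- the host-based check that the PR replaces;
- the per-port check that it introduces.

This is a sketch under stated assumptions. The method names are hypothetical and not part of FlinkJobServerDriver. The defaults 8099/8098 come from the option usage strings. The host-based model treats a configured port of 0 as dynamic because the OS assigns a port on bind.

```java
public class PortRuleComparison {
  /**
   * Rule before PR #6607: an empty host sends both services through dynamic allocation.
   * A non-empty host uses the configured port; 0 is still dynamic because the OS picks it on bind.
   */
  static String hostBasedRule(String host, int configuredPort) {
    boolean emptyHost = host == null || host.isEmpty();
    return (emptyHost || configuredPort == 0) ? "dynamic" : String.valueOf(configuredPort);
  }

  /** Rule after PR #6607: each service looks only at its own port. */
  static String portBasedRule(int configuredPort) {
    return configuredPort == 0 ? "dynamic" : String.valueOf(configuredPort);
  }

  public static void main(String[] args) {
    // The three scenarios from the review comment; the artifact port stays at its default.
    String[] hosts = {null, "localhost", "localhost"};
    int[] jobPorts = {8099, 8099, 0};
    int artifactPort = 8098;
    for (int i = 0; i < hosts.length; i++) {
      System.out.printf(
          "host=%s, jobPort=%d | before: job=%s, artifact=%s | after: job=%s, artifact=%s%n",
          hosts[i], jobPorts[i],
          hostBasedRule(hosts[i], jobPorts[i]), hostBasedRule(hosts[i], artifactPort),
          portBasedRule(jobPorts[i]), portBasedRule(artifactPort));
    }
  }
}
```

Under the port-based rule, leaving the host unset no longer switches both services to dynamic ports. Each service is dynamic only when its own port is explicitly 0.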
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152377&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152377 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 19:49
Start Date: 08/Oct/18 19:49
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223478440

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {

Review comment:
This makes the behavior less consistent. Example scenarios:
 - When host=null: JobPort dynamic, ArtifactPort dynamic
 - When host=localhost: JobPort 8099, ArtifactPort 8098
 - When host=localhost, jobPort=0: JobPort dynamic, ArtifactPort 8098

Just checking if the JobPort is 0 will remove this confusion.

Issue Time Tracking
-------------------
Worklog Id: (was: 152377)
Time Spent: 10h 40m (was: 10.5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152376&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152376 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 19:45
Start Date: 08/Oct/18 19:45
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223477264

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -239,7 +239,7 @@ private InMemoryJobService createJobService() throws IOException {
       throws IOException {
     BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
     final GrpcFnServer artifactStagingService;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.artifactPort == 0) {

Review comment:
Should this be `Strings.isNullOrEmpty(configuration.host) || configuration.port == 0`?

Issue Time Tracking
-------------------
Worklog Id: (was: 152376)
Time Spent: 10.5h (was: 10h 20m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152375 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 19:45
Start Date: 08/Oct/18 19:45
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#discussion_r223477139

File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -201,7 +201,7 @@ public void stop() {
   private GrpcFnServer createJobServer() throws IOException {
     InMemoryJobService service = createJobService();
     GrpcFnServer jobServiceGrpcFnServer;
-    if (Strings.isNullOrEmpty(configuration.host)) {
+    if (configuration.port == 0) {

Review comment:
Should this be `Strings.isNullOrEmpty(configuration.host) || configuration.port == 0`?

Issue Time Tracking
-------------------
Worklog Id: (was: 152375)
Time Spent: 10h 20m (was: 10h 10m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152368&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152368 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 19:41
Start Date: 08/Oct/18 19:41
Worklog Time Spent: 10m
Work Description: angoenka commented on issue #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607#issuecomment-427955480

R: @mxm

Issue Time Tracking
-------------------
Worklog Id: (was: 152368)
Time Spent: 10h 10m (was: 10h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=152367&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152367 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 08/Oct/18 19:40
Start Date: 08/Oct/18 19:40
Worklog Time Spent: 10m
Work Description: angoenka opened a new pull request #6607: [BEAM-4130] Use port 0 to pick dynamic port-BugFix
URL: https://github.com/apache/beam/pull/6607

**Please** add a meaningful description for your change here

Follow this checklist to help us incorporate your contribution quickly and easily:

 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
 - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.

Issue Time Tracking
-------------------
Worklog Id: (was: 152367)
Time Spent: 10h (was: 9h 50m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=150334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150334 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 02/Oct/18 10:28
Start Date: 02/Oct/18 10:28
Worklog Time Spent: 10m
Work Description: mxm closed pull request #6341: [BEAM-4130] Updating FlinkJobServerDriver for port 0 usage
URL: https://github.com/apache/beam/pull/6341

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 0f567f6c6d5..7b0e55f70e6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -55,10 +55,16 @@
     @Option(name = "--job-host", usage = "The job server host name")
     private String host = "";
 
-    @Option(name = "--job-port", usage = "The job service port. (Default: 8099)")
+    @Option(
+      name = "--job-port",
+      usage = "The job service port. 0 to use a dynamic port. (Default: 8099)"
+    )
     private int port = 8099;
 
-    @Option(name = "--artifact-port", usage = "The artifact service port. (Default: 8098)")
+    @Option(
+      name = "--artifact-port",
+      usage = "The artifact service port. 0 to use a dynamic port. (Default: 8098)"
+    )
     private int artifactPort = 8098;
 
     @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")

Issue Time Tracking
-------------------
Worklog Id: (was: 150334)
Time Spent: 9h 50m (was: 9h 40m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=146762&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146762 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 22/Sep/18 11:30
Start Date: 22/Sep/18 11:30
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r219669077

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
         new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
     ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+    ServerFactory jobServerFactory =
+        ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
     ServerFactory artifactServerFactory =
-        ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+        ServerFactory.createWithPortSupplier(
+            () -> getPort(configuration.host, configuration.artifactPort));
     return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
+  private static int getPort(String host, int port) {
+    // If host is empty then use dynamic port
+    return Strings.isNullOrEmpty(host) ? 0 : port;

Review comment:
Here is the PR #6469.

Issue Time Tracking
---
Worklog Id: (was: 146762)
Time Spent: 9h 40m (was: 9.5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=146754&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146754 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 22/Sep/18 10:26
Start Date: 22/Sep/18 10:26
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r219667637

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
         new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
     ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+    ServerFactory jobServerFactory =
+        ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
     ServerFactory artifactServerFactory =
-        ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+        ServerFactory.createWithPortSupplier(
+            () -> getPort(configuration.host, configuration.artifactPort));
     return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
+  private static int getPort(String host, int port) {
+    // If host is empty then use dynamic port
+    return Strings.isNullOrEmpty(host) ? 0 : port;

Review comment:
Does that work for you? I'll open a PR to test the port supplier methods.

Issue Time Tracking
---
Worklog Id: (was: 146754)
Time Spent: 9.5h (was: 9h 20m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=146753&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146753 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 22/Sep/18 08:01
Start Date: 22/Sep/18 08:01
Worklog Time Spent: 10m
Work Description: angoenka commented on issue #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#issuecomment-423725762

Run Go PostCommit

Issue Time Tracking
---
Worklog Id: (was: 146753)
Time Spent: 9h 20m (was: 9h 10m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144662&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144662 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 16/Sep/18 10:32
Start Date: 16/Sep/18 10:32
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r217909434

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
         new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
     ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+    ServerFactory jobServerFactory =
+        ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
     ServerFactory artifactServerFactory =
-        ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+        ServerFactory.createWithPortSupplier(
+            () -> getPort(configuration.host, configuration.artifactPort));
     return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
+  private static int getPort(String host, int port) {
+    // If host is empty then use dynamic port
+    return Strings.isNullOrEmpty(host) ? 0 : port;

Review comment:
Imagine the user started the JobServer with `--job-port 1234`: it would then bind to a different port because `--job-host` is empty. We should simply pass in `0` as the port in tests. This would then bind to an arbitrary port. Additionally, we should clarify the semantics through `ServerFactoryTest` (can be a follow-up).

Issue Time Tracking
---
Worklog Id: (was: 144662)
Time Spent: 9h 10m (was: 9h)
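The alternative suggested in this review (always honor `--job-port`, and have tests pass `0` explicitly) can be sketched in a few lines, as a contrast to the proposed `getPort`, which returns `0` whenever the host is empty. The sketch below is hypothetical: hand-rolled parsing stands in for args4j's `@Option`, an `IntSupplier` stands in for the supplier given to `ServerFactory.createWithPortSupplier`, and only the defaults (`""` for the host, `8099` for the port) are taken from the diffs in this thread. It is not the merged Beam code.

```java
import java.util.function.IntSupplier;

/**
 * Hypothetical sketch of the policy suggested in review: use --job-port exactly as given,
 * independent of --job-host. Hand-rolled parsing stands in for args4j's @Option.
 */
public class ConfiguredPortPolicy {
  String host = ""; // --job-host default from the diff; intentionally ignored for port selection
  int port = 8099; // --job-port default from the diff

  static ConfiguredPortPolicy parse(String... args) {
    if (args.length % 2 != 0) {
      throw new IllegalArgumentException("Options must come in '--name value' pairs");
    }
    ConfiguredPortPolicy config = new ConfiguredPortPolicy();
    for (int i = 0; i < args.length; i += 2) {
      switch (args[i]) {
        case "--job-host":
          config.host = args[i + 1];
          break;
        case "--job-port":
          config.port = Integer.parseInt(args[i + 1]);
          break;
        default:
          throw new IllegalArgumentException("Unknown option: " + args[i]);
      }
    }
    return config;
  }

  /** The supplier returns the configured port unchanged; 0 means "let the OS choose". */
  IntSupplier portSupplier() {
    return () -> port;
  }

  public static void main(String[] args) {
    // A user who only sets the port gets exactly that port, even though the host is empty.
    System.out.println(parse("--job-port", "1234").portSupplier().getAsInt()); // 1234
    // Tests opt into a dynamic port explicitly instead of relying on an empty host.
    System.out.println(parse("--job-port", "0").portSupplier().getAsInt()); // 0
    // No flags: the documented default.
    System.out.println(parse().portSupplier().getAsInt()); // 8099
  }
}
```

The design point is that the "dynamic port" decision moves from an implicit signal (an empty host) to an explicit value (`0`), so a user-supplied port is never silently replaced.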
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144519&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144519 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 15/Sep/18 00:57
Start Date: 15/Sep/18 00:57
Worklog Time Spent: 10m
Work Description: angoenka commented on issue #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#issuecomment-421518808

Run Java PreCommit

Issue Time Tracking
---
Worklog Id: (was: 144519)
Time Spent: 9h (was: 8h 50m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144517&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144517 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 15/Sep/18 00:56
Start Date: 15/Sep/18 00:56
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r217869549

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -130,7 +137,7 @@ private FlinkJobServerDriver(
     this.configuration = configuration;
     this.executor = executor;
     this.jobServerFactory = jobServerFactory;
-    this.artifactServerFactory = jobServerFactory;
+    this.artifactServerFactory = artifactServerFactory;

Review comment:
Thanks!

Issue Time Tracking
---
Worklog Id: (was: 144517)
Time Spent: 8h 50m (was: 8h 40m)
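The one-line constructor fix matters because the original assignment made both fields point at the job server's factory, so the artifact staging service would have been created with the job service's port supplier. Below is a minimal reduction of that copy-paste bug. `IntSupplier` is a hypothetical stand-in for `ServerFactory`, and the `buggy` flag exists only to show both wirings side by side. This is an illustration, not Beam code.

```java
import java.util.function.IntSupplier;

/** Hypothetical reduction of the FlinkJobServerDriver constructor bug; not Beam code. */
public class FactoryWiring {
  final IntSupplier jobServerFactory;
  final IntSupplier artifactServerFactory;

  FactoryWiring(IntSupplier jobServerFactory, IntSupplier artifactServerFactory, boolean buggy) {
    this.jobServerFactory = jobServerFactory;
    // The bug: assigning the wrong parameter silently discards artifactServerFactory.
    this.artifactServerFactory = buggy ? jobServerFactory : artifactServerFactory;
  }

  public static void main(String[] args) {
    IntSupplier jobPort = () -> 8099; // --job-port default
    IntSupplier artifactPort = () -> 8098; // --artifact-port default

    FactoryWiring buggy = new FactoryWiring(jobPort, artifactPort, true);
    FactoryWiring fixed = new FactoryWiring(jobPort, artifactPort, false);

    // Buggy wiring: the artifact service would ask for the job service's port.
    System.out.println(buggy.artifactServerFactory.getAsInt()); // 8099
    // Fixed wiring: the artifact service gets its own port.
    System.out.println(fixed.artifactServerFactory.getAsInt()); // 8098
  }
}
```

With fixed ports, the buggy wiring would have both services request 8099. Binding the same port twice typically fails with an "address already in use" error, so the bug would likely surface as a startup failure rather than a silent misconfiguration.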
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=144516&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-144516 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 15/Sep/18 00:56
Start Date: 15/Sep/18 00:56
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r217870561

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
         new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
     ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+    ServerFactory jobServerFactory =
+        ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
     ServerFactory artifactServerFactory =
-        ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+        ServerFactory.createWithPortSupplier(
+            () -> getPort(configuration.host, configuration.artifactPort));
     return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
+  private static int getPort(String host, int port) {
+    // If host is empty then use dynamic port
+    return Strings.isNullOrEmpty(host) ? 0 : port;

Review comment:
We need to be able to use a dynamic port to create the server in tests. The port supplier is used by allocateAndCreate, which is expected to use a dynamic port. Because the port was fixed in the port supplier, this functionality was broken.
I am using the host as the marker for choosing a dynamic or a static port, as we do in [L181](https://github.com/apache/beam/pull/6341/files#diff-e52a25378b181af8cecbf3d274946fd0L181)

Issue Time Tracking
---
Worklog Id: (was: 144516)
Time Spent: 8h 50m (was: 8h 40m)
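The helper under discussion can be exercised on its own. The sketch below reproduces the host-as-marker logic from the reviewed diff. Guava's `Strings.isNullOrEmpty` is swapped for an explicit null/empty check so the example has no dependencies, and an `IntSupplier` stands in for the lambda passed to `ServerFactory.createWithPortSupplier`. The class and method names are hypothetical.

```java
import java.util.function.IntSupplier;

/** Sketch of the host-as-marker port selection proposed in PR #6341 (not the merged code). */
public class HostMarkerPortSelection {

  /** Mirrors the reviewed helper: a null or empty host means "use a dynamic port" (0). */
  static int getPort(String host, int port) {
    return (host == null || host.isEmpty()) ? 0 : port;
  }

  /** Stand-in for the lambda handed to ServerFactory.createWithPortSupplier(...). */
  static IntSupplier portSupplier(String host, int configuredPort) {
    return () -> getPort(host, configuredPort);
  }

  public static void main(String[] args) {
    // Empty host: the configured port is ignored and 0 (dynamic) is supplied.
    System.out.println(portSupplier("", 8099).getAsInt()); // 0
    // Host set: the configured port is supplied unchanged.
    System.out.println(portSupplier("localhost", 8099).getAsInt()); // 8099
  }
}
```

This makes the review concern visible: with an empty host, any configured port, including one the user set deliberately, is replaced by `0`.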
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141711 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 06/Sep/18 10:38
Start Date: 06/Sep/18 10:38
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r215577945

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -130,7 +137,7 @@ private FlinkJobServerDriver(
     this.configuration = configuration;
     this.executor = executor;
     this.jobServerFactory = jobServerFactory;
-    this.artifactServerFactory = jobServerFactory;
+    this.artifactServerFactory = artifactServerFactory;

Review comment:
I've merged this fix to master with fb23c7f2eacee5374cfd85a58d5d6746e18fabc7.

Issue Time Tracking
---
Worklog Id: (was: 141711)
Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141673&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141673 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 06/Sep/18 08:46
Start Date: 06/Sep/18 08:46
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r215541028

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -130,7 +137,7 @@ private FlinkJobServerDriver(
     this.configuration = configuration;
     this.executor = executor;
     this.jobServerFactory = jobServerFactory;
-    this.artifactServerFactory = jobServerFactory;
+    this.artifactServerFactory = artifactServerFactory;

Review comment:
Thanks for spotting this.

Issue Time Tracking
---
Worklog Id: (was: 141673)
Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141674 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 06/Sep/18 08:46
Start Date: 06/Sep/18 08:46
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#discussion_r215541807

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ##
@@ -107,12 +107,19 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
         new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
     ListeningExecutorService executor =
         MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+    ServerFactory jobServerFactory =
+        ServerFactory.createWithPortSupplier(() -> getPort(configuration.host, configuration.port));
     ServerFactory artifactServerFactory =
-        ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+        ServerFactory.createWithPortSupplier(
+            () -> getPort(configuration.host, configuration.artifactPort));
     return create(configuration, executor, jobServerFactory, artifactServerFactory);
   }
+  private static int getPort(String host, int port) {
+    // If host is empty then use dynamic port
+    return Strings.isNullOrEmpty(host) ? 0 : port;

Review comment:
Why do we not use the provided port when the host is null? I think it would make sense that the user doesn't care about the host name but wants to bind to a specific port.

Issue Time Tracking
---
Worklog Id: (was: 141674)
Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141597 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 06/Sep/18 01:49
Start Date: 06/Sep/18 01:49
Worklog Time Spent: 10m
Work Description: angoenka opened a new pull request #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341

**Please** add a meaningful description for your change here

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it.

Issue Time Tracking
---
Worklog Id: (was: 141597)
Time Spent: 8h (was: 7h 50m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=141598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-141598 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 06/Sep/18 01:49
Start Date: 06/Sep/18 01:49
Worklog Time Spent: 10m
Work Description: angoenka commented on issue #6341: [BEAM-4130] Fixing bug in flink job server creation
URL: https://github.com/apache/beam/pull/6341#issuecomment-418937279

cc: @mxm

Issue Time Tracking
---
Worklog Id: (was: 141598)
Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137920&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137920 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 18:23
Start Date: 24/Aug/18 18:23
Worklog Time Spent: 10m
Work Description: tweise closed pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/job-server-container/Dockerfile b/runners/flink/job-server-container/Dockerfile
index a9aff21b6d6..569c2ab0406 100644
--- a/runners/flink/job-server-container/Dockerfile
+++ b/runners/flink/job-server-container/Dockerfile
@@ -19,7 +19,7 @@ FROM openjdk:8
 MAINTAINER "Apache Beam "
-ADD target/beam-runners-flink_2.11-job-server.jar /opt/apache/beam/jars/
+ADD target/beam-runners-flink-job-server.jar /opt/apache/beam/jars/
 ADD target/flink-job-server.sh /opt/apache/beam/
 WORKDIR /opt/apache/beam
diff --git a/runners/flink/job-server-container/build.gradle b/runners/flink/job-server-container/build.gradle
index 4d5f53316e3..d2b026804d7 100644
--- a/runners/flink/job-server-container/build.gradle
+++ b/runners/flink/job-server-container/build.gradle
@@ -38,7 +38,7 @@ dependencies {
 task copyDockerfileDependencies(type: Copy) {
 // Required Jars
 from configurations.dockerDependency
- rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink_2.11-job-server.jar'
+ rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink-job-server.jar'
 into "build/target"
 // Entry script
 from file("./flink-job-server.sh")
diff --git a/runners/flink/job-server/build.gradle b/runners/flink/job-server/build.gradle
index 5e1e01ae74d..ee6b70f1f37 100644
--- a/runners/flink/job-server/build.gradle
+++ b/runners/flink/job-server/build.gradle
@@ -55,7 +55,7 @@ dependencies {
 // task will not work because the flink runner classes only exist in the shadow
 // jar.
 runShadow {
- def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost:8099"
+ def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost"
 def artifactsDir = project.hasProperty("artifactsDir") ? project.property("artifactsDir") : "/tmp/flink-artifacts"
 def cleanArtifactsPerJob = project.hasProperty("cleanArtifactsPerJob")
 args = ["--job-host=${jobHost}", "--artifacts-dir=${artifactsDir}"]
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
index 7e9b14a2acd..13f48c53018 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java
@@ -45,15 +45,22 @@ private final ListeningExecutorService executor;
 private final ServerConfiguration configuration;
- private final ServerFactory serverFactory;
+ private final ServerFactory jobServerFactory;
+ private final ServerFactory artifactServerFactory;
 private GrpcFnServer jobServer;
 private GrpcFnServer artifactStagingServer;
 /** Configuration for the jobServer. */
 public static class ServerConfiguration {
-@Option(name = "--job-host", usage = "The job server host string")
+@Option(name = "--job-host", usage = "The job server host name")
 private String host = "";
+@Option(name = "--job-port", usage = "The job service port. (Default: 8099)")
+private int port = 8099;
+
+@Option(name = "--artifact-port", usage = "The artifact service port. (Default: 8098)")
+private int artifactPort = 8098;
+
 @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
 private String artifactStagingPath = "/tmp/beam-artifact-staging";
@@ -100,24 +107,30 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
 new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
 ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-ServerFactory serverFactory = ServerFactory.createDefault();
-return create(configuration, executor, serverFactory);
+ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
+ServerFactory artifactServerFactory =
+ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
+retu
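The diff above replaces the single `ServerFactory` with a job-server factory and an artifact-server factory. Each one is built with `ServerFactory.createWithPortSupplier(() -> ...)`, so the ports come from `--job-port` (default 8099) and `--artifact-port` (default 8098).

Below is a minimal plain-Java sketch of that deferred-port idea. `JobServerConfigSketch` and `PortSupplierFactorySketch` are hypothetical stand-ins, not Beam's `ServerConfiguration` or `ServerFactory`. The sketch only formats the address a server would bind; it does not start a gRPC server.

```java
import java.util.function.IntSupplier;

// Hypothetical stand-in for ServerConfiguration: only the two port fields added
// in the diff, with the same defaults (8099 job service, 8098 artifact service).
class JobServerConfigSketch {
  int port = 8099;
  int artifactPort = 8098;
}

// Hypothetical stand-in for a factory created from a port supplier. The supplier
// is invoked only when an address is requested, so it sees the configuration's
// current value rather than a copy taken at construction time.
class PortSupplierFactorySketch {
  private final IntSupplier portSupplier;

  PortSupplierFactorySketch(IntSupplier portSupplier) {
    this.portSupplier = portSupplier;
  }

  // Formats the "host:port" address a server created by this factory would bind.
  String addressFor(String host) {
    return host + ":" + portSupplier.getAsInt();
  }
}

class PortConfigDemo {
  public static void main(String[] args) {
    JobServerConfigSketch config = new JobServerConfigSketch();
    PortSupplierFactorySketch jobFactory = new PortSupplierFactorySketch(() -> config.port);
    PortSupplierFactorySketch artifactFactory =
        new PortSupplierFactorySketch(() -> config.artifactPort);

    // Simulate "--job-port=9000" being applied after the factories were created.
    config.port = 9000;

    // "localhost" matches the new runShadow default for --job-host.
    System.out.println(jobFactory.addressFor("localhost")); // localhost:9000
    System.out.println(artifactFactory.addressFor("localhost")); // localhost:8098
  }
}
```

In this sketch both factories hold a reference to one configuration object and differ only in which field their supplier reads. That is why the two services can bind to different ports without duplicating the configuration.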
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137898&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137898 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 17:10
Start Date: 24/Aug/18 17:10
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212694411

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker

Review comment:
Should be `./gradlew :beam-runners-flink_2.11-job-server-container:docker`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 137898)
Time Spent: 7h 40m (was: 7.5h)

> Portable Flink runner JobService entry point in a Docker container
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ben Sidhom
> Assignee: Maximilian Michels
> Priority: Minor
> Time Spent: 7h 40m
> Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We
> need a main entry point that itself spins up the job service (and artifact
> staging service). The main program itself should be packaged into an uberjar
> such that it can be run locally or submitted to a Flink deployment via `flink
> run`.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
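The corrections in this thread come down to Gradle's task-path syntax. In `:a:b`, the last colon-separated segment is the task name and everything before it is the project path. By contrast, `:a docker` requests two separate tasks: `:a` (a task named `a` in the root project, which is why the "not found in root project 'beam'" error appears) and `docker`. The `-p runners/flink/job-server-container docker` form avoids the issue by selecting the project directory instead.

The hypothetical helper below only illustrates the splitting rule. It is not Gradle's own parser and ignores features such as task-name abbreviation.

```java
import java.util.Arrays;

// Hypothetical illustration of how a Gradle command-line task path splits into
// a project path and a task name. Not Gradle's implementation.
class TaskPathSketch {
  // Returns {projectPath, taskName}. ":" stands for the root project. An empty
  // project path means no path was given, i.e. a bare task name that Gradle
  // resolves against the current project and its subprojects.
  static String[] split(String taskPath) {
    int idx = taskPath.lastIndexOf(':');
    if (idx < 0) {
      return new String[] {"", taskPath};
    }
    String projectPath = (idx == 0) ? ":" : taskPath.substring(0, idx);
    return new String[] {projectPath, taskPath.substring(idx + 1)};
  }

  public static void main(String[] args) {
    // Intended form: the docker task of the job-server-container project.
    // prints [:beam-runners-flink_2.11-job-server-container, docker]
    System.out.println(Arrays.toString(
        split(":beam-runners-flink_2.11-job-server-container:docker")));
    // Space-separated form: the project name alone is read as a root-project task.
    // prints [:, beam-runners-flink_2.11-job-server-container]
    System.out.println(Arrays.toString(
        split(":beam-runners-flink_2.11-job-server-container")));
  }
}
```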
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137897 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 17:08
Start Date: 24/Aug/18 17:08
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212693774

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker

Review comment:
while this does: `./gradlew -p runners/flink/job-server-container docker`

Issue Time Tracking
---
Worklog Id: (was: 137897)
Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137896&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137896 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 17:07
Start Date: 24/Aug/18 17:07
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212693575

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker

Review comment:
Still does not work: `Task 'beam-runners-flink_2.11-job-server-container' not found in root project 'beam'.`

Issue Time Tracking
---
Worklog Id: (was: 137896)
Time Spent: 7h 20m (was: 7h 10m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137893&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137893 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:54
Start Date: 24/Aug/18 16:54
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212690050

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost

Review comment:
(this is more related to PortableOptions on the Python side, not what FlinkJobServerDriver does)

Issue Time Tracking
---
Worklog Id: (was: 137893)
Time Spent: 7h 10m (was: 7h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137889&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137889 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:37
Start Date: 24/Aug/18 16:37
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212685952

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost

Review comment:
Why not support --job-endpoint in the previous form, that seems more intuitive?

Issue Time Tracking
---
Worklog Id: (was: 137889)
Time Spent: 7h (was: 6h 50m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137885&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137885 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:35
Start Date: 24/Aug/18 16:35
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212685321

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost

Review comment:
Will change to lowerCamelCase. You can still specify the port via `--job_port` but it is not exposed as a property yet.

Issue Time Tracking
---
Worklog Id: (was: 137885)
Time Spent: 6h 40m (was: 6.5h)
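The exchange about ports leaves two possible shapes for the endpoint. One is a bare host passed as a property (`-PJobEndpoint=localhost`) plus a separate `--job_port`. The other is the `host:port` form of the earlier `--job_endpoint=localhost:8099`. One way to accept both is to fall back to the job server's default port when none is given.

The sketch below is hypothetical. `JobEndpointSketch` is not part of Beam or its `PortableOptions`, it assumes the 8099 default from the `--job-port` option of `FlinkJobServerDriver`, and it does not handle IPv6 literals.

```java
// Hypothetical helper (not Beam's PortableOptions): accepts "host" or "host:port"
// and appends the job server's default port when the port is missing.
class JobEndpointSketch {
  // Assumed default job service port, as in the --job-port option (Default: 8099).
  static final int DEFAULT_JOB_PORT = 8099;

  static String normalize(String endpoint) {
    int idx = endpoint.lastIndexOf(':');
    if (idx < 0) {
      // Bare host such as "localhost": use the default port.
      return endpoint + ":" + DEFAULT_JOB_PORT;
    }
    // Explicit port: parse it so malformed values fail fast
    // (NumberFormatException is itself an IllegalArgumentException).
    int port = Integer.parseInt(endpoint.substring(idx + 1));
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Port out of range: " + port);
    }
    return endpoint.substring(0, idx) + ":" + port;
  }

  public static void main(String[] args) {
    System.out.println(normalize("localhost")); // localhost:8099
    System.out.println(normalize("localhost:8097")); // localhost:8097
  }
}
```

A single `host[:port]` value keeps one property for the common case. The cost is that every consumer has to agree on the same default port.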
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137887&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137887 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:35
Start Date: 24/Aug/18 16:35
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212685454

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker

Review comment:
Ah, there is a Unicode character in beam `®`. No idea how it ended up there.

Issue Time Tracking
---
Worklog Id: (was: 137887)
Time Spent: 6h 50m (was: 6h 40m)
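The stray `®` explains why the command failed even though it looked correct. A cheap guard against this kind of near-invisible typo is to scan documented commands for characters outside printable ASCII.

The check below is a hypothetical sketch, not something the Beam build is known to do. It would also flag tabs and any intentional non-ASCII text.

```java
// Hypothetical lint check: finds the first character outside printable ASCII
// (0x20-0x7E) in a command string, such as a stray registered sign (U+00AE).
class NonAsciiCheck {
  // Returns the index of the first offending character, or -1 if there is none.
  static int firstNonAscii(String s) {
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      if (c < 0x20 || c > 0x7E) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // The escape below is the registered sign that slipped into the comment.
    String cmd = "./gradlew :b\u00AEeam-runners-flink_2.11-job-server-container docker";
    int idx = firstNonAscii(cmd);
    if (idx >= 0) {
      // prints "U+00AE at index 12"
      System.out.printf("U+%04X at index %d%n", (int) cmd.charAt(idx), idx);
    }
  }
}
```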
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137882&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137882 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:30
Start Date: 24/Aug/18 16:30
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212684030

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount -PJobEndpoint=localhost

Review comment:
nit: the existing properties are usually lowerCamelCase

What if I want to specify a port, is that no longer supported?

Issue Time Tracking
---
Worklog Id: (was: 137882)
Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137880&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137880 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:21
Start Date: 24/Aug/18 16:21
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212681815

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker

Review comment:
This command doesn't work (not sure why actually), but this here does: `./gradlew -p sdks/python/container docker` (see python container build above)

Issue Time Tracking
---
Worklog Id: (was: 137880)
Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137876&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137876 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:14
Start Date: 24/Aug/18 16:14
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212679944

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099

Review comment:
Updated to include a property.

Issue Time Tracking
---
Worklog Id: (was: 137876)
Time Spent: 6h 10m (was: 6h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137874&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137874 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:05
Start Date: 24/Aug/18 16:05
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212677632

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099

Review comment:
like this
```
if (project.hasProperty("flinkMasterUrl")) args += ["--flink-master-url=${project.property('flinkMasterUrl')}"]
```

Issue Time Tracking
---
Worklog Id: (was: 137874)
Time Spent: 6h (was: 5h 50m)
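tweise's suggestion replaces a raw `--job_endpoint=...` argument with a `-P` project property that the build script forwards only when it is set. Gradle itself rejects the raw argument as an unknown command-line option, as reported in this thread.

The same conditional-append logic, written as plain Java over a property map, might look like the sketch below. `OptionalArgsSketch` and the `localhost:8081` value are hypothetical. The flag names and defaults come from the `runShadow` block in the job-server diff and from tweise's snippet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Java rendering of the Gradle pattern: fixed arguments with
// defaults, plus flags appended only when the matching -P property was supplied.
class OptionalArgsSketch {
  static List<String> buildArgs(Map<String, String> props) {
    List<String> args = new ArrayList<>();
    // Defaults mirror the runShadow block in the job-server build.gradle diff.
    args.add("--job-host=" + props.getOrDefault("jobHost", "localhost"));
    args.add("--artifacts-dir=" + props.getOrDefault("artifactsDir", "/tmp/flink-artifacts"));
    // Optional flag: forwarded only if the property exists, like
    // project.hasProperty("flinkMasterUrl") in tweise's snippet.
    if (props.containsKey("flinkMasterUrl")) {
      args.add("--flink-master-url=" + props.get("flinkMasterUrl"));
    }
    return args;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    System.out.println(buildArgs(props));
    // Hypothetical value, as if passed with -PflinkMasterUrl=localhost:8081.
    props.put("flinkMasterUrl", "localhost:8081");
    System.out.println(buildArgs(props));
  }
}
```

Keeping the flag out of the argument list when the property is absent lets the downstream program apply its own default, rather than the build script guessing one.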
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137872&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137872 ] ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 16:01
Start Date: 24/Aug/18 16:01
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212676518

## File path: sdks/python/build.gradle ##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :b®eam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099

Review comment:
`Unknown command-line option '--job_endpoint`

Issue Time Tracking
---
Worklog Id: (was: 137872)
Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137870&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137870 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 15:59
Start Date: 24/Aug/18 15:59
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212675908

##
File path: sdks/python/build.gradle
##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099

Review comment:
Ah yes.

Issue Time Tracking
---
Worklog Id: (was: 137870)
Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137858&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137858 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 15:51
Start Date: 24/Aug/18 15:51
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212673728

##
File path: sdks/python/build.gradle
##
@@ -204,19 +204,30 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:
+// 1. Build the SDK container:
 //
 //./gradlew -p sdks/python/container docker
 //
-// 2. start a local JobService, for example, the Portable Flink runner
+// 2. Either a) or b)
+// a) If you want the Job Server to run in a Docker container:
+//
+//./gradlew :beam-runners-flink_2.11-job-server-container docker
+//
+// b) Otherwise, start a local JobService, for example, the Portable Flink runner
 //(in a separate shell since it continues to run):
 //
 //./gradlew :beam-runners-flink_2.11-job-server:runShadow
 //
 // Then you can run this example:
 //
+// Docker (2a):
+//
 //./gradlew :beam-sdks-python:portableWordCount
 //
+// Local JobService (2b):
+//
+//./gradlew :beam-sdks-python:portableWordCount --job_endpoint=localhost:8099

Review comment:
Doesn't this require a project property?

Issue Time Tracking
---
Worklog Id: (was: 137858)
Time Spent: 5.5h (was: 5h 20m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137824&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137824 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 13:58
Start Date: 24/Aug/18 13:58
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212636822

##
File path: sdks/python/build.gradle
##
@@ -204,14 +204,10 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:

Review comment:
+1

Issue Time Tracking
---
Worklog Id: (was: 137824)
Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137633 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 24/Aug/18 00:54
Start Date: 24/Aug/18 00:54
Worklog Time Spent: 10m
Work Description: tweise commented on a change in pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265#discussion_r212496888

##
File path: sdks/python/build.gradle
##
@@ -204,14 +204,10 @@ task directRunnerIT(dependsOn: 'installGcpTest') {
 // Before running this, you need to:
 //
-// 1. build the SDK container:

Review comment:
Perhaps we should keep the alternative documented here, since running against an existing job server is useful, for example when working on or debugging the runner. In the side input PR I already made some changes to the options here, to utilize Gradle properties.

Issue Time Tracking
---
Worklog Id: (was: 137633)
Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=137111&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137111 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 22/Aug/18 18:47
Start Date: 22/Aug/18 18:47
Worklog Time Spent: 10m
Work Description: mxm opened a new pull request #6265: [BEAM-4130] Bring up Job Server container for Python jobs
URL: https://github.com/apache/beam/pull/6265

This starts the Job Server in a container when Python pipelines are run and no Job Server endpoint has been specified. The Job Server has the Docker binaries and socket mounted from the host, so it can bring up SDK harness containers directly on the host. This feature is currently only available for Flink.

Because host networking is not available on macOS, we need to explicitly map ports from the host system to the container. On Linux this is not necessary because host networking works as expected. Generally, getting Docker support to work on macOS proved to be a bit tricky.

You can test this via:

```
gradle :beam-sdks-python:portableWordCount
```

Some rough edges on macOS:
- At times, the JobService socket is not available, even though `grpc.channel_ready_future(channel).result()` in the portable_runner returns.
- The port range for the SDK harness is restricted to 8100-8200 (configurable) because of the required explicit port forwarding.

Potentially problematic on Linux:
- The Docker binary might not be statically compiled and might depend on shared libraries; this still needs to be tested.

CC @angoenka

Issue Time Tracking
---
Worklog Id: (was: 137111)
Time Spent: 5h (was: 4h 50m)
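The Linux/macOS networking difference described in the PR can be sketched as the argument list one might hand to `docker run` when starting the Job Server container. This is a minimal sketch under stated assumptions, not the PR's implementation: the class and the image name `flink-job-server:latest` are hypothetical, port 8099 is taken from the `--job_endpoint=localhost:8099` example in the build.gradle hunk, and 8100-8200 is the SDK harness range mentioned above. The Docker binary mount is left out because its host path varies by system.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that assembles `docker run` arguments for a Job Server container. */
public class JobServerDockerArgs {

  /**
   * @param image container image to run (hypothetical name in the example below)
   * @param hostNetworking true on Linux, where --network=host works; false on macOS
   * @param jobPort JobService port (assumed 8099)
   * @param harnessFrom first port of the SDK harness range (assumed 8100)
   * @param harnessTo last port of the SDK harness range (assumed 8200)
   */
  static List<String> build(
      String image, boolean hostNetworking, int jobPort, int harnessFrom, int harnessTo) {
    List<String> args = new ArrayList<>();
    args.add("docker");
    args.add("run");
    // Mount the host's Docker socket so the Job Server can start SDK harness
    // containers as siblings on the host instead of nesting them.
    args.add("-v");
    args.add("/var/run/docker.sock:/var/run/docker.sock");
    if (hostNetworking) {
      // Linux: share the host network stack, so no explicit port mapping is needed.
      args.add("--network=host");
    } else {
      // macOS: host networking is unavailable, so publish every port explicitly.
      args.add("-p");
      args.add(jobPort + ":" + jobPort);
      String range = harnessFrom + "-" + harnessTo;
      args.add("-p");
      args.add(range + ":" + range);
    }
    args.add(image);
    return args;
  }

  public static void main(String[] argv) {
    boolean linux = System.getProperty("os.name").toLowerCase().contains("linux");
    List<String> args = build("flink-job-server:latest", linux, 8099, 8100, 8200);
    System.out.println(String.join(" ", args));
  }
}
```

Publishing a fixed range is what makes the harness range configurable but bounded on macOS: any harness port outside the published range would be unreachable from the host.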
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=135710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135710 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 17/Aug/18 14:38
Start Date: 17/Aug/18 14:38
Worklog Time Spent: 10m
Work Description: asfgit closed pull request #6238: [BEAM-4130] Build Docker image for Flink's JobServer
URL: https://github.com/apache/beam/pull/6238

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/job-server-container/Dockerfile b/runners/flink/job-server-container/Dockerfile
new file mode 100644
index 000..a9aff21b6d6
--- /dev/null
+++ b/runners/flink/job-server-container/Dockerfile
@@ -0,0 +1,26 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+FROM openjdk:8
+MAINTAINER "Apache Beam "
+
+ADD target/beam-runners-flink_2.11-job-server.jar /opt/apache/beam/jars/
+ADD target/flink-job-server.sh /opt/apache/beam/
+
+WORKDIR /opt/apache/beam
+ENTRYPOINT ["./flink-job-server.sh"]
diff --git a/runners/flink/job-server-container/build.gradle b/runners/flink/job-server-container/build.gradle
new file mode 100644
index 000..4d5f53316e3
--- /dev/null
+++ b/runners/flink/job-server-container/build.gradle
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Build a Docker image to bootstrap FlinkJobServerDriver which requires a Java environment.
+ * Alternatively, it can also be bootstrapped through :beam-runners-flink_2.11-job-server:runShadow
+ * or by directly running the generated JAR file.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyDockerNature()
+
+description = "Apache Beam :: Runners :: Flink :: Job Server :: Container"
+
+configurations {
+  dockerDependency
+}
+
+dependencies {
+  dockerDependency project(path: ":beam-runners-flink_2.11-job-server", configuration: "shadow")
+}
+
+task copyDockerfileDependencies(type: Copy) {
+  // Required Jars
+  from configurations.dockerDependency
+  rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink_2.11-job-server.jar'
+  into "build/target"
+  // Entry script
+  from file("./flink-job-server.sh")
+  into "build/target"
+}
+
+docker {
+  name containerImageName(name: "flink-job-server")
+  files "./build/"
+}
+
+// Ensure that we build the required resources and copy and file dependencies from related projects
+dockerPrepare.dependsOn copyDockerfileDependencies
diff --git a/runners/flink/job-server-container/flink-job-server.sh b/runners/flink/job-server-container/flink-job-server.sh
new file mode 100755
index 000..399e5e4c64a
--- /dev/null
+++ b/runners/flink/job-server-container/flink-job-server.sh
@@ -0,0 +1,29 @@
+#!/bin/sh
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed w
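The `rename` call in `copyDockerfileDependencies` strips the version from the shadow JAR's file name, so the Dockerfile can `ADD` a fixed path. Its effect can be approximated with `java.util.regex`; this sketch assumes Gradle applies the pattern to each file name roughly like `Matcher.find()` followed by `replaceFirst`, and the versioned file name in `main` is a hypothetical example. Because the dots in the pattern are unescaped and `.*` is greedy, any file whose name contains the `beam-runners-flink_2.11-job-server` prefix followed by `jar` is mapped to the same target name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Approximates the regex-based rename applied to the Job Server JAR before the Docker build. */
public class JarRenameSketch {
  // Same pattern and replacement as the rename(...) call in build.gradle.
  private static final Pattern JOB_SERVER_JAR =
      Pattern.compile("beam-runners-flink_2.11-job-server.*.jar");
  private static final String TARGET_NAME = "beam-runners-flink_2.11-job-server.jar";

  /** Returns the renamed file name, or the input unchanged if the pattern does not match. */
  static String rename(String fileName) {
    Matcher matcher = JOB_SERVER_JAR.matcher(fileName);
    return matcher.find() ? matcher.replaceFirst(TARGET_NAME) : fileName;
  }

  public static void main(String[] args) {
    // Hypothetical versioned artifact name produced by the shadow configuration.
    System.out.println(rename("beam-runners-flink_2.11-job-server-2.7.0-SNAPSHOT.jar"));
    // The entry script copied by the same task does not match and keeps its name.
    System.out.println(rename("flink-job-server.sh"));
  }
}
```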
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=135562&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135562 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 17/Aug/18 01:26
Start Date: 17/Aug/18 01:26
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #6238: [BEAM-4130] Build Docker image for Flink's JobServer
URL: https://github.com/apache/beam/pull/6238#discussion_r210784330

##
File path: runners/flink/job-server-container/flink-job-server.sh
##
@@ -0,0 +1,29 @@
+#!/bin/sh
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+### Just a simple script to bootstrap the FlinkJobServerDriver
+### For the environment, see the Dockerfile
+
+# The following (forking to the background, then waiting) enables to use CTRL+C to kill the container.
+# We're PID 1 which doesn't handle signals. By forking the Java process to the background,
+# a PID > 1 is created which handles signals. After the command shuts down, the script and
+# thus the container will also exit.
+
+java -cp "jars/*" org.apache.beam.runners.flink.FlinkJobServerDriver "$@" &
+wait

Review comment:
please add a new line.

Issue Time Tracking
---
Worklog Id: (was: 135562)
Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=135394&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-135394 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 16/Aug/18 15:02
Start Date: 16/Aug/18 15:02
Worklog Time Spent: 10m
Work Description: mxm opened a new pull request #6238: [BEAM-4130] Build Docker image for Flink's JobServer
URL: https://github.com/apache/beam/pull/6238

This adds a new Gradle module, flink-job-server-container, which builds a Docker image during the `docker` task. The image contains the FlinkJobServerDriver, which is the entry point for submitting Beam pipelines to the cluster. The image can then be used to spawn a JobServer container when executing a Beam pipeline, i.e. `p.run()`. The SDKs (Java/Python/Go) need to be updated to either spawn a container or use the address of a remote JobServer.

CC @angoenka

Issue Time Tracking
---
Worklog Id: (was: 135394)
Time Spent: 4.5h (was: 4h 20m)
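The SDK-side decision described in the PR (use a remote JobServer if an address is given, otherwise spawn a container) can be sketched as a small resolver. This is a hypothetical illustration, not code from any Beam SDK: `JobEndpointResolver` and `ContainerLauncher` are invented names, and the launcher in `main` is a stub that merely returns `localhost:8099` instead of starting the image described in this PR.

```java
import java.util.Optional;

/** Hypothetical sketch: choose between a remote Job Server and a freshly started container. */
public class JobEndpointResolver {

  /** Hypothetical hook that starts a Job Server container and returns its host:port. */
  interface ContainerLauncher {
    String startJobServer();
  }

  /**
   * Returns the configured endpoint if a non-blank one was given; otherwise asks the launcher
   * to start a Job Server container. The launcher is invoked only in the second case.
   */
  static String resolve(Optional<String> configuredEndpoint, ContainerLauncher launcher) {
    return configuredEndpoint
        .filter(endpoint -> !endpoint.trim().isEmpty())
        .orElseGet(launcher::startJobServer);
  }

  public static void main(String[] args) {
    // Stub launcher; a real one would run the Job Server image and wait for it to listen.
    ContainerLauncher stubLauncher = () -> "localhost:8099";
    System.out.println(resolve(Optional.of("jobserver.example.com:8099"), stubLauncher));
    System.out.println(resolve(Optional.empty(), stubLauncher));
  }
}
```

Using `orElseGet` rather than `orElse` keeps container startup lazy, so nothing is launched when a remote endpoint is configured.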
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112078&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112078 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 14/Jun/18 22:11
Start Date: 14/Jun/18 22:11
Worklog Time Spent: 10m
Work Description: angoenka commented on issue #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#issuecomment-397454040

In that case we can go ahead with the merge.

Issue Time Tracking
---
Worklog Id: (was: 112078)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112079&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112079 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 14/Jun/18 22:11
Start Date: 14/Jun/18 22:11
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index ebd607c0e21..4eec38962c6 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -59,6 +59,7 @@ dependencies {
   shadow library.java.slf4j_api
   shadow library.java.joda_time
   shadow library.java.commons_compress
+  shadow library.java.args4j
   shadow "org.apache.flink:flink-clients_2.11:$flink_version"
   shadow "org.apache.flink:flink-core:$flink_version"
   shadow "org.apache.flink:flink-metrics-core:$flink_version"
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index b7ee08378dc..60e9be798f7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -98,7 +98,7 @@
  *   FlinkBatchPortablePipelineTranslator translator =
  *       FlinkBatchPortablePipelineTranslator.createTranslator();
  *   BatchTranslationContext context =
- *       FlinkBatchPortablePipelineTranslator.createTranslationContext(options);
+ *       FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
  *   translator.translate(context, pipeline);
  *   ExecutionEnvironment executionEnvironment = context.getExecutionEnvironment();
  *   // Do something with executionEnvironment...
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
new file mode 100644
index 000..cfc6b3898fe
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LO
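FlinkJobInvocation imports Guava's `ListenableFuture`, `Futures` and `FutureCallback` together with a `Consumer`, which suggests job state is reported to listeners when the asynchronous run finishes. The general pattern can be sketched with the JDK's `CompletableFuture` instead of Guava. Everything here is a hypothetical simplification: the `State` enum is not Beam's `JobApi.JobState`, and `InvocationSketch` does not implement Beam's `JobInvocation` interface.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical, simplified job invocation that reports state changes to listeners. */
public class InvocationSketch {

  /** Simplified stand-in for a job state enum; not Beam's JobApi.JobState. */
  enum State { STOPPED, RUNNING, DONE, FAILED }

  private final List<Consumer<State>> listeners = new CopyOnWriteArrayList<>();
  private volatile State state = State.STOPPED;
  private CompletableFuture<Void> future;

  void addStateListener(Consumer<State> listener) {
    listeners.add(listener);
  }

  State getState() {
    return state;
  }

  private void setState(State newState) {
    state = newState;
    listeners.forEach(listener -> listener.accept(newState));
  }

  /**
   * Runs the job body on the executor. The returned future completes after the final state
   * (DONE or FAILED) has been published, and completes exceptionally if the job body threw.
   */
  synchronized CompletableFuture<Void> start(Runnable jobBody, ExecutorService executor) {
    if (future != null) {
      throw new IllegalStateException("Invocation already started");
    }
    setState(State.RUNNING);
    future =
        CompletableFuture.runAsync(jobBody, executor)
            .whenComplete((ignored, error) -> setState(error == null ? State.DONE : State.FAILED));
    return future;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    InvocationSketch invocation = new InvocationSketch();
    invocation.addStateListener(s -> System.out.println("state: " + s));
    invocation.start(() -> System.out.println("running pipeline..."), executor).join();
    executor.shutdown();
  }
}
```

Publishing the terminal state inside `whenComplete` means anyone who joins the returned future is guaranteed to observe DONE or FAILED afterwards.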
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112076&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112076 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 14/Jun/18 22:09 Start Date: 14/Jun/18 22:09 Worklog Time Spent: 10m Work Description: robertwb commented on issue #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#issuecomment-397453738 It's just hitting org.apache.beam.runners.direct.portable.ReferenceRunnerTest.pipelineExecution, which is unrelated and flaky at HEAD too. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 112076) Time Spent: 4h (was: 3h 50m) > Portable Flink runner JobService entry point in a Docker container > -- > > Key: BEAM-4130 > URL: https://issues.apache.org/jira/browse/BEAM-4130 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Ben Sidhom >Priority: Minor > Time Spent: 4h > Remaining Estimate: 0h > > The portable Flink runner exists as a Job Service that runs somewhere. We > need a main entry point that itself spins up the job service (and artifact > staging service). The main program itself should be packaged into an uberjar > such that it can be run locally or submitted to a Flink deployment via `flink > run`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=112038&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-112038 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 14/Jun/18 20:21 Start Date: 14/Jun/18 20:21 Worklog Time Spent: 10m Work Description: angoenka commented on issue #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#issuecomment-397426050 @robertwb Based on the discussion in the chat thread, should we remove BeamFileSystemArtifactSource, since ArtifactRetrievalService is meant to do the same job? If you want, we can keep it as is, and I can clean it up when I go about removing the ArtifactSource interface. Issue Time Tracking --- Worklog Id: (was: 112038) Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111986&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111986 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 14/Jun/18 18:59 Start Date: 14/Jun/18 18:59 Worklog Time Spent: 10m Work Description: angoenka commented on issue #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#issuecomment-397403414 LGTM. Please make sure that the checks are green. Issue Time Tracking --- Worklog Id: (was: 111986) Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111718&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111718 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 23:28 Start Date: 13/Jun/18 23:28 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195264829 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.protobuf.Struct; +import java.io.IOException; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job Invoker for the {@link FlinkRunner}. + */ +public class FlinkJobInvoker implements JobInvoker { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); + + public static FlinkJobInvoker create(ListeningExecutorService executorService) { +return new FlinkJobInvoker(executorService); + } + + private final ListeningExecutorService executorService; + + private FlinkJobInvoker(ListeningExecutorService executorService) { +this.executorService = executorService; + } + + @Override + public JobInvocation invoke( + RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken) + throws IOException { +// TODO: How to make Java/Python agree on names of keys and their values? +LOG.trace("Parsing pipeline options"); +FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options) +.as(FlinkPipelineOptions.class); + +String invocationId = String.format( +"%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString()); +LOG.debug("Invoking job {}", invocationId); + +// Set Flink Master to [auto] if no option was specified. +if (flinkOptions.getFlinkMaster() == null) { + flinkOptions.setFlinkMaster("[auto]"); +} + +flinkOptions.setRunner(null); Review comment: @axelmagn ?
Issue Time Tracking --- Worklog Id: (was: 111718) Time Spent: 3h 10m (was: 3h)
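The quoted `FlinkJobInvoker#invoke` applies two defaults: the invocation id is the job name and a random UUID joined by an underscore (`String.format("%s_%s", ...)`), and the Flink master falls back to `"[auto]"` when none was specified. A minimal sketch of just those two rules, so they can be checked in isolation; `InvokerDefaultsSketch` and its methods are hypothetical and stand in for the `FlinkPipelineOptions` getters and setters.

```java
import java.util.UUID;

// Hypothetical sketch of the two defaults applied in FlinkJobInvoker#invoke.
final class InvokerDefaultsSketch {

  /** Value used when the user did not configure a Flink master. */
  static final String AUTO_MASTER = "[auto]";

  /** Builds "<jobName>_<random UUID>", matching the format string in the diff. */
  static String invocationId(String jobName) {
    return String.format("%s_%s", jobName, UUID.randomUUID().toString());
  }

  /** Returns the configured master, or "[auto]" if it is null. */
  static String resolveFlinkMaster(String configuredMaster) {
    return configuredMaster == null ? AUTO_MASTER : configuredMaster;
  }
}
```

The random suffix keeps ids unique when the same job name is submitted more than once.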
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111719&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111719 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 23:29 Start Date: 13/Jun/18 23:29 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195264045 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.flink.api.common.JobExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Invocation of a Flink Job via {@link FlinkRunner}. 
+ */ +public class FlinkJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); + + public static FlinkJobInvocation create(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions); + } + + private final String id; + private final ListeningExecutorService executorService; + private final RunnerApi.Pipeline pipeline; + private final FlinkPipelineOptions pipelineOptions; + private Enum jobState; + private List<Consumer<Enum>> stateObservers; + + @Nullable + private ListenableFuture<PipelineResult> invocationFuture; + + private FlinkJobInvocation(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +this.id = id; +this.executorService = executorService; +this.pipeline = pipeline; +this.pipelineOptions = pipelineOptions; +this.invocationFuture = null; +this.jobState = Enum.STOPPED; +this.stateObservers = new ArrayList<>(); + } + + private PipelineResult runPipeline() throws Exception { +LOG.trace("Translating pipeline from proto"); + +MetricsEnvironment.setMetricsSupported(true); + +LOG.info("Translating pipeline to Flink program."); +// Fused pipeline proto. +RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline(); +JobInfo jobInfo = JobInfo.create( +id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions)); +final JobExecutionResult result; + +if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) { + // TODO: Do we need to inspect for unbounded sources before fusing?
+ // batch translation + FlinkBatchPortablePipelineTranslator translator = + FlinkBatchPortablePipelineTranslator.createTranslator(); + FlinkBatchPortablePipelineTranslator.BatchTranslationContext context = + FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo); + tra
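The quoted `FlinkJobInvocation` keeps a current job state, initialised to `STOPPED`, together with a list of `Consumer` observers. A minimal sketch of that bookkeeping follows, assuming observers are notified synchronously on every state change. `StateTrackerSketch`, `JobStateSketch` (a reduced stand-in for `JobApi.JobState.Enum` with only a few values) and the method names are illustrative, not the actual `JobInvocation` interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of state tracking with observer callbacks.
final class StateTrackerSketch {

  /** Reduced stand-in for JobApi.JobState.Enum. */
  enum JobStateSketch { STOPPED, RUNNING, DONE, FAILED }

  // Starts as STOPPED, like this.jobState = Enum.STOPPED in the diff.
  private JobStateSketch jobState = JobStateSketch.STOPPED;
  private final List<Consumer<JobStateSketch>> stateObservers = new ArrayList<>();

  synchronized JobStateSketch getState() {
    return jobState;
  }

  /** Registers an observer that is called on every later state change. */
  synchronized void addStateListener(Consumer<JobStateSketch> observer) {
    stateObservers.add(observer);
  }

  /** Records the new state and notifies all registered observers in order. */
  synchronized void setState(JobStateSketch newState) {
    jobState = newState;
    for (Consumer<JobStateSketch> observer : stateObservers) {
      observer.accept(newState);
    }
  }
}
```

Notifying under the lock keeps observers from seeing states out of order, at the cost of running observer code while the lock is held.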
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111720&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111720 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 23:29 Start Date: 13/Jun/18 23:29 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195263412 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111717&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111717 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 23:28 Start Date: 13/Jun/18 23:28 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195263336 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111711 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:42 Start Date: 13/Jun/18 22:42 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195258050 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ ... + private PipelineResult runPipeline() throws Exception { +LOG.trace("Translating pipeline from proto"); Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 111711) Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111701 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195211177 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111688&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111688 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195215990 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.ServerFactory; +import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService; +import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** Driver program that starts a job server. */ +public class FlinkJobServerDriver implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class); + + private static class ServerConfiguration { +@Option( +name = "--job-host", +required = true, +usage = "The job server host string" +) +private String host = ""; + +@Option( +name = "--artifacts-dir", +usage = "The location to store staged artifact files" +) +private String artifactStagingPath = "/tmp/beam-artifact-staging"; + } + + public static void main(String[] args) { +ServerConfiguration configuration = new ServerConfiguration(); +CmdLineParser parser = new CmdLineParser(configuration); +try { + parser.parseArgument(args); +} catch (CmdLineException e) { + e.printStackTrace(System.err); Review comment: Logger? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111688) Time Spent: 50m (was: 40m) > Portable Flink runner JobService entry point in a Docker container > -- > > Key: BEAM-4130 > URL: https://issues.apache.org/jira/browse/BEAM-4130 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Ben Sidhom >Priority: Minor > Time Spent: 50m > Remaining Estimate: 0h > > The portable Flink runner exists as a Job Service that runs somewhere. We > need a main entry point that itself spins up the job service (and artifact > staging service). The main program itself should be packaged into an uberjar > such that it can be run locally or submitted to a Flink deployment via `flink > run`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
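The "Logger?" suggestion on the `CmdLineException` handler could look roughly like the sketch below. It is an illustration only, not the driver's code: it uses `java.util.logging` as a JDK-only stand-in for the SLF4J `LOG` the driver already declares, and `ParseFailureLogging`/`reportParseFailure` are hypothetical names. Handing the exception to the logger keeps the stack trace in the job server's configured log output instead of writing it straight to `System.err`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper: reports argument-parsing failures through a logger (JUL stand-in for SLF4J). */
class ParseFailureLogging {
  static final Logger LOG = Logger.getLogger(ParseFailureLogging.class.getName());

  /** Logs the failure at SEVERE with the exception attached, so the stack trace lands in the log record. */
  static void reportParseFailure(Exception e) {
    // SLF4J equivalent in the driver would be: LOG.error("Error parsing command-line arguments.", e);
    LOG.log(Level.SEVERE, "Error parsing command-line arguments.", e);
  }
}
```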
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111694 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195206698 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.flink.api.common.JobExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Invocation of a Flink Job via {@link FlinkRunner}. 
+ */ +public class FlinkJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); + + public static FlinkJobInvocation create(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions); + } + + private final String id; + private final ListeningExecutorService executorService; + private final RunnerApi.Pipeline pipeline; + private final FlinkPipelineOptions pipelineOptions; + private Enum jobState; + private List<Consumer<Enum>> stateObservers; + + @Nullable + private ListenableFuture<PipelineResult> invocationFuture; + + private FlinkJobInvocation(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +this.id = id; +this.executorService = executorService; +this.pipeline = pipeline; +this.pipelineOptions = pipelineOptions; +this.invocationFuture = null; +this.jobState = Enum.STOPPED; +this.stateObservers = new ArrayList<>(); + } + + private PipelineResult runPipeline() throws Exception { +LOG.trace("Translating pipeline from proto"); + +MetricsEnvironment.setMetricsSupported(true); + +LOG.info("Translating pipeline to Flink program."); +// Fused pipeline proto. +RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline(); +JobInfo jobInfo = JobInfo.create( +id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions)); +final JobExecutionResult result; + +if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) { + // TODO: Do we need to inspect for unbounded sources before fusing?
+ // batch translation + FlinkBatchPortablePipelineTranslator translator = + FlinkBatchPortablePipelineTranslator.createTranslator(); + FlinkBatchPortablePipelineTranslator.BatchTranslationContext context = + FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo); + tra
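The `FlinkJobInvocation` fields quoted above keep a current `jobState` (initialized to `Enum.STOPPED`) alongside a list of state observers. A minimal sketch of how such observers might be notified on every transition follows; it assumes a hypothetical `JobStateSketch` enum in place of `JobApi.JobState.Enum` and hypothetical method names (`addStateListener`, `setState`), since the quoted diff does not show how the PR wires this up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for JobApi.JobState.Enum, reduced to a few values. */
enum JobStateSketch { STOPPED, RUNNING, DONE, FAILED }

/** Hypothetical sketch: tracks the current job state and fans transitions out to observers. */
class StateTrackingSketch {
  private JobStateSketch jobState = JobStateSketch.STOPPED;
  private final List<Consumer<JobStateSketch>> stateObservers = new ArrayList<>();

  synchronized void addStateListener(Consumer<JobStateSketch> observer) {
    stateObservers.add(observer);
  }

  synchronized JobStateSketch getState() {
    return jobState;
  }

  /** Records the new state, then notifies every registered observer in registration order. */
  synchronized void setState(JobStateSketch newState) {
    this.jobState = newState;
    for (Consumer<JobStateSketch> observer : stateObservers) {
      observer.accept(newState);
    }
  }
}
```

Observers are called while holding the lock here for simplicity; a production version might copy the list and notify outside the lock.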
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111691 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195213110 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.protobuf.Struct; +import java.io.IOException; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job Invoker for the {@link FlinkRunner}. + */ +public class FlinkJobInvoker implements JobInvoker { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); + + public static FlinkJobInvoker create(ListeningExecutorService executorService) { +return new FlinkJobInvoker(executorService); + } + + private final ListeningExecutorService executorService; + + private FlinkJobInvoker(ListeningExecutorService executorService) { +this.executorService = executorService; + } + + @Override + public JobInvocation invoke( + RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken) + throws IOException { +// TODO: How to make Java/Python agree on names of keys and their values? +LOG.trace("Parsing pipeline options"); +FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options) +.as(FlinkPipelineOptions.class); + +String invocationId = String.format( +"%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString()); +LOG.debug("Invoking job {}", invocationId); Review comment: Info? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111691) Time Spent: 1h 20m (was: 1h 10m) > Portable Flink runner JobService entry point in a Docker container > -- > > Key: BEAM-4130 > URL: https://issues.apache.org/jira/browse/BEAM-4130 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Ben Sidhom >Priority: Minor > Time Spent: 1h 20m > Remaining Estimate: 0h > > The portable Flink runner exists as a Job Service that runs somewhere. We > need a main entry point that itself spins up the job service (and artifact > staging service). The main program itself should be packaged into an uberjar > such that it can be run locally or submitted to a Flink deployment via `flink > run`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
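The "Info?" comment concerns the `LOG.debug("Invoking job {}", invocationId)` call. A JDK-only sketch of building the `<jobName>_<UUID>` invocation id the same way as the quoted `String.format` call and logging it at INFO, so the id appears in default server logs; `InvocationIds` is a hypothetical name and `java.util.logging` stands in for SLF4J.

```java
import java.util.UUID;
import java.util.logging.Logger;

/** Hypothetical sketch of how a job invoker could build and log an invocation id. */
class InvocationIds {
  private static final Logger LOG = Logger.getLogger(InvocationIds.class.getName());

  /** Returns "<jobName>_<random UUID>", mirroring the String.format call under review. */
  static String newInvocationId(String jobName) {
    String invocationId = String.format("%s_%s", jobName, UUID.randomUUID());
    // INFO rather than DEBUG, so the id is visible without extra log configuration.
    // SLF4J equivalent: LOG.info("Invoking job {}", invocationId);
    LOG.info("Invoking job " + invocationId);
    return invocationId;
  }
}
```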
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111704&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111704 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195251040 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.ServerFactory; +import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService; +import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; +import org.kohsuke.args4j.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** Driver program that starts a job server. */ +public class FlinkJobServerDriver implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class); + + private static class ServerConfiguration { +@Option( +name = "--job-host", +required = true, +usage = "The job server host string" +) +private String host = ""; + +@Option( +name = "--artifacts-dir", +usage = "The location to store staged artifact files" +) +private String artifactStagingPath = "/tmp/beam-artifact-staging"; + } + + public static void main(String[] args) { +ServerConfiguration configuration = new ServerConfiguration(); +CmdLineParser parser = new CmdLineParser(configuration); +try { + parser.parseArgument(args); +} catch (CmdLineException e) { + e.printStackTrace(System.err); + printUsage(parser); + return; +} +FlinkJobServerDriver driver = fromConfig(configuration); +driver.run(); + } + + private static void printUsage(CmdLineParser parser) { +System.err.println( +String.format( +"Usage: java %s arguments...", 
FlinkJobServerDriver.class.getSimpleName())); +parser.printUsage(System.err); +System.err.println(); + } + + public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration) { +ThreadFactory threadFactory = +new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build(); +ListeningExecutorService executor = + MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory)); +ServerFactory serverFactory = ServerFactory.createDefault(); +return create(configuration, executor, serverFactory); + } + + public static FlinkJobServerDriver create( + ServerConfiguration configuration, + ListeningExecutorService executor, + ServerFactory serverFactory) { +return new FlinkJobServerDriver(configuration, executor, serverFactory); + } + + private final ListeningExecutorService executor; Review comment: We should move these variables to the top. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue T
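The suggestion to move the fields to the top follows the common Java ordering: static constants, instance fields, static factories, constructor, then instance methods. A trimmed, hypothetical sketch of a driver laid out that way is below; it keeps only a host string and an executor, whereas the quoted `create(...)` factory also passes a `ServerConfiguration` and a `ServerFactory`.

```java
import java.util.concurrent.ExecutorService;

/** Hypothetical, trimmed-down driver showing fields declared before any methods. */
class DriverLayoutSketch implements Runnable {
  // 1. Static constants.
  private static final String THREAD_NAME = "flink-runner-job-server";

  // 2. Instance fields, grouped at the top where readers expect them.
  private final String host;
  private final ExecutorService executor;

  // 3. Static factory methods.
  static DriverLayoutSketch create(String host, ExecutorService executor) {
    return new DriverLayoutSketch(host, executor);
  }

  // 4. Constructor.
  private DriverLayoutSketch(String host, ExecutorService executor) {
    this.host = host;
    this.executor = executor;
  }

  // 5. Instance methods.
  String describe() {
    return THREAD_NAME + "@" + host;
  }

  @Override
  public void run() {
    // Server start-up is omitted in this sketch; a no-op task is handed to the executor.
    executor.execute(() -> {});
  }
}
```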
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111695&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111695 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195254596 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.artifact; + +import static org.hamcrest.Matchers.containsInAnyOrder; + +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for BeamFileSystemArtifactSource. + */ +@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest { + + BeamFileSystemArtifactStagingService stagingService = new BeamFileSystemArtifactStagingService(); + + @Rule public TemporaryFolder stagingDir = new TemporaryFolder(); + + @Test public void testStagingService() throws Exception { +String stagingSession = "stagingSession"; +String stagingSessionToken = BeamFileSystemArtifactStagingService +.generateStagingSessionToken(stagingSession, stagingDir.newFolder().getPath()); +List<ArtifactApi.ArtifactMetadata> metadata = new ArrayList<>(); + + metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build()); +putArtifactContents(stagingSessionToken, "first", "file1"); + + metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build()); +putArtifactContents(stagingSessionToken, "second", "file2"); + +String stagingToken = commitManifest(stagingSessionToken, metadata); + +BeamFileSystemArtifactSource artifactSource = new BeamFileSystemArtifactSource(stagingToken); +Assert.assertEquals("first", getArtifactContents(artifactSource, "file1")); +Assert.assertEquals("second", getArtifactContents(artifactSource, "file2")); +Assert.assertThat(artifactSource.getManifest().getArtifactList(), +containsInAnyOrder(metadata.toArray(new ArtifactApi.ArtifactMetadata[0]))); + } + + private String 
commitManifest(String stagingSessionToken, + List<ArtifactApi.ArtifactMetadata> artifacts) { +String[] stagingTokenHolder = new String[1]; +stagingService.commitManifest( + ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken) + .setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(), +new StreamObserver<ArtifactApi.CommitManifestResponse>() { + + @Override public void onNext(ArtifactApi.CommitManifestResponse commitManifestResponse) { +stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken(); + } + + @Override public void onError(Throwable throwable) { +throw new RuntimeException(throwable); + } + + @Override public void onCompleted() { + } +}); + +return stagingTokenHolder[0]; Review comment: We should wait for the commit call to finish before returning. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111695) Time Spent: 1h 50m (was: 1h 40m) > Portable Flink runner JobSe
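The concern is that `commitManifest` returns `stagingTokenHolder[0]` immediately after issuing the call, which is only safe if the service invokes the observer synchronously. A sketch of blocking until `onCompleted` (or `onError`) fires, using a `CompletableFuture`, follows. To stay JDK-only it defines `ResponseObserver`, a local stand-in with the same three callbacks as gRPC's `StreamObserver`, plus a hypothetical `AsyncCommitService` that replies on a background thread; the 10-second timeout is an arbitrary choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Local stand-in with the same callbacks as io.grpc.stub.StreamObserver. */
interface ResponseObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

/** Hypothetical service that answers on another thread, as a real gRPC call may. */
class AsyncCommitService {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  void commitManifest(String sessionToken, ResponseObserver<String> responseObserver) {
    executor.execute(() -> {
      responseObserver.onNext("retrieval-token-for-" + sessionToken);
      responseObserver.onCompleted();
    });
  }

  void shutdown() {
    executor.shutdown();
  }
}

class CommitHelper {
  /** Waits for the call to complete instead of returning a token that may not be set yet. */
  static String commitAndWait(AsyncCommitService service, String sessionToken) throws Exception {
    CompletableFuture<String> result = new CompletableFuture<>();
    String[] tokenHolder = new String[1];
    service.commitManifest(sessionToken, new ResponseObserver<String>() {
      @Override public void onNext(String retrievalToken) { tokenHolder[0] = retrievalToken; }
      @Override public void onError(Throwable t) { result.completeExceptionally(t); }
      @Override public void onCompleted() { result.complete(tokenHolder[0]); }
    });
    // get() blocks until completion, rethrows an onError failure, and gives up after 10 seconds.
    return result.get(10, TimeUnit.SECONDS);
  }
}
```

A `CountDownLatch` released in `onCompleted`/`onError` would work equally well; the future has the advantage of carrying the error back to the caller.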
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111705&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111705 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195213503 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.protobuf.Struct; +import java.io.IOException; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Job Invoker for the {@link FlinkRunner}. + */ +public class FlinkJobInvoker implements JobInvoker { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); + + public static FlinkJobInvoker create(ListeningExecutorService executorService) { +return new FlinkJobInvoker(executorService); + } + + private final ListeningExecutorService executorService; + + private FlinkJobInvoker(ListeningExecutorService executorService) { +this.executorService = executorService; + } + + @Override + public JobInvocation invoke( + RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken) + throws IOException { +// TODO: How to make Java/Python agree on names of keys and their values? +LOG.trace("Parsing pipeline options"); Review comment: Should we log some info here? If possible, with some pipeline identifier. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111705) Time Spent: 2h 40m (was: 2.5h) > Portable Flink runner JobService entry point in a Docker container > -- > > Key: BEAM-4130 > URL: https://issues.apache.org/jira/browse/BEAM-4130 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Ben Sidhom >Priority: Minor > Time Spent: 2h 40m > Remaining Estimate: 0h > > The portable Flink runner exists as a Job Service that runs somewhere. We > need a main entry point that itself spins up the job service (and artifact > staging service). The main program itself should be packaged into an uberjar > such that it can be run locally or submitted to a Flink deployment via `flink > run`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111693 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195205949 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.flink.api.common.JobExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Invocation of a Flink Job via {@link FlinkRunner}. 
+ */ +public class FlinkJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); + + public static FlinkJobInvocation create(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions); + } + + private final String id; + private final ListeningExecutorService executorService; + private final RunnerApi.Pipeline pipeline; + private final FlinkPipelineOptions pipelineOptions; + private Enum jobState; + private List<Consumer<Enum>> stateObservers; + + @Nullable + private ListenableFuture<PipelineResult> invocationFuture; + + private FlinkJobInvocation(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +this.id = id; +this.executorService = executorService; +this.pipeline = pipeline; +this.pipelineOptions = pipelineOptions; +this.invocationFuture = null; +this.jobState = Enum.STOPPED; +this.stateObservers = new ArrayList<>(); + } + + private PipelineResult runPipeline() throws Exception { +LOG.trace("Translating pipeline from proto"); + +MetricsEnvironment.setMetricsSupported(true); + +LOG.info("Translating pipeline to Flink program."); +// Fused pipeline proto. +RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline(); +JobInfo jobInfo = JobInfo.create( +id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions)); +final JobExecutionResult result; + +if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) { + // TODO: Do we need to inspect for unbounded sources before fusing?
+ // batch translation + FlinkBatchPortablePipelineTranslator translator = + FlinkBatchPortablePipelineTranslator.createTranslator(); + FlinkBatchPortablePipelineTranslator.BatchTranslationContext context = Review comment: Should we only create the context in the if-else block and do the remain
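The visible part of `runPipeline` selects batch translation only when streaming is not requested and the fused pipeline has no unbounded PCollections; the review comment about the translation context is cut off in this archive. One possible shape, sketched under assumptions: only the mode-specific context is built inside the if/else, and the shared remainder runs once afterwards. `TranslationContextSketch` and `ModeSelectionSketch` are hypothetical and do not mirror the real `FlinkBatchPortablePipelineTranslator` API.

```java
import java.util.List;

/** Hypothetical stand-in for a mode-specific translation context. */
interface TranslationContextSketch {
  String execute(String jobName);
}

class ModeSelectionSketch {
  /** Mirrors the quoted condition: batch only if not streaming and no unbounded PCollections. */
  static boolean useBatch(boolean streaming, boolean hasUnboundedPCollections) {
    return !streaming && !hasUnboundedPCollections;
  }

  static String translateAndRun(
      String jobName, boolean streaming, boolean hasUnboundedPCollections, List<String> steps) {
    final TranslationContextSketch context;
    if (useBatch(streaming, hasUnboundedPCollections)) {
      // Only the mode-specific setup lives inside the branches.
      steps.add("create batch context");
      context = name -> "batch:" + name;
    } else {
      steps.add("create streaming context");
      context = name -> "streaming:" + name;
    }
    // Shared remainder, written once for both modes.
    steps.add("execute");
    return context.execute(jobName);
  }
}
```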
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111703 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209989

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ##
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Invocation of a Flink Job via {@link FlinkRunner}.
+ */
+public class FlinkJobInvocation implements JobInvocation {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
+
+  public static FlinkJobInvocation create(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions);
+  }
+
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private final RunnerApi.Pipeline pipeline;
+  private final FlinkPipelineOptions pipelineOptions;
+  private Enum jobState;
+  private List<Consumer<Enum>> stateObservers;
+
+  @Nullable
+  private ListenableFuture<PipelineResult> invocationFuture;
+
+  private FlinkJobInvocation(String id, ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineOptions = pipelineOptions;
+    this.invocationFuture = null;
+    this.jobState = Enum.STOPPED;
+    this.stateObservers = new ArrayList<>();
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    LOG.trace("Translating pipeline from proto");
+
+    MetricsEnvironment.setMetricsSupported(true);
+
+    LOG.info("Translating pipeline to Flink program.");
+    // Fused pipeline proto.
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    JobInfo jobInfo = JobInfo.create(
+        id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
+
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo);
+      tra
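`FlinkJobInvocation` holds three pieces of mutable state. It keeps a `jobState` that starts as `STOPPED`, a list of `Consumer<Enum>` state observers, and a nullable `invocationFuture` for the asynchronously running `runPipeline()`. The JDK-only sketch below shows one way such a lifecycle can be wired. It makes two substitutions: a local `State` enum replaces `JobApi.JobState.Enum`, and `CompletableFuture` replaces Guava's `ListeningExecutorService`/`FutureCallback`. The names `InvocationSketch`, `start`, `addStateListener` and `setState` are hypothetical, and the diff does not show how the real class implements these steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch of an invocation that runs a job asynchronously and reports its state. */
public class InvocationSketch {

  /** Local stand-in for a subset of JobApi.JobState.Enum. */
  enum State { STOPPED, RUNNING, DONE, FAILED }

  private final ExecutorService executor;
  private final Supplier<String> job; // plays the role of runPipeline()
  private final List<Consumer<State>> stateObservers = new ArrayList<>();
  private State jobState = State.STOPPED; // initial state, as in the constructor above
  private CompletableFuture<String> invocationFuture; // null until start() is called

  InvocationSketch(ExecutorService executor, Supplier<String> job) {
    this.executor = executor;
    this.job = job;
  }

  synchronized void addStateListener(Consumer<State> observer) {
    stateObservers.add(observer);
  }

  synchronized State getState() {
    return jobState;
  }

  /** Records the new state and notifies every registered observer. */
  private synchronized void setState(State newState) {
    jobState = newState;
    for (Consumer<State> observer : stateObservers) {
      observer.accept(newState);
    }
  }

  /** Submits the job once; the returned future completes after observers see DONE or FAILED. */
  synchronized CompletableFuture<String> start() {
    if (invocationFuture != null) {
      throw new IllegalStateException("Job already started");
    }
    setState(State.RUNNING);
    invocationFuture = CompletableFuture.supplyAsync(job, executor)
        // Success -> DONE, failure -> FAILED, like the two branches of a FutureCallback.
        .whenComplete((result, error) -> setState(error == null ? State.DONE : State.FAILED));
    return invocationFuture;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      InvocationSketch invocation = new InvocationSketch(executor, () -> "result");
      invocation.addStateListener(state -> System.out.println("state -> " + state));
      invocation.start().join();
      System.out.println(invocation.getState()); // DONE
    } finally {
      executor.shutdown();
    }
  }
}
```

The sketch stores the future returned by `whenComplete`, not the raw `supplyAsync` future. As a result, `join()` returns only after the observers have seen the terminal state, so a caller cannot read a stale `RUNNING` state.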
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111699&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111699 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195207823

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ##
@@ -0,0 +1,186 @@
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111707 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195210651

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ##
@@ -0,0 +1,186 @@
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111692 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195208514

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ##
@@ -0,0 +1,186 @@
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111696 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209626

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ##
@@ -0,0 +1,186 @@
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111708&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111708 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195214125

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java ##
@@ -0,0 +1,74 @@
+package org.apache.beam.runners.flink;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job Invoker for the {@link FlinkRunner}.
+ */
+public class FlinkJobInvoker implements JobInvoker {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
+
+  public static FlinkJobInvoker create(ListeningExecutorService executorService) {
+    return new FlinkJobInvoker(executorService);
+  }
+
+  private final ListeningExecutorService executorService;
+
+  private FlinkJobInvoker(ListeningExecutorService executorService) {
+    this.executorService = executorService;
+  }
+
+  @Override
+  public JobInvocation invoke(
+      RunnerApi.Pipeline pipeline, Struct options, @Nullable String artifactToken)
+      throws IOException {
+    // TODO: How to make Java/Python agree on names of keys and their values?
+    LOG.trace("Parsing pipeline options");
+    FlinkPipelineOptions flinkOptions = PipelineOptionsTranslation.fromProto(options)
+        .as(FlinkPipelineOptions.class);
+
+    String invocationId = String.format(
+        "%s_%s", flinkOptions.getJobName(), UUID.randomUUID().toString());
+    LOG.debug("Invoking job {}", invocationId);
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    flinkOptions.setRunner(null);

Review comment:
Document why we need to set the runner to null.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 111708)

> Portable Flink runner JobService entry point in a Docker container
> --
>
> Key: BEAM-4130
> URL: https://issues.apache.org/jira/browse/BEAM-4130
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Ben Sidhom
> Priority: Minor
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> The portable Flink runner exists as a Job Service that runs somewhere. We
> need a main entry point that itself spins up the job service (and artifact
> staging service). The main program itself should be packaged into an uberjar
> such that it can be run locally or submitted to a Flink deployment via `flink
> run`.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
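Two of the option-handling steps in `FlinkJobInvoker.invoke()` can be isolated. First, the invocation id is the job name joined to a random UUID with `_`. Second, a missing Flink master falls back to `"[auto]"`. The plain-Java sketch below covers only those two steps, and `InvokerSketch`, `invocationId` and `flinkMasterOrDefault` are hypothetical helpers. It does not cover `setRunner(null)` or how `[auto]` is resolved, and the diff shows neither.

```java
import java.util.UUID;

/** Hypothetical sketch of two option-handling steps from FlinkJobInvoker.invoke(). */
public class InvokerSketch {

  /** Same format string as the diff: "<jobName>_<random UUID>". */
  static String invocationId(String jobName) {
    return String.format("%s_%s", jobName, UUID.randomUUID().toString());
  }

  /** Falls back to "[auto]" when no Flink master was specified, as the diff does. */
  static String flinkMasterOrDefault(String flinkMaster) {
    return flinkMaster == null ? "[auto]" : flinkMaster;
  }

  public static void main(String[] args) {
    System.out.println(invocationId("wordcount")); // e.g. wordcount_ followed by a random UUID
    System.out.println(flinkMasterOrDefault(null)); // [auto]
    System.out.println(flinkMasterOrDefault("localhost:8081")); // hypothetical explicit master
  }
}
```

A UUID string always has 36 characters, so the random suffix can be split off the end of the id even if the job name itself contains underscores.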
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111697 ]

ASF GitHub Bot logged work on BEAM-4130:
Author: ASF GitHub Bot
Created on: 13/Jun/18 22:25
Start Date: 13/Jun/18 22:25
Worklog Time Spent: 10m
Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner.
URL: https://github.com/apache/beam/pull/5493#discussion_r195209114

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ##
@@ -0,0 +1,186 @@
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111702 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195254764 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSourceTest.java ## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.artifact; + +import static org.hamcrest.Matchers.containsInAnyOrder; + +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for BeamFileSystemArtifactSource. + */ +@RunWith(JUnit4.class) public class BeamFileSystemArtifactSourceTest { + + BeamFileSystemArtifactStagingService stagingService = new BeamFileSystemArtifactStagingService(); + + @Rule public TemporaryFolder stagingDir = new TemporaryFolder(); + + @Test public void testStagingService() throws Exception { +String stagingSession = "stagingSession"; +String stagingSessionToken = BeamFileSystemArtifactStagingService +.generateStagingSessionToken(stagingSession, stagingDir.newFolder().getPath()); +List metadata = new ArrayList<>(); + + metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file1").build()); +putArtifactContents(stagingSessionToken, "first", "file1"); + + metadata.add(ArtifactApi.ArtifactMetadata.newBuilder().setName("file2").build()); +putArtifactContents(stagingSessionToken, "second", "file2"); + +String stagingToken = commitManifest(stagingSessionToken, metadata); + +BeamFileSystemArtifactSource artifactSource = new BeamFileSystemArtifactSource(stagingToken); +Assert.assertEquals("first", getArtifactContents(artifactSource, "file1")); +Assert.assertEquals("second", getArtifactContents(artifactSource, "file2")); +Assert.assertThat(artifactSource.getManifest().getArtifactList(), +containsInAnyOrder(metadata.toArray(new ArtifactApi.ArtifactMetadata[0]))); + } + + private String commitManifest(String stagingSessionToken, + List 
artifacts) { +String[] stagingTokenHolder = new String[1]; +stagingService.commitManifest( + ArtifactApi.CommitManifestRequest.newBuilder().setStagingSessionToken(stagingSessionToken) + .setManifest(ArtifactApi.Manifest.newBuilder().addAllArtifact(artifacts)).build(), +new StreamObserver() { + + @Override public void onNext(ArtifactApi.CommitManifestResponse commitManifestResponse) { +stagingTokenHolder[0] = commitManifestResponse.getRetrievalToken(); + } + + @Override public void onError(Throwable throwable) { +throw new RuntimeException(throwable); + } + + @Override public void onCompleted() { + } +}); + +return stagingTokenHolder[0]; + } + + private void putArtifactContents(String stagingSessionToken, String contents, String name) { +StreamObserver outputStreamObserver = stagingService +.putArtifact(new StreamObserver() { + + @Override public void onNext(ArtifactApi.PutArtifactResponse putArtifactResponse) { + } + + @Override public void onError(Throwable throwable) { +throw new RuntimeException(throwable); + } + + @Override public void onCompleted() { + } +}); + + outputStrea
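In the test, `commitManifest` captures the retrieval token through a one-element array (`stagingTokenHolder`). This works only if the staging service invokes the observer before `commitManifest` returns; otherwise the method silently returns `null`. Its `onError` also throws on the service's thread rather than in the test. One alternative is to complete a `CompletableFuture` from the observer and wait for it with a timeout. The sketch below uses a hypothetical `Observer<T>` interface that mirrors gRPC's `StreamObserver` callbacks (`onNext`, `onError`, `onCompleted`), so it compiles without gRPC on the classpath. In the real test one would implement `io.grpc.stub.StreamObserver` directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/** Sketch: capturing a single async response without a mutable array holder. */
public class ResponseCaptureSketch {

  /** Hypothetical mirror of io.grpc.stub.StreamObserver's three callbacks. */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Observer that completes the future with the first value, or fails it. */
  static <T> Observer<T> futureObserver(CompletableFuture<T> future) {
    return new Observer<T>() {
      @Override public void onNext(T value) {
        future.complete(value); // later values are ignored once the future is done
      }

      @Override public void onError(Throwable t) {
        future.completeExceptionally(t); // surfaced to the caller via get()
      }

      @Override public void onCompleted() {
        // No-op if a value already arrived; otherwise fail instead of yielding null.
        future.completeExceptionally(new IllegalStateException("completed without a response"));
      }
    };
  }

  /** Invokes the service with a capturing observer and waits (bounded) for the response. */
  static <T> T call(Consumer<Observer<T>> service, long timeoutSeconds)
      throws InterruptedException, ExecutionException, TimeoutException {
    CompletableFuture<T> future = new CompletableFuture<>();
    service.accept(futureObserver(future));
    return future.get(timeoutSeconds, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws Exception {
    // A fake service that replies asynchronously on another thread.
    String token = call(observer -> new Thread(() -> {
      observer.onNext("retrieval-token");
      observer.onCompleted();
    }).start(), 5);
    System.out.println(token); // retrieval-token
  }
}
```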
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111706 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195253607 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java ## @@ -60,17 +60,36 @@ public static InMemoryJobService create( Review comment: Can we remove this method? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 111706) Time Spent: 2h 40m (was: 2.5h) > Portable Flink runner JobService entry point in a Docker container > -- > > Key: BEAM-4130 > URL: https://issues.apache.org/jira/browse/BEAM-4130 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: Ben Sidhom >Priority: Minor > Time Spent: 2h 40m > Remaining Estimate: 0h > > The portable Flink runner exists as a Job Service that runs somewhere. We > need a main entry point that itself spins up the job service (and artifact > staging service). The main program itself should be packaged into an uberjar > such that it can be run locally or submitted to a Flink deployment via `flink > run`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
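The issue asks for a main entry point that brings up both the job service and the artifact staging service. The sketch below covers only that lifecycle. It assumes a hypothetical `Service` interface with `start()`/`stop()`; the real driver wires up gRPC servers instead. Because the quoted `InMemoryJobService.create(...)` takes the staging service descriptor, the sketch starts the staging service first. It then stops services in reverse order, either from a shutdown hook or when a later start fails.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of a job-server entry point's start/stop ordering. */
public class JobServerEntryPointSketch {

  /** Hypothetical lifecycle interface; not a Beam API. */
  interface Service {
    String name();
    void start() throws Exception;
    void stop() throws Exception;
  }

  private final Deque<Service> started = new ArrayDeque<>();

  /** Starts services in order; if one fails, stops the ones already running. */
  void startAll(List<Service> services) throws Exception {
    for (Service s : services) {
      try {
        s.start();
      } catch (Exception e) {
        stopAll();
        throw e;
      }
      started.push(s); // stack order lets stopAll() run in reverse start order
    }
  }

  /** Stops services in reverse start order, continuing past individual failures. */
  void stopAll() {
    while (!started.isEmpty()) {
      Service s = started.pop();
      try {
        s.stop();
      } catch (Exception e) {
        System.err.println("Failed to stop " + s.name() + ": " + e);
      }
    }
  }

  /** Hypothetical service that only records lifecycle events. */
  static Service recording(String name, List<String> log) {
    return new Service() {
      @Override public String name() { return name; }
      @Override public void start() { log.add("start " + name); }
      @Override public void stop() { log.add("stop " + name); }
    };
  }

  public static void main(String[] args) throws Exception {
    List<String> log = new ArrayList<>();
    JobServerEntryPointSketch server = new JobServerEntryPointSketch();
    // Staging first: the job service is configured with the staging endpoint.
    server.startAll(Arrays.asList(recording("artifact-staging", log), recording("job", log)));
    Runtime.getRuntime().addShutdownHook(new Thread(server::stopAll));
    System.out.println(log); // [start artifact-staging, start job]
    // A real entry point would now block until the servers terminate.
  }
}
```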
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111698&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111698 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195208381 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation; +import org.apache.beam.runners.fnexecution.provisioning.JobInfo; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.flink.api.common.JobExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Invocation of a Flink Job via {@link FlinkRunner}. 
+ */ +public class FlinkJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); + + public static FlinkJobInvocation create(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions); + } + + private final String id; + private final ListeningExecutorService executorService; + private final RunnerApi.Pipeline pipeline; + private final FlinkPipelineOptions pipelineOptions; + private Enum jobState; + private List> stateObservers; + + @Nullable + private ListenableFuture invocationFuture; + + private FlinkJobInvocation(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +this.id = id; +this.executorService = executorService; +this.pipeline = pipeline; +this.pipelineOptions = pipelineOptions; +this.invocationFuture = null; +this.jobState = Enum.STOPPED; +this.stateObservers = new ArrayList<>(); + } + + private PipelineResult runPipeline() throws Exception { +LOG.trace("Translating pipeline from proto"); + +MetricsEnvironment.setMetricsSupported(true); + +LOG.info("Translating pipeline to Flink program."); +// Fused pipeline proto. +RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline(); +JobInfo jobInfo = JobInfo.create( +id, pipelineOptions.getJobName(), PipelineOptionsTranslation.toProto(pipelineOptions)); +final JobExecutionResult result; + +if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) { + // TODO: Do we need to inspect for unbounded sources before fusing? 
+ // batch translation + FlinkBatchPortablePipelineTranslator translator = + FlinkBatchPortablePipelineTranslator.createTranslator(); + FlinkBatchPortablePipelineTranslator.BatchTranslationContext context = + FlinkBatchPortablePipelineTranslator.createTranslationContext(jobInfo); + tra
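The constructor initializes `jobState` to `STOPPED` and keeps a list of state observers. The sketch below shows one way such observers can be notified on each transition. It uses a hypothetical local `JobState` enum in place of the proto-generated `JobApi.JobState.Enum`, and the method names are illustrative only. Replaying the current state to a newly registered observer is a design choice made here, not confirmed Beam behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Sketch of observer-based job state tracking; JobState is a hypothetical stand-in. */
public class JobStateSketch {

  enum JobState { STOPPED, RUNNING, DONE, FAILED, CANCELLED }

  private volatile JobState state = JobState.STOPPED;
  // Copy-on-write so observers may register while a notification is in progress.
  private final List<Consumer<JobState>> observers = new CopyOnWriteArrayList<>();

  /** Registers an observer and immediately replays the current state to it. */
  void addObserver(Consumer<JobState> observer) {
    observers.add(observer);
    observer.accept(state);
  }

  /** Updates the state and notifies every registered observer. */
  void setState(JobState newState) {
    state = newState;
    for (Consumer<JobState> observer : observers) {
      observer.accept(newState);
    }
  }

  JobState getState() {
    return state;
  }

  public static void main(String[] args) {
    JobStateSketch job = new JobStateSketch();
    List<JobState> seen = new ArrayList<>();
    job.addObserver(seen::add);
    job.setState(JobState.RUNNING);
    job.setState(JobState.DONE);
    System.out.println(seen); // [STOPPED, RUNNING, DONE]
  }
}
```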
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111700 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195209379 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111690 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195199604 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +public class FlinkJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); + + public static FlinkJobInvocation create(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions); + } + + private final String id; + private final ListeningExecutorService executorService; + private final RunnerApi.Pipeline pipeline; + private final FlinkPipelineOptions pipelineOptions; + private Enum jobState; + private List> stateObservers; + + @Nullable + private ListenableFuture invocationFuture; + + private FlinkJobInvocation(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +this.id = id; +this.executorService = executorService; +this.pipeline = pipeline; +this.pipelineOptions = pipelineOptions; +this.invocationFuture = null; +this.jobState = Enum.STOPPED; +this.stateObservers = new ArrayList<>(); + } + + private PipelineResult runPipeline() throws Exception { +LOG.trace("Translating pipeline from proto"); + +MetricsEnvironment.setMetricsSupported(true); + +LOG.info("Translating pipeline to Flink program."); +// Fused pipeline proto. +RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline(); Review comment: Should we pass the fuser Function as a constructor parameter? Issue Time Tracking --- Worklog Id: (was: 111690) Time Spent: 1h 10m (was: 1h)
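The review question is whether the fusion step should be injected rather than hard-coded as `GreedyPipelineFuser.fuse(pipeline).toPipeline()`. One approach is to accept a `Function` from pipeline to fused pipeline in the constructor, with the greedy fuser as the default in the ordinary factory. In the sketch below, `PipelineProto` is a hypothetical stand-in for `RunnerApi.Pipeline`. The "greedy" function only labels its input so the example runs without Beam, and all names are illustrative.

```java
import java.util.Objects;
import java.util.function.Function;

/** Sketch: constructor-injected fusion step (types are hypothetical stand-ins). */
public class InjectedFuserSketch {

  /** Hypothetical stand-in for RunnerApi.Pipeline. */
  static final class PipelineProto {
    final String description;

    PipelineProto(String description) {
      this.description = description;
    }
  }

  private final PipelineProto pipeline;
  private final Function<PipelineProto, PipelineProto> fuser;

  private InjectedFuserSketch(PipelineProto pipeline, Function<PipelineProto, PipelineProto> fuser) {
    this.pipeline = Objects.requireNonNull(pipeline);
    this.fuser = Objects.requireNonNull(fuser);
  }

  /** Default factory; the real class would pass p -> GreedyPipelineFuser.fuse(p).toPipeline(). */
  static InjectedFuserSketch create(PipelineProto pipeline) {
    return new InjectedFuserSketch(
        pipeline, p -> new PipelineProto("greedy-fused(" + p.description + ")"));
  }

  /** Test-friendly factory: callers choose the fusion strategy. */
  static InjectedFuserSketch create(
      PipelineProto pipeline, Function<PipelineProto, PipelineProto> fuser) {
    return new InjectedFuserSketch(pipeline, fuser);
  }

  /** Translation now depends only on the injected function. */
  PipelineProto fusedPipeline() {
    return fuser.apply(pipeline);
  }

  public static void main(String[] args) {
    PipelineProto p = new PipelineProto("wordcount");
    System.out.println(create(p).fusedPipeline().description); // greedy-fused(wordcount)
    System.out.println(create(p, Function.identity()).fusedPipeline().description); // wordcount
  }
}
```

Injecting the function lets tests supply an identity or pre-fused pipeline without changing the translation code.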
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111689 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:25 Start Date: 13/Jun/18 22:25 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195252523 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactSource.java ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.artifact; + +import com.google.protobuf.ByteString; +import com.google.protobuf.util.JsonFormat; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; +import org.apache.beam.model.jobmanagement.v1.ArtifactApi; +import org.apache.beam.sdk.io.FileSystems; + +/** + * An ArtifactSource suitable for retrieving artifacts uploaded via + * {@link BeamFileSystemArtifactStagingService}. + */ +public class BeamFileSystemArtifactSource implements ArtifactSource { + + private static final int CHUNK_SIZE = 2 * 1024 * 1024; + + private final String retrievalToken; + private ArtifactApi.ProxyManifest proxyManifest; + + public BeamFileSystemArtifactSource(String retrievalToken) { +this.retrievalToken = retrievalToken; + } + + public static BeamFileSystemArtifactSource create(String artifactToken) { +return new BeamFileSystemArtifactSource(artifactToken); + } + + @Override + public ArtifactApi.Manifest getManifest() throws IOException { +return getProxyManifest().getManifest(); + } + + @Override + public void getArtifact(String name, + StreamObserver responseObserver) throws IOException { +ReadableByteChannel artifact = FileSystems +.open(FileSystems.matchNewResource(lookupUri(name), false)); +ByteBuffer buffer = ByteBuffer.allocate(CHUNK_SIZE); +while (artifact.read(buffer) > -1) { + buffer.flip(); + responseObserver.onNext( + ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom(buffer)).build()); + buffer.clear(); +} + } + + private String lookupUri(String name) throws IOException { +for (ArtifactApi.ProxyManifest.Location location : getProxyManifest().getLocationList()) { + if (location.getName().equals(name)) { +return location.getUri(); + } +} +throw new IllegalArgumentException("No such artifact: " + name); + } + + private ArtifactApi.ProxyManifest 
getProxyManifest() throws IOException { +if (proxyManifest == null) { + ArtifactApi.ProxyManifest.Builder builder = ArtifactApi.ProxyManifest.newBuilder(); + JsonFormat.parser().merge(Channels.newReader( + FileSystems.open(FileSystems.matchNewResource(retrievalToken, false /* is directory */)), + StandardCharsets.UTF_8.name()), builder); Review comment: Shall we standardize the character set for all BeamFileSystemArtifact classes? Issue Time Tracking --- Worklog Id: (was: 111689) Time Spent: 1h (was: 50m)
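As quoted in this diff, `getArtifact` never calls `responseObserver.onCompleted()` and never closes the channel it opens. A `read` that returns 0 would also produce an empty chunk. Separately, the reviewer asks whether all of these classes should share one charset. The JDK-only sketch below addresses both points. It shows a chunked copy that always closes its channel and then signals completion, plus one shared charset constant used for both writing and reading manifest text. `ArtifactIoSketch`, `MANIFEST_CHARSET`, and the `Consumer<byte[]>` chunk sink are hypothetical; the real code would emit `ArtifactChunk` messages to a gRPC `StreamObserver`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** JDK-only sketch of chunked artifact reads and a shared manifest charset. */
public class ArtifactIoSketch {

  /** Hypothetical single charset for every manifest reader and writer. */
  static final Charset MANIFEST_CHARSET = StandardCharsets.UTF_8;

  /** Streams the channel in chunks of at most chunkSize bytes, then signals completion. */
  static void readInChunks(ReadableByteChannel source, int chunkSize,
      Consumer<byte[]> onChunk, Runnable onCompleted) throws IOException {
    try (ReadableByteChannel channel = source) { // closed even if a read fails
      ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
      while (channel.read(buffer) != -1) {
        buffer.flip();
        if (buffer.hasRemaining()) { // skip zero-byte reads instead of sending empty chunks
          byte[] chunk = new byte[buffer.remaining()];
          buffer.get(chunk);
          onChunk.accept(chunk);
        }
        buffer.clear();
      }
    }
    onCompleted.run(); // the quoted getArtifact omits this step
  }

  /** Encodes manifest text with the shared charset. */
  static byte[] encodeManifest(String json) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (Writer writer = Channels.newWriter(Channels.newChannel(out), MANIFEST_CHARSET.name())) {
      writer.write(json);
    }
    return out.toByteArray();
  }

  /** Decodes manifest text with the same shared charset. */
  static String decodeManifest(byte[] bytes) throws IOException {
    StringBuilder sb = new StringBuilder();
    try (Reader reader = Channels.newReader(
        Channels.newChannel(new ByteArrayInputStream(bytes)), MANIFEST_CHARSET.name())) {
      char[] buf = new char[1024];
      int n;
      while ((n = reader.read(buf)) != -1) {
        sb.append(buf, 0, n);
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) throws IOException {
    byte[] data = "hello".getBytes(MANIFEST_CHARSET);
    readInChunks(Channels.newChannel(new ByteArrayInputStream(data)), 2,
        chunk -> System.out.println(new String(chunk, MANIFEST_CHARSET)),
        () -> System.out.println("completed"));
    String json = "{\"name\": \"caf\u00e9\"}";
    System.out.println(decodeManifest(encodeManifest(json)).equals(json)); // true
  }
}
```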
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111687&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111687 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 22:24 Start Date: 13/Jun/18 22:24 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195199350 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java ## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +public class FlinkJobInvocation implements JobInvocation { + private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class); + + public static FlinkJobInvocation create(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +return new FlinkJobInvocation(id, executorService, pipeline, pipelineOptions); + } + + private final String id; + private final ListeningExecutorService executorService; + private final RunnerApi.Pipeline pipeline; + private final FlinkPipelineOptions pipelineOptions; + private Enum jobState; + private List> stateObservers; + + @Nullable + private ListenableFuture invocationFuture; + + private FlinkJobInvocation(String id, ListeningExecutorService executorService, + RunnerApi.Pipeline pipeline, FlinkPipelineOptions pipelineOptions) { +this.id = id; +this.executorService = executorService; +this.pipeline = pipeline; +this.pipelineOptions = pipelineOptions; +this.invocationFuture = null; +this.jobState = Enum.STOPPED; +this.stateObservers = new ArrayList<>(); + } + + private PipelineResult runPipeline() throws Exception { +LOG.trace("Translating pipeline from proto"); Review comment: This trace is unnecessary. Issue Time Tracking --- Worklog Id: (was: 111687) Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111663 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 21:16 Start Date: 13/Jun/18 21:16 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195238847 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java ## @@ -60,17 +60,28 @@ public static InMemoryJobService create( Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) { -return new InMemoryJobService(stagingServiceDescriptor, invoker); +return new InMemoryJobService(stagingServiceDescriptor, (String session) -> "token", invoker); + } + + public static InMemoryJobService create( + Endpoints.ApiServiceDescriptor stagingServiceDescriptor, + Function stagingServiceTokenProvider, Review comment: Good point. Issue Time Tracking --- Worklog Id: (was: 111663) Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111633 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 20:20 Start Date: 13/Jun/18 20:20 Worklog Time Spent: 10m Work Description: axelmagn commented on a change in pull request #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#discussion_r195221468 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java ## @@ -60,17 +60,28 @@ public static InMemoryJobService create( Endpoints.ApiServiceDescriptor stagingServiceDescriptor, JobInvoker invoker) { -return new InMemoryJobService(stagingServiceDescriptor, invoker); +return new InMemoryJobService(stagingServiceDescriptor, (String session) -> "token", invoker); + } + + public static InMemoryJobService create( + Endpoints.ApiServiceDescriptor stagingServiceDescriptor, + Function stagingServiceTokenProvider, Review comment: Maybe document that this function expects a preparation ID as input. Issue Time Tracking --- Worklog Id: (was: 111633) Time Spent: 20m (was: 10m)
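The reviewers agree that the `Function` passed as `stagingServiceTokenProvider` should document its input. The sketch below shows that suggestion as a standalone factory with Javadoc. The class name, `stagingTokenFor`, the plain `String` endpoint, and the `Function<String, String>` signature are all assumptions, because the type parameters are not visible here. The default `preparationId -> "token"` mirrors the quoted `(String session) -> "token"` lambda.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical sketch of a job-service factory with a documented token provider. */
public class TokenProviderSketch {

  private final String stagingEndpoint;
  private final Function<String, String> stagingServiceTokenProvider;

  private TokenProviderSketch(String stagingEndpoint, Function<String, String> provider) {
    this.stagingEndpoint = Objects.requireNonNull(stagingEndpoint);
    this.stagingServiceTokenProvider = Objects.requireNonNull(provider);
  }

  /** Uses a fixed staging token for every preparation, like the quoted default. */
  static TokenProviderSketch create(String stagingEndpoint) {
    return create(stagingEndpoint, preparationId -> "token");
  }

  /**
   * @param stagingEndpoint address of the artifact staging service handed to clients
   * @param stagingServiceTokenProvider maps a job <em>preparation ID</em> to the staging
   *     session token that clients present when uploading artifacts for that job
   */
  static TokenProviderSketch create(
      String stagingEndpoint, Function<String, String> stagingServiceTokenProvider) {
    return new TokenProviderSketch(stagingEndpoint, stagingServiceTokenProvider);
  }

  String stagingEndpoint() {
    return stagingEndpoint;
  }

  /** Derives the staging token for one preparation. */
  String stagingTokenFor(String preparationId) {
    return stagingServiceTokenProvider.apply(preparationId);
  }

  public static void main(String[] args) {
    // The endpoint value is arbitrary and for illustration only.
    TokenProviderSketch fixed = create("localhost:12345");
    System.out.println(fixed.stagingTokenFor("prep-1")); // token
    TokenProviderSketch perJob = create("localhost:12345", id -> "session-" + id);
    System.out.println(perJob.stagingTokenFor("prep-1")); // session-prep-1
  }
}
```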
[jira] [Work logged] (BEAM-4130) Portable Flink runner JobService entry point in a Docker container
[ https://issues.apache.org/jira/browse/BEAM-4130?focusedWorklogId=111558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111558 ] ASF GitHub Bot logged work on BEAM-4130: Author: ASF GitHub Bot Created on: 13/Jun/18 16:57 Start Date: 13/Jun/18 16:57 Worklog Time Spent: 10m Work Description: robertwb commented on issue #5493: [BEAM-4130] Add job submission capabilities to Flink runner. URL: https://github.com/apache/beam/pull/5493#issuecomment-397010704 R: @axelmagn, @angoenka Please take a look. Issue Time Tracking --- Worklog Id: (was: 111558) Time Spent: 10m Remaining Estimate: 0h