This is an automated email from the ASF dual-hosted git repository. mikhail pushed a commit to branch release-2.17.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.17.0 by this push:
     new 9f32479 [BEAM-8835] Disable Flink Uber Jar by default. (#10270)
     new 7148e2e Merge pull request #10274 from robertwb/release-2.17.0
9f32479 is described below

commit 9f32479944d36a41e73f40f72fce2c9ef536439b
Author: Robert Bradshaw <rober...@google.com>
AuthorDate: Tue Dec 3 13:50:58 2019 -0800

    [BEAM-8835] Disable Flink Uber Jar by default. (#10270)
---
 .../apache_beam/runners/portability/flink_runner.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py
index 296c7bc..c438dfc 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner.py
@@ -46,14 +46,19 @@ class FlinkRunner(portable_runner.PortableRunner):
     flink_master = self.add_http_scheme(
         options.view_as(FlinkRunnerOptions).flink_master)
     options.view_as(FlinkRunnerOptions).flink_master = flink_master
-    if flink_master in MAGIC_HOST_NAMES or sys.version_info < (3, 6):
-      return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
-    else:
+    if (options.view_as(FlinkRunnerOptions).flink_submit_uber_jar
+        and flink_master not in MAGIC_HOST_NAMES):
+      if sys.version_info < (3, 6):
+        raise ValueError(
+            'flink_submit_uber_jar requires Python 3.6+, current version %s'
+            % sys.version)
       # This has to be changed [auto], otherwise we will attempt to submit a
       # the pipeline remotely on the Flink JobMaster which will _fail_.
       # DO NOT CHANGE the following line, unless you have tested this.
       options.view_as(FlinkRunnerOptions).flink_master = '[auto]'
       return flink_uber_jar_job_server.FlinkUberJarJobServer(flink_master)
+    else:
+      return job_server.StopOnExitJobServer(FlinkJarJobServer(options))
 
   @staticmethod
   def add_http_scheme(flink_master):
@@ -84,6 +89,13 @@ class FlinkRunnerOptions(pipeline_options.PipelineOptions):
     parser.add_argument('--flink_job_server_jar',
                         help='Path or URL to a flink jobserver jar.')
     parser.add_argument('--artifacts_dir', default=None)
+    parser.add_argument('--flink_submit_uber_jar',
+                        default=False,
+                        action='store_true',
+                        help='Create and upload an uberjar to the flink master'
+                        ' directly, rather than starting up a job server.'
+                        ' Only applies when flink_master is set to a'
+                        ' cluster address. Requires Python 3.6+.')
 
 
 class FlinkJarJobServer(job_server.JavaJarJobServer):