This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f3623e8 [BEAM-6853] Make sdkWorkerParallelism option consistent new fe676c5 Merge pull request #8286 from angoenka/portable_options f3623e8 is described below commit f3623e8ba2257f7659ccb312dc2574f862ef41b5 Author: Ankur Goenka <ankurgoe...@gmail.com> AuthorDate: Thu Apr 11 16:49:38 2019 -0700 [BEAM-6853] Make sdkWorkerParallelism option consistent --- .../apache/beam/runners/flink/FlinkJobInvoker.java | 2 +- .../fnexecution/jobsubmission/JobServerDriver.java | 10 +++++++--- .../beam/sdk/options/PortablePipelineOptions.java | 6 +++--- .../python/apache_beam/options/pipeline_options.py | 23 ++++++---------------- 4 files changed, 17 insertions(+), 24 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index 3dc22c2..1db7806 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -70,7 +70,7 @@ public class FlinkJobInvoker extends JobInvoker { } PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class); - if (portableOptions.getSdkWorkerParallelism() == null) { + if (portableOptions.getSdkWorkerParallelism() == 0L) { portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism()); } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java index 2724910..53f4ab8 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java @@ -96,8 +96,12 @@ 
public abstract class JobServerDriver implements Runnable { @Option( name = "--sdk-worker-parallelism", - usage = "Default parallelism for SDK worker processes (see portable pipeline options)") - private Long sdkWorkerParallelism = 1L; + usage = + "Default parallelism for SDK worker processes. This option is only applied when the " + + "pipeline option sdkWorkerParallelism is set to 0." + + "Default is 1, If 0, worker parallelism will be dynamically decided by runner." + + "See also: sdkWorkerParallelism Pipeline Option") + private long sdkWorkerParallelism = 1L; public String getHost() { return host; @@ -123,7 +127,7 @@ public abstract class JobServerDriver implements Runnable { return cleanArtifactsPerJob; } - public Long getSdkWorkerParallelism() { + public long getSdkWorkerParallelism() { return this.sdkWorkerParallelism; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java index 75c55fc..3f3cfcf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java @@ -77,10 +77,10 @@ public interface PortablePipelineOptions extends PipelineOptions { "Sets the number of sdk worker processes that will run on each worker node. Default is 1. If" + " 0, it will be automatically set by the runner by looking at different parameters " + "(e.g. number of CPU cores on the worker machine).") - @Nullable - Long getSdkWorkerParallelism(); + @Default.Long(1L) + long getSdkWorkerParallelism(); - void setSdkWorkerParallelism(@Nullable Long parallelism); + void setSdkWorkerParallelism(long parallelism); @Description("Duration in milliseconds for environment cache within a job. 
0 means no caching.") @Default.Integer(0) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 745dcad..fde1a7e 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -780,7 +780,9 @@ class SetupOptions(PipelineOptions): class PortableOptions(PipelineOptions): - + """Portable options are common options expected to be understood by most of + the portable runners. + """ @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--job_endpoint', @@ -801,30 +803,17 @@ class PortableOptions(PipelineOptions): '"<ENV_VAL>"} }. All fields in the json are optional except ' 'command.')) parser.add_argument( - '--sdk_worker_parallelism', default=None, + '--sdk_worker_parallelism', default=0, help=('Sets the number of sdk worker processes that will run on each ' - 'worker node. Default is 1. If 0, it will be automatically set ' + 'worker node. Default is 0. If 0, it will be automatically set ' 'by the runner by looking at different parameters (e.g. number ' - 'of CPU cores on the worker machine).')) + 'of CPU cores on the worker machine or configuration).')) parser.add_argument( '--environment_cache_millis', default=0, help=('Duration in milliseconds for environment cache within a job. ' '0 means no caching.')) -class RunnerOptions(PipelineOptions): - """Runner options are provided by the job service. - - The SDK has no a priori knowledge of runner options. - It should be able to work with any portable runner. - Runner specific options are discovered from the job service endpoint. - """ - @classmethod - def _add_argparse_args(cls, parser): - # TODO: help option to display discovered options - pass - - class TestOptions(PipelineOptions): @classmethod