This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f3623e8  [BEAM-6853] Make sdkWorkerParallelism option consistent
     new fe676c5  Merge pull request #8286 from angoenka/portable_options
f3623e8 is described below

commit f3623e8ba2257f7659ccb312dc2574f862ef41b5
Author: Ankur Goenka <ankurgoe...@gmail.com>
AuthorDate: Thu Apr 11 16:49:38 2019 -0700

    [BEAM-6853] Make sdkWorkerParallelism option consistent
---
 .../apache/beam/runners/flink/FlinkJobInvoker.java |  2 +-
 .../fnexecution/jobsubmission/JobServerDriver.java | 10 +++++++---
 .../beam/sdk/options/PortablePipelineOptions.java  |  6 +++---
 .../python/apache_beam/options/pipeline_options.py | 23 ++++++----------------
 4 files changed, 17 insertions(+), 24 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 3dc22c2..1db7806 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -70,7 +70,7 @@ public class FlinkJobInvoker extends JobInvoker {
     }
 
     PortablePipelineOptions portableOptions = flinkOptions.as(PortablePipelineOptions.class);
-    if (portableOptions.getSdkWorkerParallelism() == null) {
+    if (portableOptions.getSdkWorkerParallelism() == 0L) {
       portableOptions.setSdkWorkerParallelism(serverConfig.getSdkWorkerParallelism());
     }
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
index 2724910..53f4ab8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobServerDriver.java
@@ -96,8 +96,12 @@ public abstract class JobServerDriver implements Runnable {
 
     @Option(
         name = "--sdk-worker-parallelism",
-        usage = "Default parallelism for SDK worker processes (see portable pipeline options)")
-    private Long sdkWorkerParallelism = 1L;
+        usage =
+            "Default parallelism for SDK worker processes. This option is only applied when the "
+                + "pipeline option sdkWorkerParallelism is set to 0."
+                + "Default is 1, If 0, worker parallelism will be dynamically decided by runner."
+                + "See also: sdkWorkerParallelism Pipeline Option")
+    private long sdkWorkerParallelism = 1L;
 
     public String getHost() {
       return host;
@@ -123,7 +127,7 @@ public abstract class JobServerDriver implements Runnable {
       return cleanArtifactsPerJob;
     }
 
-    public Long getSdkWorkerParallelism() {
+    public long getSdkWorkerParallelism() {
       return this.sdkWorkerParallelism;
     }
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index 75c55fc..3f3cfcf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
@@ -77,10 +77,10 @@ public interface PortablePipelineOptions extends PipelineOptions {
       "Sets the number of sdk worker processes that will run on each worker node. Default is 1. If"
           + " 0, it will be automatically set by the runner by looking at different parameters "
           + "(e.g. number of CPU cores on the worker machine).")
-  @Nullable
-  Long getSdkWorkerParallelism();
+  @Default.Long(1L)
+  long getSdkWorkerParallelism();
 
-  void setSdkWorkerParallelism(@Nullable Long parallelism);
+  void setSdkWorkerParallelism(long parallelism);
 
   @Description("Duration in milliseconds for environment cache within a job. 0 means no caching.")
   @Default.Integer(0)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 745dcad..fde1a7e 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -780,7 +780,9 @@ class SetupOptions(PipelineOptions):
 
 
 class PortableOptions(PipelineOptions):
-
+  """Portable options are common options expected to be understood by most of
+  the portable runners.
+  """
   @classmethod
   def _add_argparse_args(cls, parser):
     parser.add_argument('--job_endpoint',
@@ -801,30 +803,17 @@ class PortableOptions(PipelineOptions):
               '"<ENV_VAL>"} }. All fields in the json are optional except '
               'command.'))
     parser.add_argument(
-        '--sdk_worker_parallelism', default=None,
+        '--sdk_worker_parallelism', default=0,
         help=('Sets the number of sdk worker processes that will run on each '
-              'worker node. Default is 1. If 0, it will be automatically set '
+              'worker node. Default is 0. If 0, it will be automatically set '
               'by the runner by looking at different parameters (e.g. number '
-              'of CPU cores on the worker machine).'))
+              'of CPU cores on the worker machine or configuration).'))
     parser.add_argument(
         '--environment_cache_millis', default=0,
         help=('Duration in milliseconds for environment cache within a job. '
               '0 means no caching.'))
 
 
-class RunnerOptions(PipelineOptions):
-  """Runner options are provided by the job service.
-
-  The SDK has no a priori knowledge of runner options.
-  It should be able to work with any portable runner.
-  Runner specific options are discovered from the job service endpoint.
-  """
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # TODO: help option to display discovered options
-    pass
-
-
 class TestOptions(PipelineOptions):
 
   @classmethod

Reply via email to