This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ecd61c  Makes Dataflow runner use python3-fnapi container when 
applicable.
     new b8183e4  Merge pull request #7829 from 
tvalentyn/use_py3_container_for_fnapi
3ecd61c is described below

commit 3ecd61c974ad716189b0a728572a7e87abae031c
Author: Valentyn Tymofieiev <valen...@google.com>
AuthorDate: Tue Feb 12 21:28:33 2019 -0800

    Makes Dataflow runner use python3-fnapi container when applicable.
---
 .../runners/dataflow/internal/apiclient.py         | 23 +++++++++++-----
 .../runners/dataflow/internal/apiclient_test.py    | 32 +++++++++++++++-------
 2 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index a1873ea..6d5e140 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -874,16 +874,25 @@ def get_default_container_image_for_current_sdk(job_type):
     Returns:
       str: Google Cloud Dataflow container image for remote execution.
     """
+  if sys.version_info[0] == 2:
+    version_suffix = ''
+  elif sys.version_info[0] == 3:
+    version_suffix = '3'
+  else:
+    raise Exception('Dataflow only supports Python versions 2 and 3, got: %s'
+                    % sys.version_info[0])
+
   # TODO(tvalentyn): Use enumerated type instead of strings for job types.
   if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
-    image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python-fnapi'
+    fnapi_suffix = '-fnapi'
   else:
-    if sys.version_info[0] == 2:
-      image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python'
-    elif sys.version_info[0] == 3:
-      image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python3'
-    else:
-      raise Exception('Dataflow only supports Python versions 2 and 3.')
+    fnapi_suffix = ''
+
+  image_name = '{repository}/python{version_suffix}{fnapi_suffix}'.format(
+      repository=names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY,
+      version_suffix=version_suffix,
+      fnapi_suffix=fnapi_suffix)
+
   image_tag = _get_required_container_version(job_type)
   return image_name + ':' + image_tag
 
diff --git 
a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py 
b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index ac66fc6..0e37a0a 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -248,10 +248,16 @@ class UtilTest(unittest.TestCase):
                                 pipeline_options,
                                 '2.0.0', #any environment version
                                 FAKE_PIPELINE_URL)
-    self.assertEqual(
-        env.proto.workerPools[0].workerHarnessContainerImage,
-        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
-         '/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
+    if sys.version_info[0] == 3:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python3-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
+    else:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
 
     # batch, legacy pipeline.
     pipeline_options = PipelineOptions(
@@ -281,10 +287,16 @@ class UtilTest(unittest.TestCase):
                                 pipeline_options,
                                 '2.0.0', #any environment version
                                 FAKE_PIPELINE_URL)
-    self.assertEqual(
-        env.proto.workerPools[0].workerHarnessContainerImage,
-        (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
-         '/python-fnapi:2.2.0'))
+    if sys.version_info[0] == 3:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python3-fnapi:2.2.0'))
+    else:
+      self.assertEqual(
+          env.proto.workerPools[0].workerHarnessContainerImage,
+          (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
+           '/python-fnapi:2.2.0'))
 
     # batch, legacy pipeline.
     pipeline_options = PipelineOptions(
@@ -391,10 +403,10 @@ class UtilTest(unittest.TestCase):
          '--temp_location', 'gs://test-location/temp',
          '--experiments', 'beam_fn_api',
          '--experiments', 'use_multiple_sdk_containers'])
+    mock_sys.version_info = [2, 333]
     environment = apiclient.Environment(
         [], pipeline_options, 1, FAKE_PIPELINE_URL)
-    mock_sys.version_info = [22, 333]
-    self.assertEqual('Apache Beam Python 22.333 SDK',
+    self.assertEqual('Apache Beam Python 2.333 SDK',
                      environment._get_python_sdk_name())
 
 

Reply via email to