This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3ecd61c Makes Dataflow runner use python3-fnapi container when applicable. new b8183e4 Merge pull request #7829 from tvalentyn/use_py3_container_for_fnapi 3ecd61c is described below commit 3ecd61c974ad716189b0a728572a7e87abae031c Author: Valentyn Tymofieiev <valen...@google.com> AuthorDate: Tue Feb 12 21:28:33 2019 -0800 Makes Dataflow runner use python3-fnapi container when applicable. --- .../runners/dataflow/internal/apiclient.py | 23 +++++++++++----- .../runners/dataflow/internal/apiclient_test.py | 32 +++++++++++++++------- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index a1873ea..6d5e140 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -874,16 +874,25 @@ def get_default_container_image_for_current_sdk(job_type): Returns: str: Google Cloud Dataflow container image for remote execution. """ + if sys.version_info[0] == 2: + version_suffix = '' + elif sys.version_info[0] == 3: + version_suffix = '3' + else: + raise Exception('Dataflow only supports Python versions 2 and 3, got: %s' + % sys.version_info[0]) + # TODO(tvalentyn): Use enumerated type instead of strings for job types. if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING': - image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python-fnapi' + fnapi_suffix = '-fnapi' else: - if sys.version_info[0] == 2: - image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python' - elif sys.version_info[0] == 3: - image_name = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python3' - else: - raise Exception('Dataflow only supports Python versions 2 and 3.') + fnapi_suffix = '' + + image_name = '{repository}/python{version_suffix}{fnapi_suffix}'.format( + repository=names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY, + version_suffix=version_suffix, + fnapi_suffix=fnapi_suffix) + image_tag = _get_required_container_version(job_type) return image_name + ':' + image_tag diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index ac66fc6..0e37a0a 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -248,10 +248,16 @@ class UtilTest(unittest.TestCase): pipeline_options, '2.0.0', #any environment version FAKE_PIPELINE_URL) - self.assertEqual( - env.proto.workerPools[0].workerHarnessContainerImage, - (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) + if sys.version_info[0] == 3: + self.assertEqual( + env.proto.workerPools[0].workerHarnessContainerImage, + (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + + '/python3-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) + else: + self.assertEqual( + env.proto.workerPools[0].workerHarnessContainerImage, + (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + + '/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) # batch, legacy pipeline. pipeline_options = PipelineOptions( @@ -281,10 +287,16 @@ class UtilTest(unittest.TestCase): pipeline_options, '2.0.0', #any environment version FAKE_PIPELINE_URL) - self.assertEqual( - env.proto.workerPools[0].workerHarnessContainerImage, - (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python-fnapi:2.2.0')) + if sys.version_info[0] == 3: + self.assertEqual( + env.proto.workerPools[0].workerHarnessContainerImage, + (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + + '/python3-fnapi:2.2.0')) + else: + self.assertEqual( + env.proto.workerPools[0].workerHarnessContainerImage, + (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + + '/python-fnapi:2.2.0')) # batch, legacy pipeline. pipeline_options = PipelineOptions( @@ -391,10 +403,10 @@ class UtilTest(unittest.TestCase): '--temp_location', 'gs://test-location/temp', '--experiments', 'beam_fn_api', '--experiments', 'use_multiple_sdk_containers']) + mock_sys.version_info = [2, 333] environment = apiclient.Environment( [], pipeline_options, 1, FAKE_PIPELINE_URL) - mock_sys.version_info = [22, 333] - self.assertEqual('Apache Beam Python 22.333 SDK', + self.assertEqual('Apache Beam Python 2.333 SDK', environment._get_python_sdk_name())