Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 7c5e4aa66 -> 0d99856f3
Modify create_job to allow staging the job without submitting it to the service.

- Modularize create_job into creating the job description, staging the job, and
  submitting it for execution.
- Modify --dataflow_job_file to stage the job and continue submitting it to the
  service.
- Add --template_location to stage the job but not submit it to the service.
- Add tests for both options, including making them mutually exclusive
  (following the Java SDK decision).
- Add template_runner_test.py with integration tests.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cfa0ad81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cfa0ad81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cfa0ad81

Branch: refs/heads/python-sdk
Commit: cfa0ad8136b323bade9de14ea6149e7f74cbd0b4
Parents: 7c5e4aa
Author: Maria Garcia Herrero <mari...@google.com>
Authored: Wed Nov 2 09:14:48 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Mon Dec 5 11:04:34 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/wordcount.py   |  1 -
 sdks/python/apache_beam/internal/apiclient.py   | 34 +++++++-
 .../apache_beam/runners/dataflow_runner.py      | 13 ++-
 sdks/python/apache_beam/template_runner_test.py | 83 ++++++++++++++++++++
 sdks/python/apache_beam/utils/options.py        | 10 +++
 .../apache_beam/utils/pipeline_options_test.py  | 13 +++
 .../utils/pipeline_options_validator_test.py    | 28 +++++++
 7 files changed, 175 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 096e508..7f347d8 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -59,7 +59,6 @@ class WordExtractingDoFn(beam.DoFn):
 
 def run(argv=None):
   """Main entry point; defines and runs the wordcount pipeline."""
-
   parser = argparse.ArgumentParser()
   parser.add_argument('--input',
                       dest='input',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 5612631..a894557 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -24,6 +24,7 @@
 import os
 import re
 import time
 
+from StringIO import StringIO
 from apitools.base.py import encoding
 from apitools.base.py import exceptions
@@ -42,7 +43,6 @@
 from apache_beam.utils.options import DebugOptions
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
 from apache_beam.utils.options import WorkerOptions
-
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
@@ -327,6 +327,9 @@ class Job(object):
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
 
+  def json(self):
+    return encoding.MessageToJson(self.proto)
+
 
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and
   query jobs."""
@@ -392,8 +395,29 @@ class DataflowApplicationClient(object):
   # TODO(silviuc): Refactor so that retry logic can be applied.
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def create_job(self, job):
-    """Submits for remote execution a job described by the workflow proto."""
-    # Stage job resources and add an environment proto with their paths.
+    """Creates a job description.
+    Additionally, it may stage it and/or submit it for remote execution.
+    """
+    self.create_job_description(job)
+
+    # Stage and submit the job when necessary.
+    dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file
+    template_location = (
+        job.options.view_as(GoogleCloudOptions).template_location)
+
+    job_location = template_location or dataflow_job_file
+    if job_location:
+      gcs_or_local_path = os.path.dirname(job_location)
+      file_name = os.path.basename(job_location)
+      self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
+
+    if not template_location:
+      return self.submit_job_description(job)
+    else:
+      return None
+
+  def create_job_description(self, job):
+    """Creates a job described by the workflow proto."""
     resources = dependency.stage_job_resources(
         job.options, file_copy=self._gcs_file_copy)
     job.proto.environment = Environment(
@@ -401,8 +425,10 @@ class DataflowApplicationClient(object):
         environment_version=self.environment_version).proto
     # TODO(silviuc): Remove the debug logging eventually.
     logging.info('JOB: %s', job)
-    request = dataflow.DataflowProjectsJobsCreateRequest()
 
+  def submit_job_description(self, job):
+    """Creates and executes a job request."""
+    request = dataflow.DataflowProjectsJobsCreateRequest()
     request.projectId = self.google_cloud_options.project
     request.job = job.proto

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 00b466b..8b953b0 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -160,20 +160,25 @@ class DataflowPipelineRunner(PipelineRunner):
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.internal import apiclient
     self.job = apiclient.Job(pipeline.options)
+    # The superclass's run will trigger a traversal of all reachable nodes.
     super(DataflowPipelineRunner, self).run(pipeline)
-    # Get a Dataflow API client and submit the job.
+
     standard_options = pipeline.options.view_as(StandardOptions)
     if standard_options.streaming:
       job_version = DataflowPipelineRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
     else:
       job_version = DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
+
+    # Get a Dataflow API client and set its options
     self.dataflow_client = apiclient.DataflowApplicationClient(
         pipeline.options, job_version)
+
+    # Create the job
     self.result = DataflowPipelineResult(
         self.dataflow_client.create_job(self.job))
-    if self.blocking:
+    if self.result.has_job and self.blocking:
       thread = threading.Thread(
           target=DataflowPipelineRunner.poll_for_job_completion,
           args=(self, self.result.job_id()))
@@ -652,6 +657,10 @@ class DataflowPipelineResult(PipelineResult):
   def job_id(self):
     return self._job.id
 
+  @property
+  def has_job(self):
+    return self._job is not None
+
   def current_state(self):
     """Return the current state of the remote job.
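
Taken together, the apiclient.py and dataflow_runner.py changes give
create_job() two exit paths: with --dataflow_job_file the job description is
staged and still submitted, while with --template_location it is only staged
and create_job() returns None, which the new has_job property lets the runner
use to skip polling for completion. A minimal sketch of the stage-only path,
assuming this branch's module layout; the project id and bucket names below
are illustrative, not from the commit:

    # Sketch only: exercises the new stage-only template path.
    from apache_beam.utils.options import DebugOptions
    from apache_beam.utils.options import GoogleCloudOptions
    from apache_beam.utils.options import PipelineOptions

    options = PipelineOptions([
        '--project=my-project',                       # illustrative project id
        '--staging_location=gs://my-bucket/staging',  # illustrative bucket
        '--temp_location=gs://my-bucket/tmp',
        '--template_location=gs://my-bucket/templates/wordcount',
    ])

    # With --template_location set (and --dataflow_job_file unset, since the
    # validator now rejects the combination), create_job() stages Job.json()
    # to the template location and returns None instead of submitting.
    assert options.view_as(GoogleCloudOptions).template_location is not None
    assert options.view_as(DebugOptions).dataflow_job_file is None
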
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/template_runner_test.py b/sdks/python/apache_beam/template_runner_test.py
new file mode 100644
index 0000000..bfcd70c
--- /dev/null
+++ b/sdks/python/apache_beam/template_runner_test.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for templated pipelines."""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import unittest
+import tempfile
+
+import apache_beam as beam
+from apache_beam.pipeline import Pipeline
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.internal import apiclient
+
+
+class TemplatingDataflowPipelineRunnerTest(unittest.TestCase):
+  """TemplatingDataflow tests."""
+  def test_full_completion(self):
+    dummy_file = tempfile.NamedTemporaryFile()
+    dummy_dir = tempfile.mkdtemp()
+
+    remote_runner = DataflowPipelineRunner()
+    pipeline = Pipeline(remote_runner,
+                        options=PipelineOptions([
+                            '--dataflow_endpoint=ignored',
+                            '--sdk_location=' + dummy_file.name,
+                            '--job_name=test-job',
+                            '--project=test-project',
+                            '--staging_location=' + dummy_dir,
+                            '--temp_location=/dev/null',
+                            '--template_location=' + dummy_file.name,
+                            '--no_auth=True']))
+
+    pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x)  # pylint: disable=expression-not-assigned
+    pipeline.run()
+    with open(dummy_file.name) as template_file:
+      saved_job_dict = json.load(template_file)
+      self.assertEqual(
+          saved_job_dict['environment']['sdkPipelineOptions']
+          ['options']['project'], 'test-project')
+      self.assertEqual(
+          saved_job_dict['environment']['sdkPipelineOptions']
+          ['options']['job_name'], 'test-job')
+
+  def test_bad_path(self):
+    dummy_sdk_file = tempfile.NamedTemporaryFile()
+    remote_runner = DataflowPipelineRunner()
+    pipeline = Pipeline(remote_runner,
+                        options=PipelineOptions([
+                            '--dataflow_endpoint=ignored',
+                            '--sdk_location=' + dummy_sdk_file.name,
+                            '--job_name=test-job',
+                            '--project=test-project',
+                            '--staging_location=ignored',
+                            '--temp_location=/dev/null',
+                            '--template_location=/bad/path',
+                            '--no_auth=True']))
+    remote_runner.job = apiclient.Job(pipeline.options)
+
+    with self.assertRaises(IOError):
+      pipeline.run()
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
index aacb186..eaa1065 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -263,6 +263,10 @@ class GoogleCloudOptions(PipelineOptions):
                         default=None,
                         help='Identity to run virtual machines as.')
     parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
+    # Option to run templated pipelines
+    parser.add_argument('--template_location',
+                        default=None,
+                        help='Save job to specified local or GCS location.')
 
   def validate(self, validator):
     errors = []
@@ -272,6 +276,12 @@ class GoogleCloudOptions(PipelineOptions):
       if getattr(self, 'temp_location', None) or getattr(self, 'staging_location', None) is None:
         errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+
+    if self.view_as(DebugOptions).dataflow_job_file:
+      if self.view_as(GoogleCloudOptions).template_location:
+        errors.append('--dataflow_job_file and --template_location '
+                      'are mutually exclusive.')
+
     return errors

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index ed55362..d9439e9 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -152,6 +152,19 @@ class PipelineOptionsTest(unittest.TestCase):
     options = PipelineOptions(flags=[''])
     self.assertEqual(options.get_all_options()['extra_packages'], None)
 
+  def test_dataflow_job_file(self):
+    options = PipelineOptions(['--dataflow_job_file', 'abc'])
+    self.assertEqual(options.get_all_options()['dataflow_job_file'], 'abc')
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['dataflow_job_file'], None)
+
+  def test_template_location(self):
+    options = PipelineOptions(['--template_location', 'abc'])
+    self.assertEqual(options.get_all_options()['template_location'], 'abc')
+
+    options = PipelineOptions(flags=[''])
+    self.assertEqual(options.get_all_options()['template_location'], None)
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index bca9fa5..bffbeca 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -260,6 +260,34 @@ class SetupTest(unittest.TestCase):
           PipelineOptions(case['options']), case['runner'])
       self.assertEqual(validator.is_service_runner(), case['expected'])
 
+  def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
+    runner = MockRunners.OtherRunner()
+    options = PipelineOptions([
+        '--template_location', 'abc',
+        '--dataflow_job_file', 'def'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertTrue(errors)
+
+  def test_validate_template_location(self):
+    runner = MockRunners.OtherRunner()
+    options = PipelineOptions([
+        '--template_location', 'abc',
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertFalse(errors)
+
+  def test_validate_dataflow_job_file(self):
+    runner = MockRunners.OtherRunner()
+    options = PipelineOptions([
+        '--dataflow_job_file', 'abc'
+    ])
+    validator = PipelineOptionsValidator(options, runner)
+    errors = validator.validate()
+    self.assertFalse(errors)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
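
For reference, the staged template is simply the JSON-serialized job proto
(Job.json() above), so its contents can be inspected the same way
test_full_completion does. A minimal sketch, assuming a hypothetical local
--template_location path; a GCS location would be written via stage_file()
instead:

    import json

    # Hypothetical path, passed earlier as --template_location=/tmp/wc-template.
    with open('/tmp/wc-template') as f:
        saved_job = json.load(f)

    # Pipeline options are recorded under environment.sdkPipelineOptions,
    # which is what the test's assertions check.
    print(saved_job['environment']['sdkPipelineOptions']['options']['job_name'])
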