Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3cac39674 -> 259c864a0
[AIRFLOW-781] Allow DataFlowOperators to accept jobs stored in GCS

Closes #2037 from fenglu-g/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/259c864a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/259c864a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/259c864a

Branch: refs/heads/master
Commit: 259c864a07617458841471e71a1034ed196ba3fc
Parents: 3cac396
Author: Feng Lu <fen...@fengcloud.hot.corp.google.com>
Authored: Wed Feb 1 09:36:02 2017 -0800
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Wed Feb 1 09:36:25 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/operators/dataflow_operator.py | 53 +++++++++++++++++++++
 tests/contrib/operators/dataflow_operator.py   | 12 +++--
 2 files changed, 62 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/259c864a/airflow/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index ef49eb6..c1dca24 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -14,7 +14,9 @@
 
 import copy
 import re
+import uuid
 
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -107,6 +109,9 @@ class DataFlowJavaOperator(BaseOperator):
         self.options = options
 
     def execute(self, context):
+        bucket_helper = GoogleCloudBucketHelper(
+            self.gcp_conn_id, self.delegate_to)
+        self.jar = bucket_helper.google_cloud_to_local(self.jar)
         hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
 
@@ -168,6 +173,9 @@ class DataFlowPythonOperator(BaseOperator):
 
     def execute(self, context):
         """Execute the python dataflow job."""
+        bucket_helper = GoogleCloudBucketHelper(
+            self.gcp_conn_id, self.delegate_to)
+        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
         hook = DataFlowHook(gcp_conn_id=self.gcp_conn_id,
                             delegate_to=self.delegate_to)
         dataflow_options = self.dataflow_default_options.copy()
@@ -180,3 +188,48 @@ class DataFlowPythonOperator(BaseOperator):
         hook.start_python_dataflow(
             self.task_id, formatted_options,
             self.py_file, self.py_options)
+
+
+class GoogleCloudBucketHelper():
+    """GoogleCloudStorageHook helper class to download GCS object."""
+    GCS_PREFIX_LENGTH = 5
+
+    def __init__(self,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        self._gcs_hook = GoogleCloudStorageHook(gcp_conn_id, delegate_to)
+
+    def google_cloud_to_local(self, file_name):
+        """
+        Checks whether the file specified by file_name is stored in Google
+        Cloud Storage (GCS); if so, downloads the file, saves it locally,
+        and returns the full path of the saved copy. Otherwise the local
+        file_name is returned unchanged.
+
+        :param file_name: The full path of the input file.
+        :type file_name: string
+        :return: The full path of the local file.
+        :rtype: string
+        """
+        if not file_name.startswith('gs://'):
+            return file_name
+
+        # Extracts bucket_id and object_id by first removing the 'gs://'
+        # prefix and then splitting the remainder on the path delimiter '/'.
+        path_components = file_name[self.GCS_PREFIX_LENGTH:].split('/')
+        if len(path_components) < 2:
+            raise Exception(
+                'Invalid Google Cloud Storage (GCS) object path: {}.'
+                .format(file_name))
+
+        bucket_id = path_components[0]
+        object_id = '/'.join(path_components[1:])
+        local_file = '/tmp/dataflow{}-{}'.format(str(uuid.uuid1())[:8],
+                                                 path_components[-1])
+        file_size = self._gcs_hook.download(bucket_id, object_id, local_file)
+
+        if file_size > 0:
+            return local_file
+        raise Exception(
+            'Failed to download Google Cloud Storage (GCS) object: {}'
+            .format(file_name))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/259c864a/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
index 4f887c1..5329723 100644
--- a/tests/contrib/operators/dataflow_operator.py
+++ b/tests/contrib/operators/dataflow_operator.py
@@ -27,7 +27,7 @@ except ImportError:
 
 
 TASK_ID = 'test-python-dataflow'
-PY_FILE = 'apache_beam.examples.wordcount'
+PY_FILE = 'gs://my-bucket/my-object.py'
 PY_OPTIONS = ['-m']
 DEFAULT_OPTIONS = {
     'project': 'test',
@@ -36,6 +36,7 @@ DEFAULT_OPTIONS = {
 ADDITIONAL_OPTIONS = {
     'output': 'gs://test/output'
 }
+GCS_HOOK_STRING = 'airflow.contrib.operators.dataflow_operator.{}'
 
 
 class DataFlowPythonOperatorTest(unittest.TestCase):
@@ -59,12 +60,14 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
             ADDITIONAL_OPTIONS)
 
     @mock.patch('airflow.contrib.operators.dataflow_operator.DataFlowHook')
-    def test_exec(self, dataflow_mock):
+    @mock.patch(GCS_HOOK_STRING.format('GoogleCloudStorageHook'))
+    def test_exec(self, gcs_hook, dataflow_mock):
         """Test DataFlowHook is created and the right args are passed to
         start_python_workflow.
         """
         start_python_hook = dataflow_mock.return_value.start_python_dataflow
+        gcs_download_hook = gcs_hook.return_value.download
         self.dataflow.execute(None)
         assert dataflow_mock.called
         expected_options = {
@@ -72,5 +75,8 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
             'staging_location': 'gs://test/staging',
             'output': 'gs://test/output'
         }
+        gcs_download_hook.assert_called_once_with(
+            'my-bucket', 'my-object.py', mock.ANY)
         start_python_hook.assert_called_once_with(TASK_ID, expected_options,
-                                                  PY_FILE, PY_OPTIONS)
+                                                  mock.ANY, PY_OPTIONS)
+        self.assertTrue(self.dataflow.py_file.startswith('/tmp/dataflow'))
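With this change, DataFlowJavaOperator no longer requires a locally staged jar: execute() first runs the jar path through GoogleCloudBucketHelper.google_cloud_to_local, so a gs:// URI is fetched to /tmp before DataFlowHook launches the job. A minimal usage sketch; the bucket, jar name, project id, and option values below are hypothetical and not taken from this commit:

    from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

    # Hypothetical example: the jar lives in GCS instead of on the worker.
    # On execute(), the operator downloads it to /tmp/dataflow<uuid8>-<name>
    # and rewrites self.jar to that local path before starting the job.
    java_task = DataFlowJavaOperator(
        task_id='dataflow-java-from-gcs',
        jar='gs://my-bucket/pipelines/my-pipeline.jar',  # assumed object
        dataflow_default_options={
            'project': 'my-project',                     # assumed project id
            'stagingLocation': 'gs://my-bucket/staging',
        },
    )

A plain local path still works unchanged, since google_cloud_to_local returns any file_name that does not start with 'gs://' as-is.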
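The updated unit test exercises the same flow for the Python operator. As a sketch, the construction below reuses the test's constants (TASK_ID, PY_FILE, PY_OPTIONS, DEFAULT_OPTIONS, ADDITIONAL_OPTIONS); the keyword-argument wiring is inferred from the operator's execute() and is an assumption, not part of the diff:

    from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

    # Mirrors DataFlowPythonOperatorTest: py_file is now a gs:// URI rather
    # than a local module path. After execute(), py_file starts with
    # '/tmp/dataflow', which is exactly what the new assertion checks.
    dataflow = DataFlowPythonOperator(
        task_id='test-python-dataflow',
        py_file='gs://my-bucket/my-object.py',
        py_options=['-m'],
        dataflow_default_options={
            'project': 'test',
            'staging_location': 'gs://test/staging',
        },
        options={
            'output': 'gs://test/output',
        },
    )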