Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5cb530b45 -> 34f827f04
[AIRFLOW-2301] Sync files of an S3 key with a GCS path

Closes #3216 from wileeam/s3-to-gcs-operator

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/34f827f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/34f827f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/34f827f0

Branch: refs/heads/master
Commit: 34f827f04c1c87678e1131bff93026a037dd5f8e
Parents: 5cb530b
Author: Guillermo Rodriguez Cano <guillermo.rodriguezc...@bonnierbroadcasting.com>
Authored: Fri Apr 13 09:32:22 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Apr 13 09:32:22 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/s3_to_gcs_operator.py | 186 +++++++++++++++++++
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |  34 ++--
 .../operators/test_s3_to_gcs_operator.py        |  96 ++++++++++
 4 files changed, 304 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/airflow/contrib/operators/s3_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py
new file mode 100644
index 0000000..666029f
--- /dev/null
+++ b/airflow/contrib/operators/s3_to_gcs_operator.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from tempfile import NamedTemporaryFile
+
+from airflow.contrib.hooks.gcs_hook import (GoogleCloudStorageHook,
+                                            _parse_gcs_url)
+from airflow.contrib.operators.s3_list_operator import S3ListOperator
+from airflow.exceptions import AirflowException
+from airflow.hooks.S3_hook import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class S3ToGoogleCloudStorageOperator(S3ListOperator):
+    """
+    Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage
+    destination path.
+
+    :param bucket: The S3 bucket in which to find the objects.
+    :type bucket: string
+    :param prefix: Prefix string that filters objects whose names begin
+        with it.
+    :type prefix: string
+    :param delimiter: The delimiter to filter objects by. For example, to
+        list the CSV files under an S3 key, use ``delimiter='.csv'``.
+    :type delimiter: string
+    :param aws_conn_id: The source S3 connection.
+    :type aws_conn_id: str
+    :param dest_gcs_conn_id: The destination connection ID to use
+        when connecting to Google Cloud Storage.
+    :type dest_gcs_conn_id: string
+    :param dest_gcs: The destination Google Cloud Storage bucket and prefix
+        where you want to store the files.
+    :type dest_gcs: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: string
+    :param replace: Whether you want to replace existing destination files
+        or not.
+    :type replace: bool
+
+    **Example**:
+
+    .. code-block:: python
+
+       s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
+           task_id='s3_to_gcs_example',
+           bucket='my-s3-bucket',
+           prefix='data/customers-201804',
+           dest_gcs_conn_id='google_cloud_default',
+           dest_gcs='gs://my.gcs.bucket/some/customers/',
+           replace=False,
+           dag=my_dag)
+
+    Note that ``bucket``, ``prefix``, ``delimiter`` and ``dest_gcs`` are
+    templated, so you can use variables in them if you wish.
+    """
+
+    template_fields = ('bucket', 'prefix', 'delimiter', 'dest_gcs')
+    ui_color = '#e09411'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 prefix='',
+                 delimiter='',
+                 aws_conn_id='aws_default',
+                 dest_gcs_conn_id=None,
+                 dest_gcs=None,
+                 delegate_to=None,
+                 replace=False,
+                 *args,
+                 **kwargs):
+
+        super(S3ToGoogleCloudStorageOperator, self).__init__(
+            bucket=bucket,
+            prefix=prefix,
+            delimiter=delimiter,
+            aws_conn_id=aws_conn_id,
+            *args,
+            **kwargs)
+        self.dest_gcs_conn_id = dest_gcs_conn_id
+        self.dest_gcs = dest_gcs
+        self.delegate_to = delegate_to
+        self.replace = replace
+
+        if dest_gcs and not self._gcs_object_is_directory(self.dest_gcs):
+            self.log.info('Destination Google Cloud Storage path is not a '
+                          'valid "directory", define one and end the path '
+                          'with a slash: "/".')
+            raise AirflowException('The destination Google Cloud Storage '
+                                   'path must end with a slash "/".')
+
+    def execute(self, context):
+        # Use the super method to list all the files in an S3 bucket/key
+        files = super(S3ToGoogleCloudStorageOperator, self).execute(context)
+
+        gcs_hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.dest_gcs_conn_id,
+            delegate_to=self.delegate_to)
+
+        if not self.replace:
+            # If we are not replacing, list all the files in the GCS bucket
+            # and only keep those that are present in S3 but not in
+            # Google Cloud Storage
+            bucket_name, object_prefix = _parse_gcs_url(self.dest_gcs)
+            existing_files_prefixed = gcs_hook.list(
+                bucket_name, prefix=object_prefix)
+
+            existing_files = []
+
+            if existing_files_prefixed:
+                # Remove the object prefix itself; an empty directory was found
+                if object_prefix in existing_files_prefixed:
+                    existing_files_prefixed.remove(object_prefix)
+
+                # Remove the object prefix from all object string paths
+                for f in existing_files_prefixed:
+                    if f.startswith(object_prefix):
+                        existing_files.append(f[len(object_prefix):])
+                    else:
+                        existing_files.append(f)
+
+            files = set(files) - set(existing_files)
+            if len(files) > 0:
+                self.log.info('{0} files are going to be synced: {1}.'.format(
+                    len(files), files))
+            else:
+                self.log.info(
+                    'There are no new files to sync. Have a nice day!')
+
+        if files:
+            hook = S3Hook(aws_conn_id=self.aws_conn_id)
+
+            for file in files:
+                # The GCS hook builds its own in-memory file, so we have to
+                # create and pass the path
+                file_object = hook.get_key(file, self.bucket)
+                with NamedTemporaryFile(mode='wb', delete=True) as f:
+                    file_object.download_fileobj(f)
+                    f.flush()
+
+                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(
+                        self.dest_gcs)
+                    # There will always be a '/' before file because it is
+                    # enforced at instantiation time
+                    dest_gcs_object = dest_gcs_object_prefix + file
+
+                    # Sync is sequential and the hook already logs too much,
+                    # so skip this for now
+                    # self.log.info(
+                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                    #     ' as object {3}'.format(file, self.bucket,
+                    #                             dest_gcs_bucket,
+                    #                             dest_gcs_object))
+
+                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name)
+
+            self.log.info(
+                "All done, uploaded %d files to Google Cloud Storage",
+                len(files))
+        else:
+            self.log.info(
+                'In sync, no files needed to be uploaded to Google Cloud '
+                'Storage')
+
+        return files
+
+    # The following functionality may be better suited in
+    # airflow/contrib/hooks/gcs_hook.py
+    def _gcs_object_is_directory(self, object):
+        bucket, blob = _parse_gcs_url(object)
+
+        return blob.endswith('/')
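
For reference, a minimal DAG wiring for the new operator might look as
follows. This is a sketch only: the DAG id, schedule, and bucket names are
hypothetical and not part of this commit; the import path matches the file
added above.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.s3_to_gcs_operator import \
    S3ToGoogleCloudStorageOperator

# Hypothetical DAG; only the operator arguments mirror the commit above.
dag = DAG(
    dag_id='s3_to_gcs_sync',
    start_date=datetime(2018, 4, 1),
    schedule_interval='@daily')

sync_s3_to_gcs = S3ToGoogleCloudStorageOperator(
    task_id='s3_to_gcs_example',
    bucket='my-s3-bucket',                     # hypothetical S3 bucket
    prefix='data/customers-201804',            # hypothetical key prefix
    aws_conn_id='aws_default',
    dest_gcs_conn_id='google_cloud_default',
    dest_gcs='gs://my-gcs-bucket/customers/',  # must end with a slash '/'
    replace=False,                             # upload only missing keys
    dag=dag)

With replace=False the operator lists the destination first and uploads only
the S3 keys not yet present under the GCS prefix; execute() returns the keys
it considered.
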
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 97327b1..d4810f0 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -170,6 +170,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
 .. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator
 .. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
+.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator
 .. autoclass:: airflow.contrib.operators.sftp_operator.SFTPOperator
 .. autoclass:: airflow.contrib.operators.slack_webhook_operator.SlackWebhookOperator
 .. autoclass:: airflow.contrib.operators.snowflake_operator.SnowflakeOperator


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index d4e8b05..77425aa 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -195,17 +195,18 @@ EmrHook
 AWS S3
 ''''''
 
-- :ref:`S3ListOperator` : Lists the files matching a key prefix from a S3 location.
+- :ref:`S3Hook` : Interact with AWS S3.
 - :ref:`S3FileTransformOperator` : Copies data from a source S3 location to a temporary location on the local filesystem.
+- :ref:`S3ListOperator` : Lists the files matching a key prefix from a S3 location.
+- :ref:`S3ToGoogleCloudStorageOperator` : Syncs an S3 location with a Google Cloud Storage bucket.
 - :ref:`S3ToHiveTransfer` : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.
-- :ref:`S3Hook` : Interact with AWS S3.
 
-.. _S3ListOperator:
+.. _S3Hook:
 
-S3ListOperator
-""""""""""""""
+S3Hook
+""""""
 
-.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
+.. autoclass:: airflow.hooks.S3_hook.S3Hook
 
 .. _S3FileTransformOperator:
 
@@ -214,6 +215,20 @@ S3FileTransformOperator
 """""""""""""""""""""""
 
 .. autoclass:: airflow.operators.s3_file_transform_operator.S3FileTransformOperator
 
+.. _S3ListOperator:
+
+S3ListOperator
+""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
+
+.. _S3ToGoogleCloudStorageOperator:
+
+S3ToGoogleCloudStorageOperator
+""""""""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator
+
 .. _S3ToHiveTransfer:
 
 S3ToHiveTransfer
@@ -221,13 +236,6 @@ S3ToHiveTransfer
 """"""""""""""""
 
 .. autoclass:: airflow.operators.s3_to_hive_operator.S3ToHiveTransfer
 
-.. _S3Hook:
-
-S3Hook
-"""""""
-
-.. autoclass:: airflow.hooks.S3_hook.S3Hook
-
 AWS EC2 Container Service
 '''''''''''''''''''''''''
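
The tests below assume that gcs_hook._parse_gcs_url splits a gs:// URL into
a (bucket, object path) pair. For context, a rough approximation of that
behavior; this is a sketch, not the hook's actual implementation, which
lives in airflow/contrib/hooks/gcs_hook.py.

# Approximation of _parse_gcs_url as the operator and tests use it.
from urllib.parse import urlparse

def _parse_gcs_url_sketch(gcs_url):
    parsed = urlparse(gcs_url)      # e.g. 'gs://gcs-bucket/data/'
    bucket = parsed.netloc          # 'gcs-bucket'
    blob = parsed.path.lstrip('/')  # 'data/'
    return bucket, blob

assert _parse_gcs_url_sketch('gs://gcs-bucket/data/') == ('gcs-bucket', 'data/')
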
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/tests/contrib/operators/test_s3_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_s3_to_gcs_operator.py b/tests/contrib/operators/test_s3_to_gcs_operator.py
new file mode 100644
index 0000000..66c4531
--- /dev/null
+++ b/tests/contrib/operators/test_s3_to_gcs_operator.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.hooks.gcs_hook import _parse_gcs_url
+from airflow.contrib.operators.s3_to_gcs_operator import \
+    S3ToGoogleCloudStorageOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-s3-gcs-operator'
+S3_BUCKET = 'test-bucket'
+S3_PREFIX = 'TEST'
+S3_DELIMITER = '/'
+GCS_PATH_PREFIX = 'gs://gcs-bucket/data/'
+MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
+AWS_CONN_ID = 'aws_default'
+GCS_CONN_ID = 'google_cloud_default'
+
+
+class S3ToGoogleCloudStorageOperatorTest(unittest.TestCase):
+    def test_init(self):
+        """Test S3ToGoogleCloudStorageOperator instance is properly initialized."""
+
+        operator = S3ToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            dest_gcs_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX)
+
+        self.assertEqual(operator.task_id, TASK_ID)
+        self.assertEqual(operator.bucket, S3_BUCKET)
+        self.assertEqual(operator.prefix, S3_PREFIX)
+        self.assertEqual(operator.delimiter, S3_DELIMITER)
+        self.assertEqual(operator.dest_gcs_conn_id, GCS_CONN_ID)
+        self.assertEqual(operator.dest_gcs, GCS_PATH_PREFIX)
+
+    @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook')
+    @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook')
+    @mock.patch(
+        'airflow.contrib.operators.s3_to_gcs_operator.GoogleCloudStorageHook')
+    def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
+        """Test the execute function when the run is successful."""
+
+        operator = S3ToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            dest_gcs_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX)
+
+        s3_one_mock_hook.return_value.list_keys.return_value = MOCK_FILES
+        s3_two_mock_hook.return_value.list_keys.return_value = MOCK_FILES
+
+        def _assert_upload(bucket, object, tmp_filename):
+            gcs_bucket, gcs_object_path = _parse_gcs_url(GCS_PATH_PREFIX)
+
+            self.assertEqual(gcs_bucket, bucket)
+            self.assertIn(object[len(gcs_object_path):], MOCK_FILES)
+
+        gcs_mock_hook.return_value.upload.side_effect = _assert_upload
+
+        uploaded_files = operator.execute(None)
+
+        s3_one_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
+        s3_two_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
+        gcs_mock_hook.assert_called_once_with(
+            google_cloud_storage_conn_id=GCS_CONN_ID, delegate_to=None)
+
+        # We expect MOCK_FILES to be uploaded
+        self.assertEqual(sorted(MOCK_FILES), sorted(uploaded_files))
+
+
+if __name__ == '__main__':
+    unittest.main()
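
The replace=False bookkeeping exercised by these tests boils down to
stripping the destination prefix from the existing GCS listing and taking a
set difference. A self-contained illustration, with all values invented for
the example:

# Stand-alone illustration of the replace=False logic in execute().
s3_keys = {'TEST1.csv', 'TEST2.csv', 'TEST3.csv'}  # from S3ListOperator
object_prefix = 'data/'                            # from _parse_gcs_url(dest_gcs)
existing_prefixed = ['data/', 'data/TEST1.csv']    # from gcs_hook.list()

existing = []
for name in existing_prefixed:
    if name == object_prefix:        # skip the empty-"directory" marker
        continue
    if name.startswith(object_prefix):
        existing.append(name[len(object_prefix):])
    else:
        existing.append(name)

to_upload = s3_keys - set(existing)
print(sorted(to_upload))             # ['TEST2.csv', 'TEST3.csv']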