Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5cb530b45 -> 34f827f04


[AIRFLOW-2301] Sync files of an S3 key with a GCS path

Closes #3216 from wileeam/s3-to-gcs-operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/34f827f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/34f827f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/34f827f0

Branch: refs/heads/master
Commit: 34f827f04c1c87678e1131bff93026a037dd5f8e
Parents: 5cb530b
Author: Guillermo Rodriguez Cano <guillermo.rodriguezc...@bonnierbroadcasting.com>
Authored: Fri Apr 13 09:32:22 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Apr 13 09:32:22 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/s3_to_gcs_operator.py | 186 +++++++++++++++++++
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |  34 ++--
 .../operators/test_s3_to_gcs_operator.py        |  96 ++++++++++
 4 files changed, 304 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
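For reviewers who want to see the new operator in context, here is a minimal
usage sketch; the DAG name, schedule, and bucket/connection values below are
illustrative assumptions, not part of this commit:

    # Hypothetical wiring of the new operator into a DAG; dag_id, schedule,
    # and bucket names are made up for illustration.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.s3_to_gcs_operator import \
        S3ToGoogleCloudStorageOperator

    dag = DAG('s3_to_gcs_sync',
              start_date=datetime(2018, 4, 1),
              schedule_interval='@daily')

    sync_task = S3ToGoogleCloudStorageOperator(
        task_id='sync_customers',
        bucket='my-s3-bucket',                     # source S3 bucket
        prefix='data/customers-',                  # only keys under this prefix
        dest_gcs_conn_id='google_cloud_default',
        dest_gcs='gs://my-gcs-bucket/customers/',  # must end with '/'
        replace=False,                             # skip files already in GCS
        dag=dag)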


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/airflow/contrib/operators/s3_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/s3_to_gcs_operator.py b/airflow/contrib/operators/s3_to_gcs_operator.py
new file mode 100644
index 0000000..666029f
--- /dev/null
+++ b/airflow/contrib/operators/s3_to_gcs_operator.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from tempfile import NamedTemporaryFile
+
+from airflow.contrib.hooks.gcs_hook import (GoogleCloudStorageHook,
+                                            _parse_gcs_url)
+from airflow.contrib.operators.s3_list_operator import S3ListOperator
+from airflow.exceptions import AirflowException
+from airflow.hooks.S3_hook import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class S3ToGoogleCloudStorageOperator(S3ListOperator):
+    """
+    Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage
+    destination path.
+
+    :param bucket: The S3 bucket in which to find the objects.
+    :type bucket: string
+    :param prefix: Prefix string that filters objects whose names begin
+        with it.
+    :type prefix: string
+    :param delimiter: The delimiter to filter objects on, e.g. to list
+        only the CSV files under an S3 key, pass `delimiter='.csv'`.
+    :type delimiter: string
+    :param aws_conn_id: The source S3 connection ID.
+    :type aws_conn_id: string
+    :param dest_gcs_conn_id: The destination connection ID to use
+        when connecting to Google Cloud Storage.
+    :type dest_gcs_conn_id: string
+    :param dest_gcs: The destination Google Cloud Storage bucket and prefix
+        where you want to store the files.
+    :type dest_gcs: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: string
+    :param replace: Whether to replace existing destination files.
+    :type replace: bool
+
+    **Example**:
+
+    .. code-block:: python
+
+       s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
+           task_id='s3_to_gcs_example',
+           bucket='my-s3-bucket',
+           prefix='data/customers-201804',
+           dest_gcs_conn_id='google_cloud_default',
+           dest_gcs='gs://my.gcs.bucket/some/customers/',
+           replace=False,
+           dag=my_dag)
+
+    Note that ``bucket``, ``prefix``, ``delimiter`` and ``dest_gcs`` are
+    templated, so you can use variables in them if you wish.
+    """
+
+    template_fields = ('bucket', 'prefix', 'delimiter', 'dest_gcs')
+    ui_color = '#e09411'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 prefix='',
+                 delimiter='',
+                 aws_conn_id='aws_default',
+                 dest_gcs_conn_id=None,
+                 dest_gcs=None,
+                 delegate_to=None,
+                 replace=False,
+                 *args,
+                 **kwargs):
+
+        super(S3ToGoogleCloudStorageOperator, self).__init__(
+            bucket=bucket,
+            prefix=prefix,
+            delimiter=delimiter,
+            aws_conn_id=aws_conn_id,
+            *args,
+            **kwargs)
+        self.dest_gcs_conn_id = dest_gcs_conn_id
+        self.dest_gcs = dest_gcs
+        self.delegate_to = delegate_to
+        self.replace = replace
+
+        if dest_gcs and not self._gcs_object_is_directory(self.dest_gcs):
+            self.log.info('Destination Google Cloud Storage path is not a '
+                          'valid "directory"; make sure the path ends with '
+                          'a slash: "/".')
+            raise AirflowException('The destination Google Cloud Storage path '
+                                   'must end with a slash "/".')
+
+    def execute(self, context):
+        # use the super method to list all the files in an S3 bucket/key
+        files = super(S3ToGoogleCloudStorageOperator, self).execute(context)
+
+        gcs_hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.dest_gcs_conn_id,
+            delegate_to=self.delegate_to)
+
+        if not self.replace:
+            # if we are not replacing -> list all files in the GCS bucket
+            # and only keep those files which are present in
+            # S3 and not in Google Cloud Storage
+            bucket_name, object_prefix = _parse_gcs_url(self.dest_gcs)
+            existing_files_prefixed = gcs_hook.list(
+                bucket_name, prefix=object_prefix)
+
+            existing_files = []
+
+            if existing_files_prefixed:
+                # Remove the object prefix itself; GCS lists an empty
+                # "directory" placeholder as an object of its own
+                if object_prefix in existing_files_prefixed:
+                    existing_files_prefixed.remove(object_prefix)
+
+                # Remove the object prefix from all object string paths
+                for f in existing_files_prefixed:
+                    if f.startswith(object_prefix):
+                        existing_files.append(f[len(object_prefix):])
+                    else:
+                        existing_files.append(f)
+
+            files = set(files) - set(existing_files)
+            if len(files) > 0:
+                self.log.info('{0} files are going to be synced: {1}.'.format(
+                    len(files), files))
+            else:
+                self.log.info(
+                    'There are no new files to sync. Have a nice day!')
+
+        if files:
+            hook = S3Hook(aws_conn_id=self.aws_conn_id)
+
+            for file in files:
+                # The GCS hook uploads from a local file path, so download
+                # the S3 object to a named temporary file first
+                file_object = hook.get_key(file, self.bucket)
+                with NamedTemporaryFile(mode='wb', delete=True) as f:
+                    file_object.download_fileobj(f)
+                    f.flush()
+
+                    dest_gcs_bucket, dest_gcs_object_prefix = _parse_gcs_url(
+                        self.dest_gcs)
+                    # The destination object prefix always ends with a '/'
+                    # because this is enforced at instantiation time
+                    dest_gcs_object = dest_gcs_object_prefix + file
+
+                    # Sync is sequential and the hook already logs too much
+                    # so skip this for now
+                    # self.log.info(
+                    #     'Saving file {0} from S3 bucket {1} in GCS bucket {2}'
+                    #     ' as object {3}'.format(file, self.bucket,
+                    #                             dest_gcs_bucket,
+                    #                             dest_gcs_object))
+
+                    gcs_hook.upload(dest_gcs_bucket, dest_gcs_object, f.name)
+
+            self.log.info(
+                "All done, uploaded %d files to Google Cloud Storage",
+                len(files))
+        else:
+            self.log.info(
+                'In sync, no files needed to be uploaded to Google Cloud '
+                'Storage')
+
+        return files
+
+    # Following functionality may be better suited in
+    # airflow/contrib/hooks/gcs_hook.py
+    def _gcs_object_is_directory(self, gcs_url):
+        _, blob = _parse_gcs_url(gcs_url)
+
+        return blob.endswith('/')
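
For what it's worth, the non-replace branch of execute() boils down to a
prefix-stripped set difference between the S3 listing and the objects already
in GCS. A standalone sketch of that logic, assuming _parse_gcs_url() splits
'gs://bucket/path/' into ('bucket', 'path/') as the contrib GCS hook does
(the helper name files_to_sync is mine, for illustration):

    def files_to_sync(s3_files, gcs_files_prefixed, object_prefix):
        # Drop the bare prefix entry (an empty "directory" placeholder) and
        # strip the prefix from each remaining object path, then keep only
        # the S3 keys not yet present in GCS.
        existing = [f[len(object_prefix):] if f.startswith(object_prefix) else f
                    for f in gcs_files_prefixed if f != object_prefix]
        return set(s3_files) - set(existing)

    # files_to_sync(['a.csv', 'b.csv'], ['data/', 'data/a.csv'], 'data/')
    # -> {'b.csv'}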

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 97327b1..d4810f0 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -170,6 +170,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubPublishOperator
 .. autoclass:: airflow.contrib.operators.qubole_operator.QuboleOperator
 .. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
+.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator
 .. autoclass:: airflow.contrib.operators.sftp_operator.SFTPOperator
 .. autoclass:: airflow.contrib.operators.slack_webhook_operator.SlackWebhookOperator
 .. autoclass:: airflow.contrib.operators.snowflake_operator.SnowflakeOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index d4e8b05..77425aa 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -195,17 +195,18 @@ EmrHook
 AWS S3
 ''''''
 
-- :ref:`S3ListOperator` : Lists the files matching a key prefix from a S3 location.
+- :ref:`S3Hook` : Interact with AWS S3.
 - :ref:`S3FileTransformOperator` : Copies data from a source S3 location to a temporary location on the local filesystem.
+- :ref:`S3ListOperator` : Lists the files matching a key prefix from a S3 location.
+- :ref:`S3ToGoogleCloudStorageOperator` : Syncs an S3 location with a Google Cloud Storage bucket.
 - :ref:`S3ToHiveTransfer` : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.
-- :ref:`S3Hook` : Interact with AWS S3.
 
-.. _S3ListOperator:
+.. _S3Hook:
 
-S3ListOperator
-""""""""""""""
+S3Hook
+""""""
 
-.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
+.. autoclass:: airflow.hooks.S3_hook.S3Hook
 
 .. _S3FileTransformOperator:
 
@@ -214,6 +215,20 @@ S3FileTransformOperator
 
 .. autoclass:: airflow.operators.s3_file_transform_operator.S3FileTransformOperator
 
+.. _S3ListOperator:
+
+S3ListOperator
+""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.s3_list_operator.S3ListOperator
+
+.. _S3ToGoogleCloudStorageOperator:
+
+S3ToGoogleCloudStorageOperator
+""""""""""""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator
+
 .. _S3ToHiveTransfer:
 
 S3ToHiveTransfer
@@ -221,13 +236,6 @@ S3ToHiveTransfer
 
 .. autoclass:: airflow.operators.s3_to_hive_operator.S3ToHiveTransfer
 
-.. _S3Hook:
-
-S3Hook
-"""""""
-
-.. autoclass:: airflow.hooks.S3_hook.S3Hook
-
 
 AWS EC2 Container Service
 '''''''''''''''''''''''''

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/34f827f0/tests/contrib/operators/test_s3_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_s3_to_gcs_operator.py b/tests/contrib/operators/test_s3_to_gcs_operator.py
new file mode 100644
index 0000000..66c4531
--- /dev/null
+++ b/tests/contrib/operators/test_s3_to_gcs_operator.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.hooks.gcs_hook import _parse_gcs_url
+from airflow.contrib.operators.s3_to_gcs_operator import \
+    S3ToGoogleCloudStorageOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-s3-gcs-operator'
+S3_BUCKET = 'test-bucket'
+S3_PREFIX = 'TEST'
+S3_DELIMITER = '/'
+GCS_PATH_PREFIX = 'gs://gcs-bucket/data/'
+MOCK_FILES = ["TEST1.csv", "TEST2.csv", "TEST3.csv"]
+AWS_CONN_ID = 'aws_default'
+GCS_CONN_ID = 'google_cloud_default'
+
+
+class S3ToGoogleCloudStorageOperatorTest(unittest.TestCase):
+    def test_init(self):
+        """Test S3ToGoogleCloudStorageOperator instance is properly 
initialized."""
+
+        operator = S3ToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            dest_gcs_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX)
+
+        self.assertEqual(operator.task_id, TASK_ID)
+        self.assertEqual(operator.bucket, S3_BUCKET)
+        self.assertEqual(operator.prefix, S3_PREFIX)
+        self.assertEqual(operator.delimiter, S3_DELIMITER)
+        self.assertEqual(operator.dest_gcs_conn_id, GCS_CONN_ID)
+        self.assertEqual(operator.dest_gcs, GCS_PATH_PREFIX)
+
+    @mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook')
+    @mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook')
+    @mock.patch(
+        'airflow.contrib.operators.s3_to_gcs_operator.GoogleCloudStorageHook')
+    def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
+        """Test the execute function when the run is successful."""
+
+        operator = S3ToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            bucket=S3_BUCKET,
+            prefix=S3_PREFIX,
+            delimiter=S3_DELIMITER,
+            dest_gcs_conn_id=GCS_CONN_ID,
+            dest_gcs=GCS_PATH_PREFIX)
+
+        s3_one_mock_hook.return_value.list_keys.return_value = MOCK_FILES
+        s3_two_mock_hook.return_value.list_keys.return_value = MOCK_FILES
+
+        def _assert_upload(bucket, object, tmp_filename):
+            gcs_bucket, gcs_object_path = _parse_gcs_url(GCS_PATH_PREFIX)
+
+            self.assertEqual(gcs_bucket, bucket)
+            self.assertIn(object[len(gcs_object_path):], MOCK_FILES)
+
+        gcs_mock_hook.return_value.upload.side_effect = _assert_upload
+
+        uploaded_files = operator.execute(None)
+
+        s3_one_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
+        s3_two_mock_hook.assert_called_once_with(aws_conn_id=AWS_CONN_ID)
+        gcs_mock_hook.assert_called_once_with(
+            google_cloud_storage_conn_id=GCS_CONN_ID, delegate_to=None)
+
+        # we expect MOCK_FILES to be uploaded
+        self.assertEqual(sorted(MOCK_FILES), sorted(uploaded_files))
+
+
+if __name__ == '__main__':
+    unittest.main()
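
A subtlety worth calling out in test_execute: S3Hook is patched twice because
mock.patch replaces a name where it is looked up, not where it is defined, and
both s3_list_operator (the inherited listing step) and s3_to_gcs_operator (the
download step) import the hook into their own namespaces. A stripped-down
sketch of the pattern, with made-up return values:

    # Illustrative only: patch each module's reference to S3Hook separately.
    with mock.patch('airflow.contrib.operators.s3_list_operator.S3Hook') as list_hook, \
            mock.patch('airflow.contrib.operators.s3_to_gcs_operator.S3Hook') as copy_hook:
        list_hook.return_value.list_keys.return_value = ['a.csv']
        copy_hook.return_value.get_key.return_value = mock.MagicMock()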
