Repository: incubator-airflow Updated Branches: refs/heads/master d9bf1edd4 -> bc6feea03
[AIRFLOW-1509][AIRFLOW-442] SFTP Sensor Closes #3213 from sdiazb/sftp_sensor Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bc6feea0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bc6feea0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bc6feea0 Branch: refs/heads/master Commit: bc6feea0313325346393675792c18b531b526dca Parents: d9bf1ed Author: Sergio Díaz Bautista <s.diazbauti...@gmail.com> Authored: Thu Apr 12 09:15:10 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Thu Apr 12 09:15:10 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/sensors/sftp_sensor.py | 47 +++++++++++++++++++ docs/code.rst | 1 + tests/contrib/sensors/test_sftp_sensor.py | 64 ++++++++++++++++++++++++++ 3 files changed, 112 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc6feea0/airflow/contrib/sensors/sftp_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/sftp_sensor.py b/airflow/contrib/sensors/sftp_sensor.py new file mode 100644 index 0000000..6d6fbdd --- /dev/null +++ b/airflow/contrib/sensors/sftp_sensor.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import logging
from paramiko import SFTP_NO_SUCH_FILE
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class SFTPSensor(BaseSensorOperator):
    """
    Waits for a file or directory to be present on SFTP.

    Each poke asks the SFTP server for the modification time of ``path``.
    The poke succeeds as soon as that call returns. An ``IOError`` whose
    errno is ``SFTP_NO_SUCH_FILE`` means "not there yet", and the poke
    returns False. Any other ``IOError`` is re-raised.

    :param path: Remote file or directory path (templated)
    :type path: str
    :param sftp_conn_id: The connection to run the sensor against
    :type sftp_conn_id: str
    """
    template_fields = ('path',)

    @apply_defaults
    def __init__(self, path, sftp_conn_id='sftp_default', *args, **kwargs):
        super(SFTPSensor, self).__init__(*args, **kwargs)
        self.path = path
        self.hook = SFTPHook(sftp_conn_id=sftp_conn_id)

    def poke(self, context):
        """
        Check once whether ``self.path`` exists on the SFTP server.

        :param context: Airflow task context (not used by this sensor).
        :return: True if the path exists; False if the server reports
            ``SFTP_NO_SUCH_FILE``.
        :raises IOError: for any other error raised while querying the path.
        """
        logging.info('Poking for %s', self.path)
        try:
            self.hook.get_mod_time(self.path)
        except IOError as e:
            if e.errno != SFTP_NO_SUCH_FILE:
                # A bare ``raise`` keeps the original traceback
                # (``raise e`` would reset it on Python 2).
                raise
            return False
        finally:
            # Release the connection on every outcome (found, not found,
            # or error). Previously only the success path closed it, so a
            # "not found" poke left the SFTP session open while the sensor
            # waited for its next poke.
            self.hook.close_conn()
        return True

# docs/code.rst lists this class in its "Sensors" autoclass section:
# .. autoclass:: airflow.contrib.sensors.sftp_sensor.SFTPSensor
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from mock import patch
from paramiko import SFTP_NO_SUCH_FILE, SFTP_FAILURE
from airflow.contrib.sensors.sftp_sensor import SFTPSensor


class SFTPSensorTest(unittest.TestCase):
    """Tests for ``SFTPSensor.poke`` with ``SFTPHook`` replaced by a mock."""

    @patch('airflow.contrib.sensors.sftp_sensor.SFTPHook')
    def test_file_present(self, sftp_hook_mock):
        """A successful ``get_mod_time`` makes the poke return True."""
        sftp_hook_mock.return_value.get_mod_time.return_value = '19700101000000'
        sftp_sensor = SFTPSensor(
            task_id='unit_test',
            path='/path/to/file/1970-01-01.txt')
        context = {
            'ds': '1970-01-01'
        }
        output = sftp_sensor.poke(context)
        sftp_hook_mock.return_value.get_mod_time.assert_called_with(
            '/path/to/file/1970-01-01.txt')
        # A successful poke must also release the SFTP connection.
        sftp_hook_mock.return_value.close_conn.assert_called_once_with()
        self.assertTrue(output)

    @patch('airflow.contrib.sensors.sftp_sensor.SFTPHook')
    def test_file_absent(self, sftp_hook_mock):
        """An ``SFTP_NO_SUCH_FILE`` error makes the poke return False."""
        sftp_hook_mock.return_value.get_mod_time.side_effect = IOError(
            SFTP_NO_SUCH_FILE, 'File missing')
        sftp_sensor = SFTPSensor(
            task_id='unit_test',
            path='/path/to/file/1970-01-01.txt')
        context = {
            'ds': '1970-01-01'
        }
        output = sftp_sensor.poke(context)
        sftp_hook_mock.return_value.get_mod_time.assert_called_with(
            '/path/to/file/1970-01-01.txt')
        self.assertFalse(output)

    @patch('airflow.contrib.sensors.sftp_sensor.SFTPHook')
    def test_sftp_failure(self, sftp_hook_mock):
        """Any other ``IOError`` from the hook propagates out of ``poke``."""
        sftp_hook_mock.return_value.get_mod_time.side_effect = IOError(
            SFTP_FAILURE, 'SFTP failure')
        sftp_sensor = SFTPSensor(
            task_id='unit_test',
            path='/path/to/file/1970-01-01.txt')
        context = {
            'ds': '1970-01-01'
        }
        with self.assertRaises(IOError):
            sftp_sensor.poke(context)
        sftp_hook_mock.return_value.get_mod_time.assert_called_with(
            '/path/to/file/1970-01-01.txt')


if __name__ == '__main__':
    unittest.main()