Repository: incubator-airflow Updated Branches: refs/heads/master 23a16f7ad -> 8de850162
[AIRFLOW-999] Add support for Redis database This PR includes a redis_hook and a redis_key_sensor to enable checking for key existence in redis. It also updates the documentation and adds the relevant unit tests. - [x] Opened a PR on Github - [x] My PR addresses the following Airflow JIRA issues: - https://issues.apache.org/jira/browse/AIRFLOW-999 - [x] The PR title references the JIRA issues. For example, "[AIRFLOW-1] My Airflow PR" - [x] My PR adds unit tests - [ ] __OR__ my PR does not need testing for this extremely good reason: - [x] Here are some details about my PR: - [ ] Here are screenshots of any UI changes, if appropriate: - [x] Each commit subject references a JIRA issue. For example, "[AIRFLOW-1] Add new feature" - [x] Multiple commits addressing the same JIRA issue have been squashed - [x] My commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. 
Body explains "what" and "why", not "how" Closes #2165 from msempere/AIRFLOW-999/support- for-redis-database Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8de85016 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8de85016 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8de85016 Branch: refs/heads/master Commit: 8de85016265443987a0e0fff406e996d421dc9d6 Parents: 23a16f7 Author: MSempere <msemp...@gmx.com> Authored: Mon Mar 20 11:10:55 2017 -0700 Committer: Arthur Wiedmer <arthur.wied...@gmail.com> Committed: Mon Mar 20 11:11:31 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/redis_hook.py | 92 ++++++++++++++++++++++++ airflow/contrib/sensors/redis_key_sensor.py | 46 ++++++++++++ airflow/models.py | 4 ++ airflow/utils/db.py | 5 ++ docs/installation.rst | 2 + setup.py | 2 + tests/contrib/hooks/test_redis_hook.py | 46 ++++++++++++ tests/contrib/sensors/redis_sensor.py | 64 +++++++++++++++++ 8 files changed, 261 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/contrib/hooks/redis_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py new file mode 100644 index 0000000..936eff8 --- /dev/null +++ b/airflow/contrib/hooks/redis_hook.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +RedisHook module +""" + +import logging + +from redis import StrictRedis + +from airflow.exceptions import AirflowException +from airflow.hooks.base_hook import BaseHook + + +class RedisHook(BaseHook): + """ + Hook to interact with Redis database + """ + def __init__(self, redis_conn_id='redis_default'): + """ + Prepares hook to connect to a Redis database. + + :param conn_id: the name of the connection that has the parameters + we need to connect to Redis. + """ + self.redis_conn_id = redis_conn_id + self.client = None + conn = self.get_connection(self.redis_conn_id) + self.host = conn.host + self.port = int(conn.port) + self.password = conn.password + self.db = int(conn.extra_dejson.get('db', 0)) + self.logger = logging.getLogger(__name__) + self.logger.debug( + '''Connection "{conn}": + \thost: {host} + \tport: {port} + \textra: {extra} + '''.format( + conn=self.redis_conn_id, + host=self.host, + port=self.port, + extra=conn.extra_dejson + ) + ) + + def get_conn(self): + """ + Returns a Redis connection. 
+ """ + if not self.client: + self.logger.debug( + 'generating Redis client for conn_id "{conn}" on ' + '{host}:{port}:{db}'.format(conn=self.redis_conn_id, + host=self.host, + port=self.port, + db=self.db)) + try: + self.client = StrictRedis( + host=self.host, + port=self.port, + password=self.password, + db=self.db) + except Exception as general_error: + raise AirflowException( + 'Failed to create Redis client, error: {error}'.format( + error=str(general_error) + ) + ) + + return self.client + + def key_exists(self, key): + """ + Checks if a key exists in Redis database + + :param key: The key to check the existence. + :type key: string + """ + return self.get_conn().exists(key) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/contrib/sensors/redis_key_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py new file mode 100644 index 0000000..4cab407 --- /dev/null +++ b/airflow/contrib/sensors/redis_key_sensor.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +from airflow.contrib.hooks.redis_hook import RedisHook +from airflow.operators.sensors import BaseSensorOperator +from airflow.utils.decorators import apply_defaults + + +class RedisKeySensor(BaseSensorOperator): + """ + Checks for the existence of a key in a Redis database + """ + template_fields = ('key',) + ui_color = '#f0eee4' + + @apply_defaults + def __init__(self, key, redis_conn_id, *args, **kwargs): + """ + Create a new RedisKeySensor + + :param key: The key to be monitored + :type key: string + :param redis_conn_id: The connection ID to use when connecting to Redis DB. + :type redis_conn_id: string + """ + super(RedisKeySensor, self).__init__(*args, **kwargs) + self.logger = logging.getLogger(__name__) + self.redis_conn_id = redis_conn_id + self.key = key + + def poke(self, context): + self.logger.info('Sensor check existence of key: %s', self.key) + return RedisHook(self.redis_conn_id).key_exists(self.key) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index f2d955b..a7d2916 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -541,6 +541,7 @@ class Connection(Base): ('mssql', 'Microsoft SQL Server'), ('mesos_framework-id', 'Mesos Framework ID'), ('jira', 'JIRA',), + ('redis', 'Redis',), ] def __init__( @@ -670,6 +671,9 @@ class Connection(Base): elif self.conn_type == 'jira': from airflow.contrib.hooks.jira_hook import JiraHook return JiraHook(jira_conn_id=self.conn_id) + elif self.conn_type == 'redis': + from airflow.contrib.hooks.redis_hook import RedisHook + return RedisHook(redis_conn_id=self.conn_id) except: pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/airflow/utils/db.py ---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 49a8d62..618e0020 100644 --- 
a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -191,6 +191,11 @@ def initdb(): host='yarn', extra='{"queue": "root.default"}')) merge_conn( models.Connection( + conn_id='redis_default', conn_type='redis', + host='localhost', port=6379, + extra='{"db": 0}')) + merge_conn( + models.Connection( conn_id='emr_default', conn_type='emr', extra=''' { "Name": "default_job_flow_name", http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/docs/installation.rst ---------------------------------------------------------------------- diff --git a/docs/installation.rst b/docs/installation.rst index 289f64f..c001ccf 100644 --- a/docs/installation.rst +++ b/docs/installation.rst @@ -88,3 +88,5 @@ Here's the list of the subpackages and what they enable: +---------------+-------------------------------------+-------------------------------------------------+ | cloudant | ``pip install airflow[cloudant]`` | Cloudant hook | +---------------+-------------------------------------+-------------------------------------------------+ +| redis | ``pip install airflow[redis]`` | Redis hooks and sensors | ++---------------+-------------------------------------+-------------------------------------------------+ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 481d427..26a0e27 100644 --- a/setup.py +++ b/setup.py @@ -168,6 +168,7 @@ password = [ github_enterprise = ['Flask-OAuthlib>=0.9.1'] qds = ['qds-sdk>=1.9.0'] cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x +redis = ['redis>=2.10.5'] all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant devel = [ @@ -269,6 +270,7 @@ def do_setup(): 'vertica': vertica, 'webhdfs': webhdfs, 'jira': jira, + 'redis': redis, }, classifiers=[ 'Development Status :: 5 - Production/Stable', 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/tests/contrib/hooks/test_redis_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_redis_hook.py b/tests/contrib/hooks/test_redis_hook.py new file mode 100644 index 0000000..ab9a4bc --- /dev/null +++ b/tests/contrib/hooks/test_redis_hook.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +from mock import patch + +from airflow import configuration +from airflow.contrib.hooks.redis_hook import RedisHook + + +class TestRedisHook(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + + def test_get_conn(self): + hook = RedisHook(redis_conn_id='redis_default') + self.assertEqual(hook.client, None) + self.assertEqual( + repr(hook.get_conn()), + ( + 'StrictRedis<ConnectionPool' + '<Connection<host=localhost,port=6379,db=0>>>' + ) + ) + + @patch("airflow.contrib.hooks.redis_hook.RedisHook.get_conn") + def test_first_conn_instantiation(self, get_conn): + hook = RedisHook(redis_conn_id='redis_default') + hook.key_exists('test_key') + self.assertTrue(get_conn.called_once()) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8de85016/tests/contrib/sensors/redis_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/redis_sensor.py 
b/tests/contrib/sensors/redis_sensor.py new file mode 100644 index 0000000..8022a92 --- /dev/null +++ b/tests/contrib/sensors/redis_sensor.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import datetime + +from mock import patch + +from airflow import DAG +from airflow import configuration +from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor + +DEFAULT_DATE = datetime.datetime(2017, 1, 1) + + +class TestRedisSensor(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + + self.dag = DAG('test_dag_id', default_args=args) + self.sensor = RedisKeySensor( + task_id='test_task', + redis_conn_id='redis_default', + dag=self.dag, + key='test_key' + ) + + @patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists") + def test_poke(self, key_exists): + key_exists.return_value = True + self.assertTrue(self.sensor.poke(None)) + + key_exists.return_value = False + self.assertFalse(self.sensor.poke(None)) + + @patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists") + def test_existing_key_called(self, redis_client_exists): + self.sensor.run( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, ignore_ti_state=True + ) + + self.assertTrue(redis_client_exists.called_with('test_key')) + + +if __name__ == '__main__': + unittest.main()