[AIRFLOW-683] Add jira hook, operator and sensor

Closes #1950 from jhsenjaliya/AIRFLOW-683
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/44798e0d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/44798e0d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/44798e0d Branch: refs/heads/v1-8-test Commit: 44798e0d4d36e6a793d19b4986f2ddb6814ec208 Parents: a8b2f7f Author: Jay <jhsonl...@gmail.com> Authored: Mon Jan 16 17:46:11 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Mon Jan 16 17:46:21 2017 +0100 ---------------------------------------------------------------------- airflow/contrib/hooks/jira_hook.py | 82 ++++++++++++ airflow/contrib/operators/jira_operator.py | 89 +++++++++++++ airflow/contrib/sensors/jira_sensor.py | 146 +++++++++++++++++++++ airflow/models.py | 4 + scripts/ci/requirements.txt | 1 + setup.py | 2 + tests/contrib/hooks/test_jira_hook.py | 51 +++++++ tests/contrib/operators/jira_operator_test.py | 101 ++++++++++++++ tests/contrib/sensors/jira_sensor_test.py | 85 ++++++++++++ 9 files changed, 561 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/contrib/hooks/jira_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py new file mode 100644 index 0000000..148101b --- /dev/null +++ b/airflow/contrib/hooks/jira_hook.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from jira import JIRA +from jira.exceptions import JIRAError + +from airflow.exceptions import AirflowException +from airflow.hooks.base_hook import BaseHook + + +class JiraHook(BaseHook): + """ + Jira interaction hook, a Wrapper around JIRA Python SDK. + + :param jira_conn_id: reference to a pre-defined Jira Connection + :type jira_conn_id: string + """ + + def __init__(self, + jira_conn_id='jira_default'): + super(JiraHook, self).__init__(jira_conn_id) + self.jira_conn_id = jira_conn_id + self.client = None + self.get_conn() + + def get_conn(self): + if not self.client: + logging.debug('creating jira client for conn_id: {0}'.format(self.jira_conn_id)) + + get_server_info = True + validate = True + extra_options = {} + conn = None + + if self.jira_conn_id is not None: + conn = self.get_connection(self.jira_conn_id) + if conn.extra is not None: + extra_options = conn.extra_dejson + # only required attributes are taken for now, + # more can be added ex: async, logging, max_retries + + # verify + if 'verify' in extra_options \ + and extra_options['verify'].lower() == 'false': + extra_options['verify'] = False + + # validate + if 'validate' in extra_options \ + and extra_options['validate'].lower() == 'false': + validate = False + + if 'get_server_info' in extra_options \ + and extra_options['get_server_info'].lower() == 'false': + get_server_info = False + + try: + self.client = JIRA(conn.host, + options=extra_options, + basic_auth=(conn.login, conn.password), + get_server_info=get_server_info, + validate=validate) + except JIRAError as 
jira_error: + raise AirflowException('Failed to create jira client, jira error: %s' + % str(jira_error)) + except Exception as e: + raise AirflowException('Failed to create jira client, error: %s' + % str(e)) + + return self.client http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/contrib/operators/jira_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/jira_operator.py b/airflow/contrib/operators/jira_operator.py new file mode 100644 index 0000000..6623b1c --- /dev/null +++ b/airflow/contrib/operators/jira_operator.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow.contrib.hooks.jira_hook import JIRAError +from airflow.contrib.hooks.jira_hook import JiraHook +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class JiraOperator(BaseOperator): + """ + JiraOperator to interact and perform action on Jira issue tracking system. 
+ This operator is designed to use Jira Python SDK: http://jira.readthedocs.io + + :param jira_conn_id: reference to a pre-defined Jira Connection + :type jira_conn_id: str + :param jira_method: method name from Jira Python SDK to be called + :type jira_method: str + :param jira_method_args: required method parameters for the jira_method + :type jira_method_args: dict + :param result_processor: function to further process the response from Jira + :type result_processor: function + :param get_jira_resource_method: function or operator to get jira resource + on which the provided jira_method will be executed + :type get_jira_resource_method: function + """ + + template_fields = ("jira_method_args",) + + @apply_defaults + def __init__(self, + jira_conn_id='jira_default', + jira_method=None, + jira_method_args=None, + result_processor=None, + get_jira_resource_method=None, + *args, + **kwargs): + super(JiraOperator, self).__init__(*args, **kwargs) + self.jira_conn_id = jira_conn_id + self.method_name = jira_method + self.jira_method_args = jira_method_args + self.result_processor = result_processor + self.get_jira_resource_method = get_jira_resource_method + + def execute(self, context): + try: + if self.get_jira_resource_method is not None: + # if get_jira_resource_method is provided, jira_method will be executed on + # resource returned by executing the get_jira_resource_method. + # This makes all the provided methods of JIRA sdk accessible and usable + # directly at the JiraOperator without additional wrappers. 
+ # ref: http://jira.readthedocs.io/en/latest/api.html + if isinstance(self.get_jira_resource_method, JiraOperator): + resource = self.get_jira_resource_method.execute(**context) + else: + resource = self.get_jira_resource_method(**context) + else: + # Default method execution is on the top level jira client resource + hook = JiraHook(jira_conn_id=self.jira_conn_id) + resource = hook.client + + # Current Jira-Python SDK (1.0.7) has issue with pickling the jira response. + # ex: self.xcom_push(context, key='operator_response', value=jira_response) + # This could potentially throw error if jira_result is not picklable + jira_result = getattr(resource, self.method_name)(**self.jira_method_args) + if self.result_processor: + return self.result_processor(context, jira_result) + + return jira_result + + except JIRAError as jira_error: + raise AirflowException("Failed to execute jiraOperator, error: %s" + % str(jira_error)) + except Exception as e: + raise AirflowException("Jira operator error: %s" % str(e)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/contrib/sensors/jira_sensor.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py new file mode 100644 index 0000000..708caad --- /dev/null +++ b/airflow/contrib/sensors/jira_sensor.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +from jira.resources import Resource + +from airflow.contrib.operators.jira_operator import JIRAError +from airflow.contrib.operators.jira_operator import JiraOperator +from airflow.operators.sensors import BaseSensorOperator +from airflow.utils.decorators import apply_defaults + + +class JiraSensor(BaseSensorOperator): + """ + Monitors a jira ticket for any change. + + :param jira_conn_id: reference to a pre-defined Jira Connection + :type jira_conn_id: str + :param method_name: method name from jira-python-sdk to be execute + :type method_name: str + :param method_params: parameters for the method method_name + :type method_params: dict + :param result_processor: function that return boolean and act as a sensor response + :type result_processor: function + """ + + @apply_defaults + def __init__(self, + jira_conn_id='jira_default', + method_name=None, + method_params=None, + result_processor=None, + *args, + **kwargs): + super(JiraSensor, self).__init__(*args, **kwargs) + self.jira_conn_id = jira_conn_id + self.result_processor = None + if result_processor is not None: + self.result_processor = result_processor + self.method_name = method_name + self.method_params = method_params + self.jira_operator = JiraOperator(task_id=self.task_id, + jira_conn_id=self.jira_conn_id, + jira_method=self.method_name, + jira_method_args=self.method_params, + result_processor=self.result_processor) + + def poke(self, context): + return self.jira_operator.execute(context=context) + + +class JiraTicketSensor(JiraSensor): + """ + Monitors a jira ticket for given change in terms of function. 
+ + :param jira_conn_id: reference to a pre-defined Jira Connection + :type jira_conn_id: str + :param ticket_id: id of the ticket to be monitored + :type ticket_id: str + :param field: field of the ticket to be monitored + :type field: str + :param expected_value: expected value of the field + :type expected_value: str + :param result_processor: function that return boolean and act as a sensor response + :type result_processor: function + """ + + template_fields = ("ticket_id",) + + @apply_defaults + def __init__(self, + jira_conn_id='jira_default', + ticket_id=None, + field=None, + expected_value=None, + field_checker_func=None, + *args, **kwargs): + + self.jira_conn_id = jira_conn_id + self.ticket_id = ticket_id + self.field = field + self.expected_value = expected_value + if field_checker_func is None: + field_checker_func = self.issue_field_checker + + super(JiraTicketSensor, self).__init__(jira_conn_id=jira_conn_id, + result_processor=field_checker_func, + *args, **kwargs) + + def poke(self, context): + logging.info('Jira Sensor checking for change in ticket : {0}' + .format(self.ticket_id)) + + self.jira_operator.method_name = "issue" + self.jira_operator.jira_method_args = { + 'id': self.ticket_id, + 'fields': self.field + } + return JiraSensor.poke(self, context=context) + + def issue_field_checker(self, context, issue): + result = None + try: + if issue is not None \ + and self.field is not None \ + and self.expected_value is not None: + + field_value = getattr(issue.fields, self.field) + if field_value is not None: + if isinstance(field_value, list): + result = self.expected_value in field_value + elif isinstance(field_value, str): + result = self.expected_value.lower() == field_value.lower() + elif isinstance(field_value, Resource) \ + and getattr(field_value, 'name'): + result = self.expected_value.lower() == field_value.name.lower() + else: + logging.warning("not implemented checker for issue field {0} " + "which is neither string nor list nor " + 
"jira Resource".format(self.field)) + + except JIRAError as jira_error: + logging.error("jira error while checking with expected value: {0}" + .format(jira_error)) + except Exception as e: + logging.error("error while checking with expected value {0}, error: {1}" + .format(self.expected_value, e)) + if result is True: + logging.info("issue field {0} has expected value {1}, returning success" + .format(self.field, self.expected_value)) + else: + logging.info("issue field {0} dont have expected value {1} yet." + .format(self.field, self.expected_value)) + return result http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index d878457..8682f35 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -535,6 +535,7 @@ class Connection(Base): ('cloudant', 'IBM Cloudant',), ('mssql', 'Microsoft SQL Server'), ('mesos_framework-id', 'Mesos Framework ID'), + ('jira', 'JIRA',), ] def __init__( @@ -655,6 +656,9 @@ class Connection(Base): elif self.conn_type == 'cloudant': from airflow.contrib.hooks.cloudant_hook import CloudantHook return CloudantHook(cloudant_conn_id=self.conn_id) + elif self.conn_type == 'jira': + from airflow.contrib.hooks.jira_hook import JiraHook + return JiraHook(jira_conn_id=self.conn_id) except: pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/scripts/ci/requirements.txt ---------------------------------------------------------------------- diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt index 446952b..9e503f9 100644 --- a/scripts/ci/requirements.txt +++ b/scripts/ci/requirements.txt @@ -27,6 +27,7 @@ impyla ipython jaydebeapi jinja2<2.9.0 +jira ldap3 lxml markdown http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py 
index 7acc90f..aad9984 100644 --- a/setup.py +++ b/setup.py @@ -127,6 +127,7 @@ gcp_api = [ ] hdfs = ['snakebite>=2.7.8'] webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4'] +jira = ['JIRA>1.0.7'] hive = [ 'hive-thrift-py>=0.0.1', 'pyhive>=0.1.3', @@ -256,6 +257,7 @@ def do_setup(): 'statsd': statsd, 'vertica': vertica, 'webhdfs': webhdfs, + 'jira': jira, }, classifiers=[ 'Development Status :: 5 - Production/Stable', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/tests/contrib/hooks/test_jira_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_jira_hook.py b/tests/contrib/hooks/test_jira_hook.py new file mode 100644 index 0000000..1a3d735 --- /dev/null +++ b/tests/contrib/hooks/test_jira_hook.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import unittest + +from mock import Mock +from mock import patch + +from airflow import configuration +from airflow.contrib.hooks.jira_hook import JiraHook +from airflow import models +from airflow.utils import db + +jira_client_mock = Mock( + name="jira_client" +) + + +class TestJiraHook(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + db.merge_conn( + models.Connection( + conn_id='jira_default', conn_type='jira', + host='https://localhost/jira/', port=443, + extra='{"verify": "False", "project": "AIRFLOW"}')) + + @patch("airflow.contrib.hooks.jira_hook.JIRA", autospec=True, + return_value=jira_client_mock) + def test_jira_client_connection(self, jira_mock): + jira_hook = JiraHook() + + assert jira_mock.called + self.assertIsInstance(jira_hook.client, Mock) + self.assertEqual(jira_hook.client.name, jira_mock.return_value.name) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/tests/contrib/operators/jira_operator_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/jira_operator_test.py b/tests/contrib/operators/jira_operator_test.py new file mode 100644 index 0000000..0188c0b --- /dev/null +++ b/tests/contrib/operators/jira_operator_test.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import unittest +import datetime +from mock import Mock +from mock import patch + +from airflow import DAG, configuration +from airflow.contrib.operators.jira_operator import JiraOperator +from airflow import models +from airflow.utils import db + +DEFAULT_DATE = datetime.datetime(2017, 1, 1) +jira_client_mock = Mock( + name="jira_client_for_test" +) + +minimal_test_ticket = { + "id": "911539", + "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539", + "key": "TEST-1226", + "fields": { + "labels": [ + "test-label-1", + "test-label-2" + ], + "description": "this is a test description", + } +} + + +class TestJiraOperator(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + dag = DAG('test_dag_id', default_args=args) + self.dag = dag + db.merge_conn( + models.Connection( + conn_id='jira_default', conn_type='jira', + host='https://localhost/jira/', port=443, + extra='{"verify": "False", "project": "AIRFLOW"}')) + + @patch("airflow.contrib.hooks.jira_hook.JIRA", + autospec=True, return_value=jira_client_mock) + def test_issue_search(self, jira_mock): + jql_str = 'issuekey=TEST-1226' + jira_mock.return_value.search_issues.return_value = minimal_test_ticket + + jira_ticket_search_operator = JiraOperator(task_id='search-ticket-test', + jira_method="search_issues", + jira_method_args={ + 'jql_str': jql_str, + 'maxResults': '1' + }, + dag=self.dag) + + jira_ticket_search_operator.run(start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, ignore_ti_state=True) + + assert jira_mock.called + assert jira_mock.return_value.search_issues.called + + @patch("airflow.contrib.hooks.jira_hook.JIRA", + autospec=True, return_value=jira_client_mock) + def test_update_issue(self, jira_mock): + jira_mock.return_value.add_comment.return_value = True + + add_comment_operator = JiraOperator(task_id='add_comment_test', + jira_method="add_comment", + jira_method_args={ + 'issue': 
minimal_test_ticket.get("key"), + 'body': 'this is test comment' + }, + dag=self.dag) + + add_comment_operator.run(start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, ignore_ti_state=True) + + assert jira_mock.called + assert jira_mock.return_value.add_comment.called + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44798e0d/tests/contrib/sensors/jira_sensor_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/jira_sensor_test.py b/tests/contrib/sensors/jira_sensor_test.py new file mode 100644 index 0000000..5ca58e4 --- /dev/null +++ b/tests/contrib/sensors/jira_sensor_test.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import unittest +import datetime +from mock import Mock +from mock import patch + +from airflow import DAG, configuration +from airflow.contrib.sensors.jira_sensor import JiraTicketSensor +from airflow import models +from airflow.utils import db + +DEFAULT_DATE = datetime.datetime(2017, 1, 1) +jira_client_mock = Mock( + name="jira_client_for_test" +) + +minimal_test_ticket = { + "id": "911539", + "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539", + "key": "TEST-1226", + "fields": { + "labels": [ + "test-label-1", + "test-label-2" + ], + "description": "this is a test description", + } +} + + +class TestJiraSensor(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + dag = DAG('test_dag_id', default_args=args) + self.dag = dag + db.merge_conn( + models.Connection( + conn_id='jira_default', conn_type='jira', + host='https://localhost/jira/', port=443, + extra='{"verify": "False", "project": "AIRFLOW"}')) + + @patch("airflow.contrib.hooks.jira_hook.JIRA", + autospec=True, return_value=jira_client_mock) + def test_issue_label_set(self, jira_mock): + jira_mock.return_value.issue.return_value = minimal_test_ticket + + ticket_label_sensor = JiraTicketSensor(task_id='search-ticket-test', + ticket_id='TEST-1226', + field_checker_func= + TestJiraSensor.field_checker_func, + timeout=518400, + poke_interval=10, + dag=self.dag) + + ticket_label_sensor.run(start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, ignore_ti_state=True) + + assert jira_mock.called + assert jira_mock.return_value.issue.called + + @staticmethod + def field_checker_func(context, issue): + return "test-label-1" in issue['fields']['labels'] + + +if __name__ == '__main__': + unittest.main()