This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 77d2fc7d75 Check task attribute before use in sentry.add_tagging() (#37143)
77d2fc7d75 is described below

commit 77d2fc7d7591679aa99c1924daba678463a7b7bb
Author: Lipu Fei <lipu.fei...@gmail.com>
AuthorDate: Sun Mar 24 18:20:08 2024 +0100

    Check task attribute before use in sentry.add_tagging() (#37143)

    * Check task attribute before use in add_tagging

    * Refactor sentry and add tests

    ---------

    Co-authored-by: Lipu Fei <lipu....@kpn.com>
---
 airflow/sentry.py            | 196 -------------------------------------------
 airflow/sentry/__init__.py   |  29 +++++++
 airflow/sentry/blank.py      |  40 +++++++++
 airflow/sentry/configured.py | 176 ++++++++++++++++++++++++++++++++++++++
 tests/test_sentry.py         |  65 ++++++++++++++
 5 files changed, 310 insertions(+), 196 deletions(-)

diff --git a/airflow/sentry.py b/airflow/sentry.py
deleted file mode 100644
index d5fbf3c04d..0000000000
--- a/airflow/sentry.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Sentry Integration."""
-
-from __future__ import annotations
-
-import logging
-from functools import wraps
-from typing import TYPE_CHECKING
-
-from airflow.configuration import conf
-from airflow.executors.executor_loader import ExecutorLoader
-from airflow.utils.session import find_session_idx, provide_session
-from airflow.utils.state import TaskInstanceState
-
-if TYPE_CHECKING:
-    from sqlalchemy.orm import Session
-
-    from airflow.models.taskinstance import TaskInstance
-
-log = logging.getLogger(__name__)
-
-
-class DummySentry:
-    """Blank class for Sentry."""
-
-    def add_tagging(self, task_instance):
-        """Blank function for tagging."""
-
-    def add_breadcrumbs(self, task_instance, session: Session | None = None):
-        """Blank function for breadcrumbs."""
-
-    def enrich_errors(self, run):
-        """Blank function for formatting a TaskInstance._run_raw_task."""
-        return run
-
-    def flush(self):
-        """Blank function for flushing errors."""
-
-
-Sentry: DummySentry = DummySentry()
-if conf.getboolean("sentry", "sentry_on", fallback=False):
-    import sentry_sdk
-    from sentry_sdk.integrations.flask import FlaskIntegration
-    from sentry_sdk.integrations.logging import ignore_logger
-
-    class ConfiguredSentry(DummySentry):
-        """Configure Sentry SDK."""
-
-        SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
-        SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
-        SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
-
-        UNSUPPORTED_SENTRY_OPTIONS = frozenset(
-            (
-                "integrations",
-                "in_app_include",
-                "in_app_exclude",
-                "ignore_errors",
-                "before_breadcrumb",
-            )
-        )
-
-        def __init__(self):
-            """Initialize the Sentry SDK."""
-            ignore_logger("airflow.task")
-
-            sentry_flask = FlaskIntegration()
-
-            # LoggingIntegration is set by default.
-            integrations = [sentry_flask]
-
-            executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
-
-            if executor_class.supports_sentry:
-                from sentry_sdk.integrations.celery import CeleryIntegration
-
-                sentry_celery = CeleryIntegration()
-                integrations.append(sentry_celery)
-
-            dsn = None
-            sentry_config_opts = conf.getsection("sentry") or {}
-            if sentry_config_opts:
-                sentry_config_opts.pop("sentry_on")
-                old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
-                new_way_dsn = sentry_config_opts.pop("dsn", None)
-                # supported backward compatibility with old way dsn option
-                dsn = old_way_dsn or new_way_dsn
-
-                unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
-                if unsupported_options:
-                    log.warning(
-                        "There are unsupported options in [sentry] section: %s",
-                        ", ".join(unsupported_options),
-                    )
-
-                sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
-                sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
-
-            if dsn:
-                sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
-            else:
-                # Setting up Sentry using environment variables.
-                log.debug("Defaulting to SENTRY_DSN in environment.")
-                sentry_sdk.init(integrations=integrations, **sentry_config_opts)
-
-        def add_tagging(self, task_instance):
-            """Add tagging for a task_instance."""
-            dag_run = task_instance.dag_run
-            task = task_instance.task
-
-            with sentry_sdk.configure_scope() as scope:
-                for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
-                    attribute = getattr(task_instance, tag_name)
-                    scope.set_tag(tag_name, attribute)
-                for tag_name in self.SCOPE_DAG_RUN_TAGS:
-                    attribute = getattr(dag_run, tag_name)
-                    scope.set_tag(tag_name, attribute)
-                scope.set_tag("operator", task.__class__.__name__)
-
-        @provide_session
-        def add_breadcrumbs(
-            self,
-            task_instance: TaskInstance,
-            session: Session | None = None,
-        ) -> None:
-            """Add breadcrumbs inside of a task_instance."""
-            if session is None:
-                return
-            dr = task_instance.get_dagrun(session)
-            task_instances = dr.get_task_instances(
-                state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
-                session=session,
-            )
-
-            for ti in task_instances:
-                data = {}
-                for crumb_tag in self.SCOPE_CRUMBS:
-                    data[crumb_tag] = getattr(ti, crumb_tag)
-
-                sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
-
-        def enrich_errors(self, func):
-            """
-            Decorate errors.
-
-            Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
-            """
-            session_args_idx = find_session_idx(func)
-
-            @wraps(func)
-            def wrapper(_self, *args, **kwargs):
-                # Wrapping the _run_raw_task function with push_scope to contain
-                # tags and breadcrumbs to a specific Task Instance
-
-                try:
-                    session = kwargs.get("session", args[session_args_idx])
-                except IndexError:
-                    session = None
-
-                with sentry_sdk.push_scope():
-                    try:
-                        # Is a LocalTaskJob get the task instance
-                        if hasattr(_self, "task_instance"):
-                            task_instance = _self.task_instance
-                        else:
-                            task_instance = _self
-
-                        self.add_tagging(task_instance)
-                        self.add_breadcrumbs(task_instance, session=session)
-                        return func(_self, *args, **kwargs)
-                    except Exception as e:
-                        sentry_sdk.capture_exception(e)
-                        raise
-
-            return wrapper
-
-        def flush(self):
-            sentry_sdk.flush()
-
-    Sentry = ConfiguredSentry()
diff --git a/airflow/sentry/__init__.py b/airflow/sentry/__init__.py
new file mode 100644
index 0000000000..10178aabf0
--- /dev/null
+++ b/airflow/sentry/__init__.py
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Sentry integration: ``Sentry`` is a ConfiguredSentry when [sentry] sentry_on is true, else a no-op."""
+
+from __future__ import annotations
+
+from airflow.configuration import conf
+from airflow.sentry.blank import BlankSentry
+
+Sentry: BlankSentry = BlankSentry()
+if conf.getboolean("sentry", "sentry_on", fallback=False):
+    from airflow.sentry.configured import ConfiguredSentry
+
+    Sentry = ConfiguredSentry()
diff --git a/airflow/sentry/blank.py b/airflow/sentry/blank.py
new file mode 100644
index 0000000000..8cdb40b5a9
--- /dev/null
+++ b/airflow/sentry/blank.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+
+class BlankSentry:
+    """No-op Sentry stand-in: every hook does nothing, so callers need no sentry_on checks."""
+
+    def add_tagging(self, task_instance):
+        """Do nothing; ConfiguredSentry overrides this to tag the Sentry scope."""
+
+    def add_breadcrumbs(self, task_instance, session: Session | None = None):
+        """Do nothing; ConfiguredSentry overrides this to record breadcrumbs."""
+
+    def enrich_errors(self, run):
+        """Return ``run`` unchanged (ConfiguredSentry wraps TaskInstance._run_raw_task here)."""
+        return run
+
+    def flush(self):
+        """Do nothing; there is no Sentry client whose events need flushing."""
diff --git a/airflow/sentry/configured.py b/airflow/sentry/configured.py
new file mode 100644
index 0000000000..af96839138
--- /dev/null
+++ b/airflow/sentry/configured.py
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations + +import logging +from functools import wraps +from typing import TYPE_CHECKING + +import sentry_sdk +from sentry_sdk.integrations.flask import FlaskIntegration +from sentry_sdk.integrations.logging import ignore_logger + +from airflow.configuration import conf +from airflow.executors.executor_loader import ExecutorLoader +from airflow.sentry.blank import BlankSentry +from airflow.utils.session import find_session_idx, provide_session +from airflow.utils.state import TaskInstanceState + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from airflow.models.taskinstance import TaskInstance + +log = logging.getLogger(__name__) + + +class ConfiguredSentry(BlankSentry): + """Configure Sentry SDK.""" + + SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date")) + SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number")) + SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration")) + + UNSUPPORTED_SENTRY_OPTIONS = frozenset( + ( + "integrations", + "in_app_include", + "in_app_exclude", + "ignore_errors", + "before_breadcrumb", + ) + ) + + def __init__(self): + """Initialize the Sentry SDK.""" + ignore_logger("airflow.task") + + sentry_flask = FlaskIntegration() + + # LoggingIntegration is set by default. 
+ integrations = [sentry_flask] + + executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False) + + if executor_class.supports_sentry: + from sentry_sdk.integrations.celery import CeleryIntegration + + sentry_celery = CeleryIntegration() + integrations.append(sentry_celery) + + dsn = None + sentry_config_opts = conf.getsection("sentry") or {} + if sentry_config_opts: + sentry_config_opts.pop("sentry_on") + old_way_dsn = sentry_config_opts.pop("sentry_dsn", None) + new_way_dsn = sentry_config_opts.pop("dsn", None) + # supported backward compatibility with old way dsn option + dsn = old_way_dsn or new_way_dsn + + unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys()) + if unsupported_options: + log.warning( + "There are unsupported options in [sentry] section: %s", + ", ".join(unsupported_options), + ) + + sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None) + sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None) + + if dsn: + sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts) + else: + # Setting up Sentry using environment variables. 
+ log.debug("Defaulting to SENTRY_DSN in environment.") + sentry_sdk.init(integrations=integrations, **sentry_config_opts) + + def add_tagging(self, task_instance): + """Add tagging for a task_instance.""" + dag_run = task_instance.dag_run + # See TaskInstance definition, the "task" attribute may not be set + task = getattr(task_instance, "task") + + with sentry_sdk.configure_scope() as scope: + for tag_name in self.SCOPE_TASK_INSTANCE_TAGS: + attribute = getattr(task_instance, tag_name) + scope.set_tag(tag_name, attribute) + for tag_name in self.SCOPE_DAG_RUN_TAGS: + attribute = getattr(dag_run, tag_name) + scope.set_tag(tag_name, attribute) + if task is not None: + scope.set_tag("operator", task.__class__.__name__) + + @provide_session + def add_breadcrumbs( + self, + task_instance: TaskInstance, + session: Session | None = None, + ) -> None: + """Add breadcrumbs inside of a task_instance.""" + if session is None: + return + dr = task_instance.get_dagrun(session) + task_instances = dr.get_task_instances( + state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED}, + session=session, + ) + + for ti in task_instances: + data = {} + for crumb_tag in self.SCOPE_CRUMBS: + data[crumb_tag] = getattr(ti, crumb_tag) + + sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info") + + def enrich_errors(self, func): + """ + Decorate errors. + + Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs. 
+ """ + session_args_idx = find_session_idx(func) + + @wraps(func) + def wrapper(_self, *args, **kwargs): + # Wrapping the _run_raw_task function with push_scope to contain + # tags and breadcrumbs to a specific Task Instance + + try: + session = kwargs.get("session", args[session_args_idx]) + except IndexError: + session = None + + with sentry_sdk.push_scope(): + try: + # Is a LocalTaskJob get the task instance + if hasattr(_self, "task_instance"): + task_instance = _self.task_instance + else: + task_instance = _self + + self.add_tagging(task_instance) + self.add_breadcrumbs(task_instance, session=session) + return func(_self, *args, **kwargs) + except Exception as e: + sentry_sdk.capture_exception(e) + raise + + return wrapper + + def flush(self): + sentry_sdk.flush() diff --git a/tests/test_sentry.py b/tests/test_sentry.py new file mode 100644 index 0000000000..10968b5274 --- /dev/null +++ b/tests/test_sentry.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest.mock import MagicMock, create_autospec, patch + +from sentry_sdk.scope import Scope + +from airflow.models.taskinstance import TaskInstance + + +@patch("sentry_sdk.configure_scope") +def test_configured_sentry_add_tagging(mock_configure_scope): + mock_scope = create_autospec(Scope) + mock_configure_scope.return_value = mock_scope + + from airflow.sentry.configured import ConfiguredSentry + + sentry = ConfiguredSentry() + + dummy_tags = ConfiguredSentry.SCOPE_DAG_RUN_TAGS | ConfiguredSentry.SCOPE_TASK_INSTANCE_TAGS + + # It should not raise error with both "dag_run" and "task" attributes, available. + task_instance_1 = create_autospec(TaskInstance) + task_instance_1.dag_run = MagicMock() + task_instance_1.task = "task_1" + for tag in dummy_tags: + setattr(task_instance_1.dag_run, tag, "dummy") + sentry.add_tagging(task_instance_1) + + # Verify tags + for tag in dummy_tags: + mock_scope.set_tag.assert_called_with(tag, "dummy") + mock_scope.set_tag.assert_called_with("operator", "str") + + # Reset the mock + mock_scope.reset_mock() + + # It should not raise error if "task" attribute is not set. + task_instance_2 = create_autospec(TaskInstance) + task_instance_2.dag_run = MagicMock() + for tag in dummy_tags: + setattr(task_instance_2.dag_run, tag, "dummy") + sentry.add_tagging(task_instance_2) + + # Verify tags + for tag in dummy_tags: + mock_scope.set_tag.assert_called_with(tag, "dummy") + # Also verify the "operator" tag, which is related to the "task attribute. + mock_scope.set_tag.assert_called_with("operator", "str")