bkyryliuk commented on a change in pull request #11711: URL: https://github.com/apache/incubator-superset/pull/11711#discussion_r527380328
########## File path: superset/reports/commands/alert.py ########## @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import logging +from operator import eq, ge, gt, le, lt, ne +from typing import Optional + +import numpy as np + +from superset import jinja_context +from superset.commands.base import BaseCommand +from superset.models.reports import ReportSchedule, ReportScheduleValidatorType +from superset.reports.commands.exceptions import ( + AlertQueryInvalidTypeError, + AlertQueryMultipleColumnsError, + AlertQueryMultipleRowsError, +) + +logger = logging.getLogger(__name__) + + +OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne} + + +class AlertCommand(BaseCommand): + def __init__(self, report_schedule: ReportSchedule): + self._report_schedule = report_schedule + self._result: Optional[float] = None + + def run(self) -> bool: + self.validate() + + if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: + self._report_schedule.last_value_row_json = self._result + return self._result not in (0, None, np.nan) + self._report_schedule.last_value = self._result + operator = json.loads(self._report_schedule.validator_config_json)["op"] + threshold = json.loads(self._report_schedule.validator_config_json)["threshold"] + return OPERATOR_FUNCTIONS[operator](self._result, threshold) + + def validate(self) -> None: + """ + Validate the query result as a Pandas DataFrame + """ + sql_template = jinja_context.get_template_processor( + database=self._report_schedule.database + ) + rendered_sql = sql_template.process_template(self._report_schedule.sql) + df = self._report_schedule.database.get_df(rendered_sql) + + if df.empty: + return + rows = df.to_records() + if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: + self._result = rows[0][1] + return + # check if query return more then one row + if len(rows) > 1: Review comment: it will be useful to add result to the exception & surface it to the user in the error msg ########## File path: superset/reports/commands/execute.py ########## @@ -0,0 +1,256 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from datetime import datetime, timedelta +from typing import Optional + +from sqlalchemy.orm import Session + +from superset import app, thumbnail_cache +from superset.commands.base import BaseCommand +from superset.commands.exceptions import CommandException +from superset.extensions import security_manager +from superset.models.reports import ( + ReportExecutionLog, + ReportLogState, + ReportSchedule, + ReportScheduleType, +) +from superset.reports.commands.alert import AlertCommand +from superset.reports.commands.exceptions import ( + ReportScheduleAlertGracePeriodError, + ReportScheduleExecuteUnexpectedError, + ReportScheduleNotFoundError, + ReportScheduleNotificationError, + ReportSchedulePreviousWorkingError, + ReportScheduleScreenshotFailedError, +) +from superset.reports.dao import ReportScheduleDAO +from superset.reports.notifications import create_notification +from superset.reports.notifications.base import NotificationContent, ScreenshotData +from superset.reports.notifications.exceptions import NotificationError +from superset.utils.celery import session_scope +from superset.utils.screenshots import ( + BaseScreenshot, + ChartScreenshot, + DashboardScreenshot, +) +from superset.utils.urls import get_url_path + +logger = logging.getLogger(__name__) + + +class AsyncExecuteReportScheduleCommand(BaseCommand): + """ + Execute all types of report schedules. + - On reports takes chart or dashboard screenshots and sends configured notifications + - On Alerts uses related Command AlertCommand and sends configured notifications + """ + + def __init__(self, model_id: int, scheduled_dttm: datetime): + self._model_id = model_id + self._model: Optional[ReportSchedule] = None + self._scheduled_dttm = scheduled_dttm + + def set_state_and_log( + self, + session: Session, + start_dttm: datetime, + state: ReportLogState, + error_message: Optional[str] = None, + ) -> None: + """ + Updates current ReportSchedule state and TS. If on final state writes the log + for this execution + """ + now_dttm = datetime.utcnow() + if state == ReportLogState.WORKING: + self.set_state(session, state, now_dttm) + return + self.set_state(session, state, now_dttm) + self.create_log( + session, start_dttm, now_dttm, state, error_message=error_message, + ) + + def set_state( + self, session: Session, state: ReportLogState, dttm: datetime + ) -> None: + """ + Set the current report schedule state, on this case we want to + commit immediately + """ + if self._model: + self._model.last_state = state + self._model.last_eval_dttm = dttm + session.commit() + + def create_log( # pylint: disable=too-many-arguments + self, + session: Session, + start_dttm: datetime, + end_dttm: datetime, + state: ReportLogState, + error_message: Optional[str] = None, + ) -> None: + """ + Creates a Report execution log, uses the current computed last_value for Alerts + """ + if self._model: + log = ReportExecutionLog( + scheduled_dttm=self._scheduled_dttm, + start_dttm=start_dttm, + end_dttm=end_dttm, + value=self._model.last_value, + value_row_json=self._model.last_value_row_json, + state=state, + error_message=error_message, + report_schedule=self._model, + ) + session.add(log) + + @staticmethod + def _get_url(report_schedule: ReportSchedule, user_friendly: bool = False) -> str: + """ + Get the url for this report schedule: chart or dashboard + """ + if report_schedule.chart: + return get_url_path( + "Superset.slice", + user_friendly=user_friendly, + slice_id=report_schedule.chart_id, + standalone="true", + ) + return get_url_path( + "Superset.dashboard", + user_friendly=user_friendly, + dashboard_id_or_slug=report_schedule.dashboard_id, + ) + + def _get_screenshot(self, report_schedule: ReportSchedule) -> ScreenshotData: Review comment: nit: it would be nice to pass only the attributes needed to calculate the screenshot vs a whole class it makes it easier to unit tests the functions when less objects needs to be constructed, and read the functions / code as well as functions definition explains what is passed to it. this is just a personal preference and optional suggestion ########## File path: superset/reports/commands/alert.py ########## @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import logging +from operator import eq, ge, gt, le, lt, ne +from typing import Optional + +import numpy as np + +from superset import jinja_context +from superset.commands.base import BaseCommand +from superset.models.reports import ReportSchedule, ReportScheduleValidatorType +from superset.reports.commands.exceptions import ( + AlertQueryInvalidTypeError, + AlertQueryMultipleColumnsError, + AlertQueryMultipleRowsError, +) + +logger = logging.getLogger(__name__) + + +OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne} + + +class AlertCommand(BaseCommand): + def __init__(self, report_schedule: ReportSchedule): + self._report_schedule = report_schedule + self._result: Optional[float] = None + + def run(self) -> bool: + self.validate() + self._report_schedule.last_value = self._result + + if self._report_schedule.validator_type == ReportScheduleValidatorType.NOT_NULL: + return self._result not in (0, None, np.nan) + operator = json.loads(self._report_schedule.validator_config_json)["op"] + threshold = json.loads(self._report_schedule.validator_config_json)["threshold"] + return OPERATOR_FUNCTIONS[operator](self._result, threshold) + + def validate(self) -> None: + """ + Validate the query result as a Pandas DataFrame + """ + sql_template = jinja_context.get_template_processor( + database=self._report_schedule.database + ) + rendered_sql = sql_template.process_template(self._report_schedule.sql) + df = self._report_schedule.database.get_df(rendered_sql) + + if df.empty: + return + rows = df.to_records() + # check if query return more then one row + if len(rows) > 1: + raise AlertQueryMultipleRowsError() + if len(rows[0]) > 2: + raise AlertQueryMultipleColumnsError() Review comment: nit: it may be cleaner to have separate validation functions depending on the type of the validator ########## File path: superset/reports/notifications/base.py ########## @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from dataclasses import dataclass +from typing import Any, List, Optional, Type + +from superset.models.reports import ReportRecipients, ReportRecipientType + + +@dataclass +class ScreenshotData: + url: str # url to chart/dashboard for this screenshot + image: bytes # bytes for the screenshot + + +@dataclass +class NotificationContent: + name: str + screenshot: ScreenshotData + + +class BaseNotification: # pylint: disable=too-few-public-methods Review comment: ❤️ ########## File path: superset/reports/commands/execute.py ########## @@ -0,0 +1,256 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from datetime import datetime, timedelta +from typing import Optional + +from sqlalchemy.orm import Session + +from superset import app, thumbnail_cache +from superset.commands.base import BaseCommand +from superset.commands.exceptions import CommandException +from superset.extensions import security_manager +from superset.models.reports import ( + ReportExecutionLog, + ReportLogState, + ReportSchedule, + ReportScheduleType, +) +from superset.reports.commands.alert import AlertCommand +from superset.reports.commands.exceptions import ( + ReportScheduleAlertGracePeriodError, + ReportScheduleExecuteUnexpectedError, + ReportScheduleNotFoundError, + ReportScheduleNotificationError, + ReportSchedulePreviousWorkingError, + ReportScheduleScreenshotFailedError, +) +from superset.reports.dao import ReportScheduleDAO +from superset.reports.notifications import create_notification +from superset.reports.notifications.base import NotificationContent, ScreenshotData +from superset.reports.notifications.exceptions import NotificationError +from superset.utils.celery import session_scope +from superset.utils.screenshots import ( + BaseScreenshot, + ChartScreenshot, + DashboardScreenshot, +) +from superset.utils.urls import get_url_path + +logger = logging.getLogger(__name__) + + +class AsyncExecuteReportScheduleCommand(BaseCommand): + """ + Execute all types of report schedules. + - On reports takes chart or dashboard screenshots and sends configured notifications + - On Alerts uses related Command AlertCommand and sends configured notifications + """ + + def __init__(self, model_id: int, scheduled_dttm: datetime): + self._model_id = model_id + self._model: Optional[ReportSchedule] = None + self._scheduled_dttm = scheduled_dttm + + def set_state_and_log( + self, + session: Session, + start_dttm: datetime, + state: ReportLogState, + error_message: Optional[str] = None, + ) -> None: + """ + Updates current ReportSchedule state and TS. If on final state writes the log + for this execution + """ + now_dttm = datetime.utcnow() + if state == ReportLogState.WORKING: + self.set_state(session, state, now_dttm) + return + self.set_state(session, state, now_dttm) + self.create_log( + session, start_dttm, now_dttm, state, error_message=error_message, + ) + + def set_state( + self, session: Session, state: ReportLogState, dttm: datetime + ) -> None: + """ + Set the current report schedule state, on this case we want to + commit immediately + """ + if self._model: + self._model.last_state = state + self._model.last_eval_dttm = dttm + session.commit() + + def create_log( # pylint: disable=too-many-arguments + self, + session: Session, + start_dttm: datetime, + end_dttm: datetime, + state: ReportLogState, + error_message: Optional[str] = None, + ) -> None: + """ + Creates a Report execution log, uses the current computed last_value for Alerts + """ + if self._model: + log = ReportExecutionLog( + scheduled_dttm=self._scheduled_dttm, + start_dttm=start_dttm, + end_dttm=end_dttm, + value=self._model.last_value, + value_row_json=self._model.last_value_row_json, + state=state, + error_message=error_message, + report_schedule=self._model, + ) + session.add(log) + + @staticmethod + def _get_url(report_schedule: ReportSchedule, user_friendly: bool = False) -> str: + """ + Get the url for this report schedule: chart or dashboard + """ + if report_schedule.chart: + return get_url_path( + "Superset.slice", + user_friendly=user_friendly, + slice_id=report_schedule.chart_id, + standalone="true", + ) + return get_url_path( + "Superset.dashboard", + user_friendly=user_friendly, + dashboard_id_or_slug=report_schedule.dashboard_id, + ) + + def _get_screenshot(self, report_schedule: ReportSchedule) -> ScreenshotData: + """ + Get a chart or dashboard screenshot + :raises: ReportScheduleScreenshotFailedError + """ + url = self._get_url(report_schedule) + screenshot: Optional[BaseScreenshot] = None + if report_schedule.chart: + screenshot = ChartScreenshot(url, report_schedule.chart.digest) + else: + screenshot = DashboardScreenshot(url, report_schedule.dashboard.digest) + image_url = self._get_url(report_schedule, user_friendly=True) + user = security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"]) + image_data = screenshot.compute_and_cache( + user=user, cache=thumbnail_cache, force=True, + ) + if not image_data: + raise ReportScheduleScreenshotFailedError() + return ScreenshotData(url=image_url, image=image_data) + + def _get_notification_content( + self, report_schedule: ReportSchedule + ) -> NotificationContent: + """ + Gets a notification content, this is composed by a title and a screenshot + :raises: ReportScheduleScreenshotFailedError + """ + screenshot_data = self._get_screenshot(report_schedule) + if report_schedule.chart: + name = report_schedule.chart.slice_name + else: + name = report_schedule.dashboard.dashboard_title + return NotificationContent(name=name, screenshot=screenshot_data) + + def _send(self, report_schedule: ReportSchedule) -> None: + """ + Creates the notification content and sends them to all recipients + + :raises: ReportScheduleNotificationError + """ + notification_errors = [] + notification_content = self._get_notification_content(report_schedule) + for recipient in report_schedule.recipients: + notification = create_notification(recipient, notification_content) + try: + notification.send() + except NotificationError as ex: + # collect notification errors but keep processing them + notification_errors.append(str(ex)) + if notification_errors: + raise ReportScheduleNotificationError(";".join(notification_errors)) + + def run(self) -> None: + with session_scope(nullpool=True) as session: + try: + start_dttm = datetime.utcnow() + self.validate(session=session) + if not self._model: + raise ReportScheduleExecuteUnexpectedError() + self.set_state_and_log(session, start_dttm, ReportLogState.WORKING) + # If it's an alert check if the alert is triggered + if self._model.type == ReportScheduleType.ALERT: + if not AlertCommand(self._model).run(): + self.set_state_and_log(session, start_dttm, ReportLogState.NOOP) + return + + self._send(self._model) + + # Log, state and TS + self.set_state_and_log(session, start_dttm, ReportLogState.SUCCESS) + except ReportScheduleAlertGracePeriodError as ex: + self.set_state_and_log( + session, start_dttm, ReportLogState.NOOP, error_message=str(ex) + ) + except ReportSchedulePreviousWorkingError as ex: + self.create_log( + session, + start_dttm, + datetime.utcnow(), + state=ReportLogState.ERROR, + error_message=str(ex), + ) + session.commit() + raise + except CommandException as ex: + self.set_state_and_log( + session, start_dttm, ReportLogState.ERROR, error_message=str(ex) + ) + # We want to actually commit the state and log inside the scope + session.commit() + raise + + def validate( # pylint: disable=arguments-differ + self, session: Session = None + ) -> None: + # Validate/populate model exists + self._model = ReportScheduleDAO.find_by_id(self._model_id, session=session) + if not self._model: + raise ReportScheduleNotFoundError() + # Avoid overlap processing + if self._model.last_state == ReportLogState.WORKING: + raise ReportSchedulePreviousWorkingError() + # Check grace period + if self._model.type == ReportScheduleType.ALERT: + last_success = ReportScheduleDAO.find_last_success_log(session) + if ( + last_success Review comment: would be nice to move it to a separate function & have unit test ########## File path: superset/reports/commands/execute.py ########## @@ -0,0 +1,256 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from datetime import datetime, timedelta +from typing import Optional + +from sqlalchemy.orm import Session + +from superset import app, thumbnail_cache +from superset.commands.base import BaseCommand +from superset.commands.exceptions import CommandException +from superset.extensions import security_manager +from superset.models.reports import ( + ReportExecutionLog, + ReportLogState, + ReportSchedule, + ReportScheduleType, +) +from superset.reports.commands.alert import AlertCommand +from superset.reports.commands.exceptions import ( + ReportScheduleAlertGracePeriodError, + ReportScheduleExecuteUnexpectedError, + ReportScheduleNotFoundError, + ReportScheduleNotificationError, + ReportSchedulePreviousWorkingError, + ReportScheduleScreenshotFailedError, +) +from superset.reports.dao import ReportScheduleDAO +from superset.reports.notifications import create_notification +from superset.reports.notifications.base import NotificationContent, ScreenshotData +from superset.reports.notifications.exceptions import NotificationError +from superset.utils.celery import session_scope +from superset.utils.screenshots import ( + BaseScreenshot, + ChartScreenshot, + DashboardScreenshot, +) +from superset.utils.urls import get_url_path + +logger = logging.getLogger(__name__) + + +class AsyncExecuteReportScheduleCommand(BaseCommand): + """ + Execute all types of report schedules. + - On reports takes chart or dashboard screenshots and sends configured notifications + - On Alerts uses related Command AlertCommand and sends configured notifications + """ + + def __init__(self, model_id: int, scheduled_dttm: datetime): + self._model_id = model_id + self._model: Optional[ReportSchedule] = None + self._scheduled_dttm = scheduled_dttm + + def set_state_and_log( + self, + session: Session, + start_dttm: datetime, + state: ReportLogState, + error_message: Optional[str] = None, + ) -> None: + """ + Updates current ReportSchedule state and TS. If on final state writes the log + for this execution + """ + now_dttm = datetime.utcnow() + if state == ReportLogState.WORKING: + self.set_state(session, state, now_dttm) + return + self.set_state(session, state, now_dttm) + self.create_log( + session, start_dttm, now_dttm, state, error_message=error_message, + ) + + def set_state( + self, session: Session, state: ReportLogState, dttm: datetime + ) -> None: + """ + Set the current report schedule state, on this case we want to + commit immediately + """ + if self._model: + self._model.last_state = state + self._model.last_eval_dttm = dttm + session.commit() + + def create_log( # pylint: disable=too-many-arguments + self, + session: Session, + start_dttm: datetime, + end_dttm: datetime, + state: ReportLogState, + error_message: Optional[str] = None, + ) -> None: + """ + Creates a Report execution log, uses the current computed last_value for Alerts + """ + if self._model: + log = ReportExecutionLog( + scheduled_dttm=self._scheduled_dttm, + start_dttm=start_dttm, + end_dttm=end_dttm, + value=self._model.last_value, + value_row_json=self._model.last_value_row_json, + state=state, + error_message=error_message, + report_schedule=self._model, + ) + session.add(log) + + @staticmethod + def _get_url(report_schedule: ReportSchedule, user_friendly: bool = False) -> str: + """ + Get the url for this report schedule: chart or dashboard + """ + if report_schedule.chart: + return get_url_path( + "Superset.slice", + user_friendly=user_friendly, + slice_id=report_schedule.chart_id, + standalone="true", + ) + return get_url_path( + "Superset.dashboard", + user_friendly=user_friendly, + dashboard_id_or_slug=report_schedule.dashboard_id, + ) + + def _get_screenshot(self, report_schedule: ReportSchedule) -> ScreenshotData: + """ + Get a chart or dashboard screenshot + :raises: ReportScheduleScreenshotFailedError + """ + url = self._get_url(report_schedule) + screenshot: Optional[BaseScreenshot] = None + if report_schedule.chart: + screenshot = ChartScreenshot(url, report_schedule.chart.digest) + else: + screenshot = DashboardScreenshot(url, report_schedule.dashboard.digest) + image_url = self._get_url(report_schedule, user_friendly=True) + user = security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"]) + image_data = screenshot.compute_and_cache( + user=user, cache=thumbnail_cache, force=True, + ) + if not image_data: + raise ReportScheduleScreenshotFailedError() + return ScreenshotData(url=image_url, image=image_data) + + def _get_notification_content( + self, report_schedule: ReportSchedule + ) -> NotificationContent: + """ + Gets a notification content, this is composed by a title and a screenshot + :raises: ReportScheduleScreenshotFailedError + """ + screenshot_data = self._get_screenshot(report_schedule) + if report_schedule.chart: + name = report_schedule.chart.slice_name + else: + name = report_schedule.dashboard.dashboard_title + return NotificationContent(name=name, screenshot=screenshot_data) + + def _send(self, report_schedule: ReportSchedule) -> None: + """ + Creates the notification content and sends them to all recipients + + :raises: ReportScheduleNotificationError + """ + notification_errors = [] + notification_content = self._get_notification_content(report_schedule) + for recipient in report_schedule.recipients: + notification = create_notification(recipient, notification_content) + try: + notification.send() + except NotificationError as ex: + # collect notification errors but keep processing them + notification_errors.append(str(ex)) + if notification_errors: + raise ReportScheduleNotificationError(";".join(notification_errors)) + + def run(self) -> None: + with session_scope(nullpool=True) as session: + try: + start_dttm = datetime.utcnow() + self.validate(session=session) + if not self._model: + raise ReportScheduleExecuteUnexpectedError() + self.set_state_and_log(session, start_dttm, ReportLogState.WORKING) + # If it's an alert check if the alert is triggered + if self._model.type == ReportScheduleType.ALERT: + if not AlertCommand(self._model).run(): + self.set_state_and_log(session, start_dttm, ReportLogState.NOOP) + return + + self._send(self._model) + + # Log, state and TS + self.set_state_and_log(session, start_dttm, ReportLogState.SUCCESS) + except ReportScheduleAlertGracePeriodError as ex: + self.set_state_and_log( + session, start_dttm, ReportLogState.NOOP, error_message=str(ex) + ) + except ReportSchedulePreviousWorkingError as ex: Review comment: should this one be retried ? ########## File path: superset/reports/commands/log_prune.py ########## @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from datetime import datetime, timedelta + +from superset.commands.base import BaseCommand +from superset.models.reports import ReportSchedule +from superset.reports.dao import ReportScheduleDAO +from superset.utils.celery import session_scope + +logger = logging.getLogger(__name__) + + +class AsyncPruneReportScheduleLogCommand(BaseCommand): Review comment: nice! ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
