dpgaspar commented on a change in pull request #11711:
URL: 
https://github.com/apache/incubator-superset/pull/11711#discussion_r527544979



##########
File path: superset/reports/commands/execute.py
##########
@@ -0,0 +1,256 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from datetime import datetime, timedelta
+from typing import Optional
+
+from sqlalchemy.orm import Session
+
+from superset import app, thumbnail_cache
+from superset.commands.base import BaseCommand
+from superset.commands.exceptions import CommandException
+from superset.extensions import security_manager
+from superset.models.reports import (
+    ReportExecutionLog,
+    ReportLogState,
+    ReportSchedule,
+    ReportScheduleType,
+)
+from superset.reports.commands.alert import AlertCommand
+from superset.reports.commands.exceptions import (
+    ReportScheduleAlertGracePeriodError,
+    ReportScheduleExecuteUnexpectedError,
+    ReportScheduleNotFoundError,
+    ReportScheduleNotificationError,
+    ReportSchedulePreviousWorkingError,
+    ReportScheduleScreenshotFailedError,
+)
+from superset.reports.dao import ReportScheduleDAO
+from superset.reports.notifications import create_notification
+from superset.reports.notifications.base import NotificationContent, 
ScreenshotData
+from superset.reports.notifications.exceptions import NotificationError
+from superset.utils.celery import session_scope
+from superset.utils.screenshots import (
+    BaseScreenshot,
+    ChartScreenshot,
+    DashboardScreenshot,
+)
+from superset.utils.urls import get_url_path
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncExecuteReportScheduleCommand(BaseCommand):
+    """
+    Execute all types of report schedules.
+    - On reports takes chart or dashboard screenshots and sends configured 
notifications
+    - On Alerts uses related Command AlertCommand and sends configured 
notifications
+    """
+
+    def __init__(self, model_id: int, scheduled_dttm: datetime):
+        self._model_id = model_id
+        self._model: Optional[ReportSchedule] = None
+        self._scheduled_dttm = scheduled_dttm
+
+    def set_state_and_log(
+        self,
+        session: Session,
+        start_dttm: datetime,
+        state: ReportLogState,
+        error_message: Optional[str] = None,
+    ) -> None:
+        """
+        Updates current ReportSchedule state and TS. If on final state writes 
the log
+        for this execution
+        """
+        now_dttm = datetime.utcnow()
+        if state == ReportLogState.WORKING:
+            self.set_state(session, state, now_dttm)
+            return
+        self.set_state(session, state, now_dttm)
+        self.create_log(
+            session, start_dttm, now_dttm, state, error_message=error_message,
+        )
+
+    def set_state(
+        self, session: Session, state: ReportLogState, dttm: datetime
+    ) -> None:
+        """
+        Set the current report schedule state, on this case we want to
+        commit immediately
+        """
+        if self._model:
+            self._model.last_state = state
+            self._model.last_eval_dttm = dttm
+            session.commit()
+
+    def create_log(  # pylint: disable=too-many-arguments
+        self,
+        session: Session,
+        start_dttm: datetime,
+        end_dttm: datetime,
+        state: ReportLogState,
+        error_message: Optional[str] = None,
+    ) -> None:
+        """
+        Creates a Report execution log, uses the current computed last_value 
for Alerts
+        """
+        if self._model:
+            log = ReportExecutionLog(
+                scheduled_dttm=self._scheduled_dttm,
+                start_dttm=start_dttm,
+                end_dttm=end_dttm,
+                value=self._model.last_value,
+                value_row_json=self._model.last_value_row_json,
+                state=state,
+                error_message=error_message,
+                report_schedule=self._model,
+            )
+            session.add(log)
+
+    @staticmethod
+    def _get_url(report_schedule: ReportSchedule, user_friendly: bool = False) 
-> str:
+        """
+        Get the url for this report schedule: chart or dashboard
+        """
+        if report_schedule.chart:
+            return get_url_path(
+                "Superset.slice",
+                user_friendly=user_friendly,
+                slice_id=report_schedule.chart_id,
+                standalone="true",
+            )
+        return get_url_path(
+            "Superset.dashboard",
+            user_friendly=user_friendly,
+            dashboard_id_or_slug=report_schedule.dashboard_id,
+        )
+
+    def _get_screenshot(self, report_schedule: ReportSchedule) -> 
ScreenshotData:
+        """
+        Get a chart or dashboard screenshot
+        :raises: ReportScheduleScreenshotFailedError
+        """
+        url = self._get_url(report_schedule)
+        screenshot: Optional[BaseScreenshot] = None
+        if report_schedule.chart:
+            screenshot = ChartScreenshot(url, report_schedule.chart.digest)
+        else:
+            screenshot = DashboardScreenshot(url, 
report_schedule.dashboard.digest)
+        image_url = self._get_url(report_schedule, user_friendly=True)
+        user = 
security_manager.find_user(app.config["THUMBNAIL_SELENIUM_USER"])
+        image_data = screenshot.compute_and_cache(
+            user=user, cache=thumbnail_cache, force=True,
+        )
+        if not image_data:
+            raise ReportScheduleScreenshotFailedError()
+        return ScreenshotData(url=image_url, image=image_data)
+
+    def _get_notification_content(
+        self, report_schedule: ReportSchedule
+    ) -> NotificationContent:
+        """
+        Gets a notification content, this is composed by a title and a 
screenshot
+        :raises: ReportScheduleScreenshotFailedError
+        """
+        screenshot_data = self._get_screenshot(report_schedule)
+        if report_schedule.chart:
+            name = report_schedule.chart.slice_name
+        else:
+            name = report_schedule.dashboard.dashboard_title
+        return NotificationContent(name=name, screenshot=screenshot_data)
+
+    def _send(self, report_schedule: ReportSchedule) -> None:
+        """
+        Creates the notification content and sends them to all recipients
+
+        :raises: ReportScheduleNotificationError
+        """
+        notification_errors = []
+        notification_content = self._get_notification_content(report_schedule)
+        for recipient in report_schedule.recipients:
+            notification = create_notification(recipient, notification_content)
+            try:
+                notification.send()
+            except NotificationError as ex:
+                # collect notification errors but keep processing them
+                notification_errors.append(str(ex))
+        if notification_errors:
+            raise 
ReportScheduleNotificationError(";".join(notification_errors))
+
+    def run(self) -> None:
+        with session_scope(nullpool=True) as session:
+            try:
+                start_dttm = datetime.utcnow()
+                self.validate(session=session)
+                if not self._model:
+                    raise ReportScheduleExecuteUnexpectedError()
+                self.set_state_and_log(session, start_dttm, 
ReportLogState.WORKING)
+                # If it's an alert check if the alert is triggered
+                if self._model.type == ReportScheduleType.ALERT:
+                    if not AlertCommand(self._model).run():
+                        self.set_state_and_log(session, start_dttm, 
ReportLogState.NOOP)
+                        return
+
+                self._send(self._model)
+
+                # Log, state and TS
+                self.set_state_and_log(session, start_dttm, 
ReportLogState.SUCCESS)
+            except ReportScheduleAlertGracePeriodError as ex:
+                self.set_state_and_log(
+                    session, start_dttm, ReportLogState.NOOP, 
error_message=str(ex)
+                )
+            except ReportSchedulePreviousWorkingError as ex:

Review comment:
       I think not, since I'm proposing a 1min beat, the beat cycle itself is 
the retry, yet the cron for the alert can be set to 1h or 10min etc. 
Undecided..., I propose not to throw too much logic here, unless we find it's 
something we really should add




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to