Bindu-yadav8 commented on issue #30304:
URL: https://github.com/apache/superset/issues/30304#issuecomment-2367598652
Hi everyone,
I'm currently working on integrating custom webhook notifications for alerts
in Apache Superset. My goal is to POST alert messages to a REST API endpoint
instead of using the built-in notification methods (email or Slack).
What I’ve Implemented:
Custom Webhook Notification Method: I've created a custom script
(webhook.py) to handle the notifications.
```
import json
import logging

import requests

from superset.reports.notifications.base import BaseNotification
from superset.reports.models import ReportRecipientType

logger = logging.getLogger(__name__)


class WebhookNotification(BaseNotification):
    # Register this notification for recipients of type 'Webhook'
    type = ReportRecipientType.WEBHOOK

    def send(self) -> None:
        # recipient_config_json is stored as a JSON *string* (see
        # ReportRecipients in models.py), so it must be parsed before
        # the webhook URL can be read from it
        config = json.loads(self._recipient.recipient_config_json or "{}")
        webhook_url = config.get("target")
        if not webhook_url:
            logger.error("Webhook URL not found in recipient configuration")
            raise ValueError("Webhook URL not found in recipient configuration")

        # Prepare the message content (adjust as per your needs). Note that
        # NotificationContent (base.py) has no timestamp attribute, so none
        # is sent here.
        payload = {
            "alert_name": self._content.name or "New Superset Alert",
            "text": self._content.text or "Alert Triggered",
            "description": self._content.description,
            "url": self._content.url,
        }
        headers = {"Content-Type": "application/json"}
        try:
            response = requests.post(
                webhook_url, json=payload, headers=headers, timeout=30
            )
            response.raise_for_status()
            logger.info("Successfully sent alert to webhook: %s", response.status_code)
        except requests.exceptions.RequestException as ex:
            logger.error("Failed to send alert to webhook: %s", ex)
```
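Since `send()` only touches `self._recipient` and `self._content`, the payload logic can be exercised outside Superset with simple stand-ins (the `SimpleNamespace` doubles below are hypothetical test fixtures, not Superset classes):

```python
import json
from types import SimpleNamespace

# Hypothetical stand-ins for ReportRecipients and NotificationContent,
# carrying only the attributes the webhook code reads.
recipient = SimpleNamespace(
    recipient_config_json=(
        '{"target": "https://webhook.site/d98c5948-3a9f-4cca-ab27-f251033e6956"}'
    )
)
content = SimpleNamespace(
    name="Webhook Alert",
    text=None,
    description="Webhook Alert notification",
    url=None,
)

# recipient_config_json is a JSON string, so parse it before reading "target"
config = json.loads(recipient.recipient_config_json or "{}")
webhook_url = config.get("target")

payload = {
    "alert_name": content.name or "New Superset Alert",
    "text": content.text or "Alert Triggered",
    "description": content.description,
    "url": content.url,
}

print(webhook_url)
print(payload["text"])  # falls back to "Alert Triggered"
```

Posting `payload` to a request inspector such as webhook.site confirms the shape before Superset is involved at all.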
Alert Configuration: I’ve set up alerts that should trigger based on certain
conditions in my database.
<img width="710" alt="image"
src="https://github.com/user-attachments/assets/cad83991-1039-4dd8-97e0-76f27321a26b">
Basically, I created this alert through the REST API at
http://172.18.2.7:8088/api/v1/report/; the stored schedule, as returned by the API, is:
> {
> "description_columns": {},
> "id": 2,
> "label_columns": {
> "active": "Active",
> "chart.id": "Chart Id",
> "chart.slice_name": "Chart Slice Name",
> "chart.viz_type": "Chart Viz Type",
> "context_markdown": "Context Markdown",
> "creation_method": "Creation Method",
> "crontab": "Crontab",
> "dashboard.dashboard_title": "Dashboard Dashboard Title",
> "dashboard.id": "Dashboard Id",
> "database.database_name": "Database Database Name",
> "database.id": "Database Id",
> "description": "Description",
> "extra": "Extra",
> "force_screenshot": "Force Screenshot",
> "grace_period": "Grace Period",
> "id": "Id",
> "last_eval_dttm": "Last Eval Dttm",
> "last_state": "Last State",
> "last_value": "Last Value",
> "last_value_row_json": "Last Value Row Json",
> "log_retention": "Log Retention",
> "name": "Name",
> "owners.first_name": "Owners First Name",
> "owners.id": "Owners Id",
> "owners.last_name": "Owners Last Name",
> "recipients.id": "Recipients Id",
> "recipients.recipient_config_json": "Recipients Recipient Config
Json",
> "recipients.type": "Recipients Type",
> "report_format": "Report Format",
> "sql": "Sql",
> "timezone": "Timezone",
> "type": "Type",
> "validator_config_json": "Validator Config Json",
> "validator_type": "Validator Type",
> "working_timeout": "Working Timeout"
> },
> "result": {
> "active": true,
> "chart": null,
> "context_markdown": "string",
> "creation_method": "alerts_reports",
> "crontab": "* * * * *",
> "dashboard": {
> "dashboard_title": "Zabbix Alarms",
> "id": 15
> },
> "database": {
> "database_name": "PostgreSQL",
> "id": 2
> },
> "description": "Webhook Alert notification",
> "extra": {},
> "force_screenshot": false,
> "grace_period": 14400,
> "id": 2,
> "last_eval_dttm": null,
> "last_state": "Not triggered",
> "last_value": null,
> "last_value_row_json": null,
> "log_retention": 90,
> "name": "Webhook Alert",
> "owners": [
> {
> "first_name": "admin",
> "id": 1,
> "last_name": "user"
> }
> ],
> "recipients": [
> {
> "id": 36,
> "recipient_config_json": "{\"target\":
\"https://webhook.site/d98c5948-3a9f-4cca-ab27-f251033e6956\"}",
> "type": "Webhook"
> }
> ],
> "report_format": "PNG",
> "sql": "SELECT \"value_column\" \r\nFROM
inodm.dummy_table\r\nWHERE \"insertion_timestamp\" = (SELECT
MAX(\"insertion_timestamp\") FROM inodm.dummy_table)\r\n\r\n\r\n",
> "timezone": "Asia/Kolkata",
> "type": "Alert",
> "validator_config_json": "{\"op\": \">\", \"threshold\": 20.0}",
> "validator_type": "operator",
> "working_timeout": 3600
> },
> "show_columns": [
> "id",
> "active",
> "chart.id",
> "chart.slice_name",
> "chart.viz_type",
> "context_markdown",
> "creation_method",
> "crontab",
> "dashboard.dashboard_title",
> "dashboard.id",
> "database.database_name",
> "database.id",
> "description",
> "extra",
> "force_screenshot",
> "grace_period",
> "last_eval_dttm",
> "last_state",
> "last_value",
> "last_value_row_json",
> "log_retention",
> "name",
> "owners.first_name",
> "owners.id",
> "owners.last_name",
> "recipients.id",
> "recipients.recipient_config_json",
> "recipients.type",
> "report_format",
> "sql",
> "timezone",
> "type",
> "validator_config_json",
> "validator_type",
> "working_timeout"
> ],
> "show_title": "Show Report Schedule"
> }
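For reference, an equivalent creation request can be sketched with plain `requests` (the response above echoes these fields back). This is a sketch under assumptions: the exact set of required fields should be checked against the `/api/v1/report/` OpenAPI spec, and the commented-out POST assumes a JWT obtained from `/api/v1/security/login`.

```python
import json

# Minimal alert-creation payload, mirroring the stored schedule above.
payload = {
    "type": "Alert",
    "name": "Webhook Alert",
    "description": "Webhook Alert notification",
    "active": True,
    "crontab": "* * * * *",
    "timezone": "Asia/Kolkata",
    "creation_method": "alerts_reports",
    "database": 2,
    "dashboard": 15,
    "sql": 'SELECT "value_column" FROM inodm.dummy_table '
           'WHERE "insertion_timestamp" = '
           '(SELECT MAX("insertion_timestamp") FROM inodm.dummy_table)',
    "validator_type": "operator",
    "validator_config_json": json.dumps({"op": ">", "threshold": 20.0}),
    "report_format": "PNG",
    "grace_period": 14400,
    "working_timeout": 3600,
    "recipients": [
        {
            "type": "Webhook",
            "recipient_config_json": json.dumps(
                {"target": "https://webhook.site/d98c5948-3a9f-4cca-ab27-f251033e6956"}
            ),
        }
    ],
}

# POST it with an authenticated session (JWT endpoint assumed):
# requests.post(
#     "http://172.18.2.7:8088/api/v1/report/",
#     json=payload,
#     headers={"Authorization": f"Bearer {access_token}"},
# )
print(payload["recipients"][0]["type"])
```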
Database Testing: I’ve mocked the data by adding records to the database and
verified that the alert should trigger when the condition is met.
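As a sanity check on the trigger condition itself: with a `validator_config_json` of `{"op": ">", "threshold": 20.0}`, the alert fires when the observed value exceeds 20. A minimal sketch of that comparison (the `should_trigger` helper is illustrative, not Superset's implementation):

```python
import operator

# Map of validator "op" strings to comparison functions; mirrors the
# semantics of {"op": ">", "threshold": 20.0} in the alert config above.
OPERATORS = {
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
}

def should_trigger(observed: float, op: str, threshold: float) -> bool:
    """Return True when the observed value satisfies the alert condition."""
    return OPERATORS[op](observed, threshold)

# With the threshold of 20.0 above, 25 triggers the alert and 15 does not.
print(should_trigger(25.0, ">", 20.0))  # True
print(should_trigger(15.0, ">", 20.0))  # False
```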
Issues Encountered:
Despite being able to successfully send requests to the webhook endpoint
using Postman, I'm not receiving any alerts from Superset.
I have both Celery and Redis running to handle background tasks, but it
seems the alert mechanism is not working as expected.
superset/reports/models.py
```
import enum
from cron_descriptor import get_description
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Integer,
String,
Table,
Text,
)
from sqlalchemy.orm import backref, relationship
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy_utils import UUIDType
from superset.extensions import security_manager
from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.helpers import AuditMixinNullable, ExtraJSONMixin
from superset.models.slice import Slice
from superset.reports.types import ReportScheduleExtra
metadata = Model.metadata # pylint: disable=no-member
class ReportScheduleType(str, enum.Enum):
ALERT = "Alert"
REPORT = "Report"
class ReportScheduleValidatorType(str, enum.Enum):
"""Validator types for alerts"""
NOT_NULL = "not null"
OPERATOR = "operator"
class ReportRecipientType(str, enum.Enum):
EMAIL = "Email"
SLACK = "Slack"
WEBHOOK = "Webhook" # New type for REST API notifications
class ReportState(str, enum.Enum):
SUCCESS = "Success"
WORKING = "Working"
ERROR = "Error"
NOOP = "Not triggered"
GRACE = "On Grace"
class ReportDataFormat(str, enum.Enum):
VISUALIZATION = "PNG"
DATA = "CSV"
TEXT = "TEXT"
class ReportCreationMethod(str, enum.Enum):
CHARTS = "charts"
DASHBOARDS = "dashboards"
ALERTS_REPORTS = "alerts_reports"
class ReportSourceFormat(str, enum.Enum):
CHART = "chart"
DASHBOARD = "dashboard"
report_schedule_user = Table(
"report_schedule_user",
metadata,
Column("id", Integer, primary_key=True),
Column("user_id", Integer, ForeignKey("ab_user.id"), nullable=False),
Column(
"report_schedule_id", Integer, ForeignKey("report_schedule.id"),
nullable=False
),
UniqueConstraint("user_id", "report_schedule_id"),
)
class ReportSchedule(Model, AuditMixinNullable, ExtraJSONMixin):
"""
Report Schedules, supports alerts and reports
"""
__tablename__ = "report_schedule"
__table_args__ = (UniqueConstraint("name", "type"),)
id = Column(Integer, primary_key=True)
type = Column(String(50), nullable=False)
name = Column(String(150), nullable=False)
description = Column(Text)
context_markdown = Column(Text)
active = Column(Boolean, default=True, index=True)
crontab = Column(String(1000), nullable=False)
creation_method = Column(
String(255), server_default=ReportCreationMethod.ALERTS_REPORTS
)
timezone = Column(String(100), default="UTC", nullable=False)
report_format = Column(String(50),
default=ReportDataFormat.VISUALIZATION)
sql = Column(Text())
# (Alerts/Reports) M-O to chart
chart_id = Column(Integer, ForeignKey("slices.id"), nullable=True)
chart = relationship(Slice, backref="report_schedules",
foreign_keys=[chart_id])
# (Alerts/Reports) M-O to dashboard
dashboard_id = Column(Integer, ForeignKey("dashboards.id"),
nullable=True)
dashboard = relationship(
Dashboard, backref="report_schedules", foreign_keys=[dashboard_id]
)
# (Alerts) M-O to database
database_id = Column(Integer, ForeignKey("dbs.id"), nullable=True)
database = relationship(Database, foreign_keys=[database_id])
owners = relationship(security_manager.user_model,
secondary=report_schedule_user)
# (Alerts) Stamped last observations
last_eval_dttm = Column(DateTime)
last_state = Column(String(50), default=ReportState.NOOP)
last_value = Column(Float)
last_value_row_json = Column(Text)
# (Alerts) Observed value validation related columns
validator_type = Column(String(100))
validator_config_json = Column(Text, default="{}")
# Log retention
log_retention = Column(Integer, default=90)
# (Alerts) After a success how long to wait for a new trigger (seconds)
grace_period = Column(Integer, default=60 * 60 * 4)
# (Alerts/Reports) Unlock a possible stalled working state
working_timeout = Column(Integer, default=60 * 60 * 1)
# (Reports) When generating a screenshot, bypass the cache?
force_screenshot = Column(Boolean, default=False)
extra: ReportScheduleExtra # type: ignore
def __repr__(self) -> str:
return str(self.name)
@renders("crontab")
def crontab_humanized(self) -> str:
return get_description(self.crontab)
class ReportRecipients(Model, AuditMixinNullable):
"""
Report Recipients, meant to support multiple notification types, eg:
Slack, email
"""
__tablename__ = "report_recipient"
id = Column(Integer, primary_key=True)
type = Column(String(50), nullable=False)
recipient_config_json = Column(Text, default="{}")
report_schedule_id = Column(
Integer, ForeignKey("report_schedule.id"), nullable=False
)
report_schedule = relationship(
ReportSchedule,
backref=backref("recipients", cascade="all,delete,delete-orphan"),
foreign_keys=[report_schedule_id],
)
class ReportExecutionLog(Model): # pylint: disable=too-few-public-methods
"""
Report Execution Log, hold the result of the report execution with
timestamps,
last observation and possible error messages
"""
__tablename__ = "report_execution_log"
id = Column(Integer, primary_key=True)
uuid = Column(UUIDType(binary=True))
# Timestamps
scheduled_dttm = Column(DateTime, nullable=False)
start_dttm = Column(DateTime)
end_dttm = Column(DateTime)
# (Alerts) Observed values
value = Column(Float)
value_row_json = Column(Text)
state = Column(String(50), nullable=False)
error_message = Column(Text)
report_schedule_id = Column(
Integer, ForeignKey("report_schedule.id"), nullable=False
)
report_schedule = relationship(
ReportSchedule,
backref=backref("logs", cascade="all,delete,delete-orphan"),
foreign_keys=[report_schedule_id],
)
```
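One detail from the model worth noting: `ReportRecipients.type` is a plain `String(50)` column holding values like `"Webhook"`, and `ReportRecipientType` mixes in `str`, so enum members compare equal to the raw column value. A minimal reproduction of that pattern:

```python
import enum

# Minimal reproduction of the str-enum pattern used by ReportRecipientType
class ReportRecipientType(str, enum.Enum):
    EMAIL = "Email"
    SLACK = "Slack"
    WEBHOOK = "Webhook"  # new type for REST API notifications

# The string stored in report_recipient.type compares equal to the member...
print(ReportRecipientType.WEBHOOK == "Webhook")  # True
# ...and the member can be reconstructed from the stored column value
print(ReportRecipientType("Webhook") is ReportRecipientType.WEBHOOK)  # True
```

This is why the recipient's `"type": "Webhook"` in the API payload must match the enum value exactly, including capitalization.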
superset/reports/notifications/base.py
```
from dataclasses import dataclass
from typing import Any, List, Optional, Type
import pandas as pd
from superset.reports.models import ReportRecipients, ReportRecipientType
from superset.utils.core import HeaderDataType
@dataclass
class NotificationContent:
name: str
    header_data: HeaderDataType  # this is optional to account for error states
csv: Optional[bytes] = None # bytes for csv file
    screenshots: Optional[List[bytes]] = None  # bytes for a list of screenshots
text: Optional[str] = None
description: Optional[str] = ""
url: Optional[str] = None # url to chart/dashboard for this screenshot
embedded_data: Optional[pd.DataFrame] = None
class BaseNotification: # pylint: disable=too-few-public-methods
"""
    Serves as base for all notifications and creates a simple plugin system
    for extending future implementations.
    Child implementations get automatically registered and should identify
    the notification type
    """
plugins: List[Type["BaseNotification"]] = []
type: Optional[ReportRecipientType] = None
"""
    Child classes set their notification type, e.g. `type = "email"`; this
    string is matched against ReportRecipients.type to map to the correct
    implementation
"""
def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None:
super().__init_subclass__(*args, **kwargs)
cls.plugins.append(cls)
def __init__(
self, recipient: ReportRecipients, content: NotificationContent
) -> None:
self._recipient = recipient
self._content = content
def send(self) -> None:
raise NotImplementedError()
```
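A common pitfall with this plugin system: `__init_subclass__` only registers `WebhookNotification` if the module defining it is actually imported at startup. Superset imports its built-in notification modules itself, but a custom `webhook.py` is not imported unless something pulls it in (for example, an import added at the end of `superset_config.py`), so it may never appear in `BaseNotification.plugins` and the Webhook recipient type is silently never dispatched. The registration mechanism in isolation:

```python
from typing import Any, List, Optional, Type

class BaseNotification:
    """Minimal reproduction of the plugin registry in base.py."""
    plugins: List[Type["BaseNotification"]] = []
    type: Optional[str] = None

    def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None:
        super().__init_subclass__(*args, **kwargs)
        cls.plugins.append(cls)

# Defining the subclass is what registers it; if this definition is never
# executed (module never imported), the plugin list never sees it.
class WebhookNotification(BaseNotification):
    type = "Webhook"

print(WebhookNotification in BaseNotification.plugins)  # True
```

Checking whether your class shows up in `BaseNotification.plugins` inside a running worker is a quick way to confirm the module was actually loaded.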
superset_config.py
```
from celery.schedules import crontab
import logging
from logging.handlers import RotatingFileHandler
from superset.tasks.types import ExecutorType
# Superset specific config
ROW_LIMIT = 5000
# Flask App Builder configuration
# Your App secret key will be used for securely signing the session cookie
# and encrypting sensitive information on the database
# Make sure you are changing this key for your deployment with a strong key.
# Alternatively you can set it with the `SUPERSET_SECRET_KEY` environment
# variable.
# You MUST set this for production environments or the server will refuse
# to start and you will see an error in the logs accordingly.
SECRET_KEY = '*****************************************************************'
# The SQLAlchemy connection string to your database backend
# This connection defines the path to the database that stores your
# superset metadata (slices, connections, tables, dashboards, ...).
# Note that the connection information to connect to the datasources
# you want to explore are managed directly in the web UI
# The check_same_thread=false property ensures the sqlite client does not
# attempt to enforce single-threaded access, which may be problematic in
# some edge cases
SQLALCHEMY_DATABASE_URI = 'sqlite:////app/superset/superset.db?check_same_thread=false'
TALISMAN_ENABLED = False
WTF_CSRF_ENABLED = False
# Set this API key to enable Mapbox visualizations
MAPBOX_API_KEY = ''
FEATURE_FLAGS = {
"ALERT_REPORTS": True,
"DASHBOARD_CROSS_FILTERS": True,
"DASHBOARD_VIRTUALIZATION": True,
"EMBEDDED_SUPERSET": False,
"ALERT_REPORT_TABS": False,
"ALERT_REPORT_SLACK_V2": False,
"ENABLE_ADVANCED_DATA_TYPES": False,
"ALERTS_ATTACH_REPORTS": True,
"ALLOW_FULL_CSV_EXPORT": False,
"ALLOW_ADHOC_SUBQUERY": False,
"EMBEDDABLE_CHARTS": True,
"DRILL_TO_DETAIL": True,
"DRILL_BY": True,
"HORIZONTAL_FILTER_BAR": True,
"ENABLE_SCHEDULED_REPORTS": True,
"ALERT_REPORTS_NOTIFICATION_DRY_RUN": False,
"DATAPANEL_CLOSED_BY_DEFAULT": False,
}
REDIS_HOST = "localhost"
REDIS_PORT = "6379"
class CeleryConfig:
broker_url = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
broker_connection_retry_on_startup = True
imports = (
"superset.sql_lab",
"superset.tasks.scheduler",
)
result_backend = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
worker_prefetch_multiplier = 10
task_acks_late = True
task_annotations = {
"sql_lab.get_sql_results": {
"rate_limit": "100/s",
},
}
beat_schedule = {
"reports.scheduler": {
"task": "reports.scheduler",
"schedule": crontab(minute="*", hour="*"),
},
"reports.prune_log": {
"task": "reports.prune_log",
"schedule": crontab(minute=0, hour=0),
},
}
CELERY_CONFIG = CeleryConfig
ALERT_NOTIFICATION_METHODS = {
'Webhook': 'superset.reports.notifications.webhook.WebhookNotification',
}
WEBDRIVER_TYPE = "chrome"
WEBDRIVER_OPTION_ARGS = [
"--force-device-scale-factor=2.0",
"--high-dpi-support=2.0",
"--headless",
"--disable-gpu",
"--disable-dev-shm-usage",
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-extensions",
]
WEBDRIVER_BASEURL = "http://localhost:8088"
WEBDRIVER_BASEURL_USER_FRIENDLY = "http://localhost:8088"
THUMBNAIL_SELENIUM_USER = 'admin'
ALERT_REPORTS_EXECUTE_AS = [ExecutorType.SELENIUM]
# Define the log file location and settings
LOG_FORMAT = '%(asctime)s:%(levelname)s:%(name)s:%(message)s'
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("superset_config.py loaded successfully.")
# Enable file logging
LOG_FILE = '/var/log/superset/superset.log' # Change the path as needed
file_handler = RotatingFileHandler(LOG_FILE, maxBytes=10000000,
backupCount=5)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
logging.getLogger().addHandler(file_handler)
```
Celery and Redis are running correctly -
<img width="941" alt="image"
src="https://github.com/user-attachments/assets/a9e6f716-bbc8-4b03-a204-26e353cb5607">
Request for Help:
I would appreciate any insights or suggestions on troubleshooting this issue.
Specifically, I want to confirm that my implementation is aligned with the
correct alerting process in Superset.
If anyone has experience with similar custom notifications, your advice
would be invaluable.
Thank you in advance for your assistance!
@fisjac @kistoth90 @michael-s-molina @dpgaspar
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]