bkyryliuk commented on a change in pull request #10605:
URL: 
https://github.com/apache/incubator-superset/pull/10605#discussion_r478617957



##########
File path: superset/models/alerts.py
##########
@@ -100,3 +105,154 @@ class AlertLog(Model):
     @property
     def duration(self) -> int:
         return (self.dttm_end - self.dttm_start).total_seconds()
+
+
+class SQLObserver(Model):
+    """Runs SQL-based queries for alerts"""
+
+    __tablename__ = "sql_observers"
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(150), nullable=False)
+    sql = Column(Text, nullable=False)
+
+    @declared_attr
+    def alert_id(self) -> int:
+        return Column(Integer, ForeignKey("alerts.id"), nullable=False)
+
+    @declared_attr
+    def alert(self) -> RelationshipProperty:
+        return relationship(
+            "Alert",
+            foreign_keys=[self.alert_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    @declared_attr
+    def database_id(self) -> int:
+        return Column(Integer, ForeignKey("dbs.id"), nullable=False)
+
+    @declared_attr
+    def database(self) -> RelationshipProperty:
+        return relationship(
+            "Database",
+            foreign_keys=[self.database_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    # TODO: Abstract observations from the sqlamodls e.g.

Review comment:
       s/sqlamodls/sqlalchemy models
   Also add some context, e.g. the observation table will grow constantly and 
will need either a retention policy or to be moved to a more scalable db. 
The same applies to the alert log.

##########
File path: superset/models/alerts.py
##########
@@ -100,3 +105,154 @@ class AlertLog(Model):
     @property
     def duration(self) -> int:
         return (self.dttm_end - self.dttm_start).total_seconds()
+
+
+class SQLObserver(Model):
+    """Runs SQL-based queries for alerts"""
+
+    __tablename__ = "sql_observers"
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(150), nullable=False)
+    sql = Column(Text, nullable=False)
+
+    @declared_attr
+    def alert_id(self) -> int:
+        return Column(Integer, ForeignKey("alerts.id"), nullable=False)
+
+    @declared_attr
+    def alert(self) -> RelationshipProperty:
+        return relationship(
+            "Alert",
+            foreign_keys=[self.alert_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    @declared_attr
+    def database_id(self) -> int:
+        return Column(Integer, ForeignKey("dbs.id"), nullable=False)
+
+    @declared_attr
+    def database(self) -> RelationshipProperty:
+        return relationship(
+            "Database",
+            foreign_keys=[self.database_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    # TODO: Abstract observations from the sqlamodls e.g.
+    # 
https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32
+    def get_observations(self, observation_num: Optional[int] = 2) -> 
List[Any]:
+        return (
+            db.session.query(SQLObservation)
+            .filter_by(observer_id=self.id)
+            .order_by(SQLObservation.dttm.desc())
+            .limit(observation_num)
+        )
+
+
+class SQLObservation(Model):  # pylint: disable=too-few-public-methods
+    """Keeps track of values retrieved from SQLObservers"""
+
+    __tablename__ = "sql_observations"
+
+    id = Column(Integer, primary_key=True)
+    dttm = Column(DateTime, default=datetime.utcnow, index=True)
+    observer_id = Column(Integer, ForeignKey("sql_observers.id"), 
nullable=False)
+    observer = relationship(
+        "SQLObserver",
+        foreign_keys=[observer_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    alert_id = Column(Integer, ForeignKey("alerts.id"))
+    alert = relationship(
+        "Alert",
+        foreign_keys=[alert_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    value = Column(Float)
+
+
+class Validator(Model):
+    """Used to determine how an alert and its observations should be 
validated"""
+
+    __tablename__ = "alert_validators"
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(150), nullable=False)
+    validator_type = Column(Enum(AlertValidatorType))
+    config = Column(
+        Text,
+        default=textwrap.dedent(
+            """
+            {
+                "example_threshold": 50

Review comment:
       let's keep the default empty

##########
File path: superset/tasks/schedules.py
##########
@@ -645,55 +660,65 @@ def deliver_slack_alert(alert_content: AlertContent, 
slack_channel: str) -> None
     )
 
 
-def run_alert_query(
-    alert_id: int, database_id: int, sql: str, label: str
-) -> Optional[bool]:
+def observe(alert_id: int) -> str:

Review comment:
       let's move it into a separate module e.g. tasks/alerts/observer

##########
File path: superset/tasks/schedules.py
##########
@@ -645,55 +660,65 @@ def deliver_slack_alert(alert_content: AlertContent, 
slack_channel: str) -> None
     )
 
 
-def run_alert_query(
-    alert_id: int, database_id: int, sql: str, label: str
-) -> Optional[bool]:
+def observe(alert_id: int) -> str:
     """
-    Execute alert.sql and return value if any rows are returned
+    Runs the SQL query in an alert's SQLObserver and then
+    stores the result in a SQLObservation
     """
-    logger.info("Processing alert ID: %i", alert_id)
-    database = db.session.query(Database).get(database_id)
-    if not database:
-        logger.error("Alert database not preset")
-        return None
 
-    if not sql:
-        logger.error("Alert SQL not preset")
-        return None
+    sql_observer = 
db.session.query(SQLObserver).filter_by(alert_id=alert_id).one()
+    value = None
 
-    parsed_query = ParsedQuery(sql)
+    parsed_query = ParsedQuery(sql_observer.sql)
     sql = parsed_query.stripped()
+    df = sql_observer.database.get_df(sql)
+
+    if not df.empty:
+        value = float(df.to_records()[0][1])
+
+    observation = SQLObservation(
+        observer_id=sql_observer.id,
+        alert_id=alert_id,
+        dttm=datetime.utcnow(),
+        value=value,
+    )
+
+    db.session.add(observation)
+    db.session.commit()
+
+    return sql_observer.sql
+
+
+def check_alert(alert_id: int, label: str) -> None:

Review comment:
       observe_and_validate would be a better name

##########
File path: tests/alerts_tests.py
##########
@@ -41,95 +50,204 @@
 def setup_database():
     with app.app_context():
         slice_id = db.session.query(Slice).all()[0].id
-        database_id = utils.get_example_database().id
+        database = utils.get_example_database()
+        database_id = database.id
+        database.get_sqla_engine().execute("CREATE TABLE test_table AS SELECT 
2 as id")
 
+        common_data = dict(
+            active=True,
+            crontab="* * * * *",
+            slice_id=slice_id,
+            recipients="[email protected]",
+            slack_channel="#test_channel",
+        )
         alerts = [
-            Alert(
-                id=1,
-                label="alert_1",
-                active=True,
-                crontab="*/1 * * * *",
-                sql="SELECT 0",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            Alert(**common_data, id=1, label="alert_1"),

Review comment:
       let's not specify ids here since those are autogenerated; use the label 
to retrieve the alerts you care about

##########
File path: superset/models/alerts.py
##########
@@ -100,3 +105,154 @@ class AlertLog(Model):
     @property
     def duration(self) -> int:
         return (self.dttm_end - self.dttm_start).total_seconds()
+
+
+class SQLObserver(Model):
+    """Runs SQL-based queries for alerts"""
+
+    __tablename__ = "sql_observers"
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(150), nullable=False)
+    sql = Column(Text, nullable=False)
+
+    @declared_attr
+    def alert_id(self) -> int:
+        return Column(Integer, ForeignKey("alerts.id"), nullable=False)
+
+    @declared_attr
+    def alert(self) -> RelationshipProperty:
+        return relationship(
+            "Alert",
+            foreign_keys=[self.alert_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    @declared_attr
+    def database_id(self) -> int:
+        return Column(Integer, ForeignKey("dbs.id"), nullable=False)
+
+    @declared_attr
+    def database(self) -> RelationshipProperty:
+        return relationship(
+            "Database",
+            foreign_keys=[self.database_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    # TODO: Abstract observations from the sqlamodls e.g.
+    # 
https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32
+    def get_observations(self, observation_num: Optional[int] = 2) -> 
List[Any]:
+        return (
+            db.session.query(SQLObservation)
+            .filter_by(observer_id=self.id)
+            .order_by(SQLObservation.dttm.desc())
+            .limit(observation_num)
+        )
+
+
+class SQLObservation(Model):  # pylint: disable=too-few-public-methods
+    """Keeps track of values retrieved from SQLObservers"""
+
+    __tablename__ = "sql_observations"
+
+    id = Column(Integer, primary_key=True)
+    dttm = Column(DateTime, default=datetime.utcnow, index=True)
+    observer_id = Column(Integer, ForeignKey("sql_observers.id"), 
nullable=False)
+    observer = relationship(
+        "SQLObserver",
+        foreign_keys=[observer_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    alert_id = Column(Integer, ForeignKey("alerts.id"))
+    alert = relationship(
+        "Alert",
+        foreign_keys=[alert_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    value = Column(Float)
+
+
+class Validator(Model):
+    """Used to determine how an alert and its observations should be 
validated"""
+
+    __tablename__ = "alert_validators"
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(150), nullable=False)
+    validator_type = Column(Enum(AlertValidatorType))
+    config = Column(
+        Text,
+        default=textwrap.dedent(
+            """
+            {
+                "example_threshold": 50
+            }
+            """
+        ),
+    )
+
+    @declared_attr
+    def alert_id(self) -> int:
+        return Column(Integer, ForeignKey("alerts.id"), nullable=False)
+
+    @declared_attr
+    def alert(self) -> RelationshipProperty:
+        return relationship(
+            "Alert",
+            foreign_keys=[self.alert_id],
+            backref=backref("alert_validators", cascade="all, delete-orphan"),
+        )
+
+
+def not_null_executor(

Review comment:
       this should live outside the models, you can create a separate module 
for them 
   e.g. tasks/schedules/alerts/validator.py
   

##########
File path: superset/tasks/schedules.py
##########
@@ -538,16 +543,15 @@ def schedule_alert_query(  # pylint: 
disable=unused-argument
             logger.info("Ignoring deactivated alert")
             return
 
+        if schedule.sql_observers:
+            sql = schedule.sql_observers[0].sql

Review comment:
       could there be multiple observers?
   if not, let's change sql_observers to sql_observer

##########
File path: superset/models/alerts.py
##########
@@ -100,3 +105,154 @@ class AlertLog(Model):
     @property
     def duration(self) -> int:
         return (self.dttm_end - self.dttm_start).total_seconds()
+
+
+class SQLObserver(Model):
+    """Runs SQL-based queries for alerts"""
+
+    __tablename__ = "sql_observers"
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(150), nullable=False)
+    sql = Column(Text, nullable=False)
+
+    @declared_attr
+    def alert_id(self) -> int:
+        return Column(Integer, ForeignKey("alerts.id"), nullable=False)
+
+    @declared_attr
+    def alert(self) -> RelationshipProperty:
+        return relationship(
+            "Alert",
+            foreign_keys=[self.alert_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    @declared_attr
+    def database_id(self) -> int:
+        return Column(Integer, ForeignKey("dbs.id"), nullable=False)
+
+    @declared_attr
+    def database(self) -> RelationshipProperty:
+        return relationship(
+            "Database",
+            foreign_keys=[self.database_id],
+            backref=backref("sql_observers", cascade="all, delete-orphan"),
+        )
+
+    # TODO: Abstract observations from the sqlamodls e.g.
+    # 
https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32
+    def get_observations(self, observation_num: Optional[int] = 2) -> 
List[Any]:
+        return (
+            db.session.query(SQLObservation)
+            .filter_by(observer_id=self.id)
+            .order_by(SQLObservation.dttm.desc())
+            .limit(observation_num)
+        )
+
+
+class SQLObservation(Model):  # pylint: disable=too-few-public-methods
+    """Keeps track of values retrieved from SQLObservers"""
+
+    __tablename__ = "sql_observations"
+
+    id = Column(Integer, primary_key=True)
+    dttm = Column(DateTime, default=datetime.utcnow, index=True)
+    observer_id = Column(Integer, ForeignKey("sql_observers.id"), 
nullable=False)
+    observer = relationship(
+        "SQLObserver",
+        foreign_keys=[observer_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    alert_id = Column(Integer, ForeignKey("alerts.id"))
+    alert = relationship(
+        "Alert",
+        foreign_keys=[alert_id],
+        backref=backref("observations", cascade="all, delete-orphan"),
+    )
+    value = Column(Float)
+
+
+class Validator(Model):
+    """Used to determine how an alert and its observations should be 
validated"""
+
+    __tablename__ = "alert_validators"
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(150), nullable=False)
+    validator_type = Column(Enum(AlertValidatorType))
+    config = Column(
+        Text,
+        default=textwrap.dedent(
+            """
+            {
+                "example_threshold": 50
+            }
+            """
+        ),
+    )
+
+    @declared_attr
+    def alert_id(self) -> int:
+        return Column(Integer, ForeignKey("alerts.id"), nullable=False)
+
+    @declared_attr
+    def alert(self) -> RelationshipProperty:
+        return relationship(
+            "Alert",
+            foreign_keys=[self.alert_id],
+            backref=backref("alert_validators", cascade="all, delete-orphan"),
+        )
+
+
+def not_null_executor(
+    observer: SQLObserver, validator: Validator  # pylint: 
disable=unused-argument
+) -> bool:
+    """Returns True if a SQLObserver's recent observation is not NULL"""
+
+    observation = observer.get_observations(1)[0]
+    if observation.value:

Review comment:
       this one could be tricky:
   if the value is 0, this check evaluates to False :)

##########
File path: superset/tasks/schedules.py
##########
@@ -538,16 +543,15 @@ def schedule_alert_query(  # pylint: 
disable=unused-argument
             logger.info("Ignoring deactivated alert")
             return
 
+        if schedule.sql_observers:
+            sql = schedule.sql_observers[0].sql
+
         if report_type == ScheduleType.alert:
             if recipients or slack_channel:
-                deliver_alert(schedule.id, recipients, slack_channel)
+                deliver_alert(schedule.id, sql, recipients, slack_channel)

Review comment:
       how can you deliver an alert before checking it?
   maybe rename the functions to properly represent what they are doing
   
   as written, it says that both deliver_alert and check_alert notify the 
customer. I'm slightly confused here.

##########
File path: superset/tasks/schedules.py
##########
@@ -705,7 +730,19 @@ def run_alert_query(
     )
     db.session.commit()
 
-    return None
+
+def validate_alert(alert_id: int, label: str) -> bool:

Review comment:
       s/validate_alert/validate_observations or just validate
   
   

##########
File path: superset/tasks/schedules.py
##########
@@ -645,55 +660,65 @@ def deliver_slack_alert(alert_content: AlertContent, 
slack_channel: str) -> None
     )
 
 
-def run_alert_query(
-    alert_id: int, database_id: int, sql: str, label: str
-) -> Optional[bool]:
+def observe(alert_id: int) -> str:
     """
-    Execute alert.sql and return value if any rows are returned
+    Runs the SQL query in an alert's SQLObserver and then
+    stores the result in a SQLObservation
     """
-    logger.info("Processing alert ID: %i", alert_id)
-    database = db.session.query(Database).get(database_id)
-    if not database:
-        logger.error("Alert database not preset")
-        return None
 
-    if not sql:
-        logger.error("Alert SQL not preset")
-        return None
+    sql_observer = 
db.session.query(SQLObserver).filter_by(alert_id=alert_id).one()
+    value = None
 
-    parsed_query = ParsedQuery(sql)
+    parsed_query = ParsedQuery(sql_observer.sql)
     sql = parsed_query.stripped()
+    df = sql_observer.database.get_df(sql)
+
+    if not df.empty:
+        value = float(df.to_records()[0][1])

Review comment:
       add logging here if there are incorrect types or the df is empty, and 
properly handle edge cases.
   For instance, you can add a type / field to the observation indicating that 
the result was malformed.

##########
File path: superset/views/alerts.py
##########
@@ -23,17 +24,62 @@
 from wtforms import BooleanField, Form, StringField
 
 from superset.constants import RouteMethod
-from superset.models.alerts import Alert, AlertLog
+from superset.models.alerts import (
+    Alert,
+    AlertLog,
+    AlertValidatorType,
+    SQLObservation,
+    SQLObserver,
+    Validator,
+)
 from superset.models.schedules import ScheduleType
 from superset.tasks.schedules import schedule_alert_query
+from superset.utils import core as utils
 from superset.utils.core import get_email_address_str, markdown
 
 from ..exceptions import SupersetException
+from ..sql_parse import ParsedQuery
 from .base import SupersetModelView
 
 # TODO: access control rules for this module
 
 
+def test_observer_sql(item: "SQLObserverInlineView") -> None:
+    try:
+        parsed_query = ParsedQuery(item.sql)
+        sql = parsed_query.stripped()
+        df = item.database.get_df(sql)
+
+        if not df.empty:
+            value = df.to_records()[0][1]
+
+            # Check that the SQL result can be read as a float
+            if value:
+                float(value)
+
+    except ValueError:
+        raise SupersetException("Error: SQL query returned non-number result")
+    except Exception as ex:  # pylint: disable=broad-except
+        raise SupersetException(f"Observer raised exception: {ex}")
+
+
+def check_validator_config(item: "ValidatorInlineView") -> None:
+    config = json.loads(item.config)
+
+    if item.validator_type == AlertValidatorType.gte_threshold and not 
config.get(
+        "gte_threshold"
+    ):
+        raise SupersetException(
+            "Error: Greater Than or Equal To Validator needs a specified 
threshold"

Review comment:
       in the exception, specify an example; this way the user will have an 
actionable insight

##########
File path: superset/views/alerts.py
##########
@@ -23,17 +24,62 @@
 from wtforms import BooleanField, Form, StringField
 
 from superset.constants import RouteMethod
-from superset.models.alerts import Alert, AlertLog
+from superset.models.alerts import (
+    Alert,
+    AlertLog,
+    AlertValidatorType,
+    SQLObservation,
+    SQLObserver,
+    Validator,
+)
 from superset.models.schedules import ScheduleType
 from superset.tasks.schedules import schedule_alert_query
+from superset.utils import core as utils
 from superset.utils.core import get_email_address_str, markdown
 
 from ..exceptions import SupersetException
+from ..sql_parse import ParsedQuery
 from .base import SupersetModelView
 
 # TODO: access control rules for this module
 
 
+def test_observer_sql(item: "SQLObserverInlineView") -> None:

Review comment:
       the test_ prefix should be used only for test methods / classes
   
   check/validate would be a better name. Be aware that validation may take 
quite some time, but it's nice to have. The validation logic should also live 
in the task; you can abstract this function. It may happen that the table 
schema changes, and that would make the observer invalid.

##########
File path: tests/alerts_tests.py
##########
@@ -41,95 +50,204 @@
 def setup_database():
     with app.app_context():
         slice_id = db.session.query(Slice).all()[0].id
-        database_id = utils.get_example_database().id
+        database = utils.get_example_database()
+        database_id = database.id
+        database.get_sqla_engine().execute("CREATE TABLE test_table AS SELECT 
2 as id")
 
+        common_data = dict(
+            active=True,
+            crontab="* * * * *",
+            slice_id=slice_id,
+            recipients="[email protected]",
+            slack_channel="#test_channel",
+        )
         alerts = [
-            Alert(
-                id=1,
-                label="alert_1",
-                active=True,
-                crontab="*/1 * * * *",
-                sql="SELECT 0",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            Alert(**common_data, id=1, label="alert_1"),
+            Alert(**common_data, id=2, label="alert_2"),
+            Alert(**common_data, id=3, label="alert_3"),
+            Alert(**common_data, id=4, label="alert_4"),
+            Alert(id=5, crontab="* * * * *", active=False, label="alert_5"),
+            Alert(id=6, crontab="* * * * *", active=False, label="alert_6"),
+        ]
+        observers = [
+            SQLObserver(
+                name="observer_1", sql="SELECT 0", alert_id=1, 
database_id=database_id
             ),
-            Alert(
-                id=2,
-                label="alert_2",
-                active=True,
-                crontab="*/1 * * * *",
-                sql="SELECT 55",
-                alert_type="email",
-                slice_id=slice_id,
-                recipients="[email protected]",
-                slack_channel="#test_channel",
+            SQLObserver(
+                name="observer_2",
+                sql="SELECT id FROM test_table WHERE id = -1",
+                alert_id=2,
                 database_id=database_id,
             ),
-            Alert(
-                id=3,
-                label="alert_3",
-                active=False,
-                crontab="*/1 * * * *",
-                sql="UPDATE 55",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            SQLObserver(
+                name="observer_3", sql="$%^&", alert_id=3, 
database_id=database_id
+            ),
+            SQLObserver(
+                name="observer_4", sql="SELECT 55", alert_id=4, 
database_id=database_id
             ),
-            Alert(id=4, active=False, label="alert_4", database_id=-1),
-            Alert(id=5, active=False, label="alert_5", 
database_id=database_id),
         ]
 
         db.session.bulk_save_objects(alerts)
-        db.session.commit()
+        db.session.bulk_save_objects(observers)
         yield db.session
 
+        db.session.query(SQLObservation).delete()
+        db.session.query(SQLObserver).delete()
+        db.session.query(Validator).delete()
         db.session.query(AlertLog).delete()
         db.session.query(Alert).delete()
 
 
+def test_alert_observer(setup_database):
+    dbsession = setup_database
+
+    # Test SQLObserver with empty SQL return

Review comment:
       nice tests; let's add more cases: a result with 1 row and 1 value, 1 row 
and 2 values, 2 rows, and boolean, float, int, null, string values, etc.

##########
File path: tests/alerts_tests.py
##########
@@ -41,95 +50,204 @@
 def setup_database():
     with app.app_context():
         slice_id = db.session.query(Slice).all()[0].id
-        database_id = utils.get_example_database().id
+        database = utils.get_example_database()

Review comment:
       s/database/example_database

##########
File path: tests/alerts_tests.py
##########
@@ -41,95 +50,204 @@
 def setup_database():
     with app.app_context():
         slice_id = db.session.query(Slice).all()[0].id
-        database_id = utils.get_example_database().id
+        database = utils.get_example_database()
+        database_id = database.id
+        database.get_sqla_engine().execute("CREATE TABLE test_table AS SELECT 
2 as id")
 
+        common_data = dict(
+            active=True,
+            crontab="* * * * *",
+            slice_id=slice_id,
+            recipients="[email protected]",
+            slack_channel="#test_channel",
+        )
         alerts = [
-            Alert(
-                id=1,
-                label="alert_1",
-                active=True,
-                crontab="*/1 * * * *",
-                sql="SELECT 0",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            Alert(**common_data, id=1, label="alert_1"),
+            Alert(**common_data, id=2, label="alert_2"),
+            Alert(**common_data, id=3, label="alert_3"),
+            Alert(**common_data, id=4, label="alert_4"),
+            Alert(id=5, crontab="* * * * *", active=False, label="alert_5"),
+            Alert(id=6, crontab="* * * * *", active=False, label="alert_6"),
+        ]
+        observers = [
+            SQLObserver(
+                name="observer_1", sql="SELECT 0", alert_id=1, 
database_id=database_id
             ),
-            Alert(
-                id=2,
-                label="alert_2",
-                active=True,
-                crontab="*/1 * * * *",
-                sql="SELECT 55",
-                alert_type="email",
-                slice_id=slice_id,
-                recipients="[email protected]",
-                slack_channel="#test_channel",
+            SQLObserver(
+                name="observer_2",
+                sql="SELECT id FROM test_table WHERE id = -1",
+                alert_id=2,
                 database_id=database_id,
             ),
-            Alert(
-                id=3,
-                label="alert_3",
-                active=False,
-                crontab="*/1 * * * *",
-                sql="UPDATE 55",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            SQLObserver(
+                name="observer_3", sql="$%^&", alert_id=3, 
database_id=database_id
+            ),
+            SQLObserver(
+                name="observer_4", sql="SELECT 55", alert_id=4, 
database_id=database_id
             ),
-            Alert(id=4, active=False, label="alert_4", database_id=-1),
-            Alert(id=5, active=False, label="alert_5", 
database_id=database_id),
         ]
 
         db.session.bulk_save_objects(alerts)
-        db.session.commit()
+        db.session.bulk_save_objects(observers)
         yield db.session
 
+        db.session.query(SQLObservation).delete()
+        db.session.query(SQLObserver).delete()
+        db.session.query(Validator).delete()
         db.session.query(AlertLog).delete()
         db.session.query(Alert).delete()
 
 
+def test_alert_observer(setup_database):
+    dbsession = setup_database
+
+    # Test SQLObserver with empty SQL return
+    alert2 = dbsession.query(Alert).filter_by(id=2).one()
+    observe(alert2.id)
+    assert alert2.sql_observers[0].observations[-1].value is None
+
+    # Test SQLObserver with non-empty SQL return
+    alert4 = dbsession.query(Alert).filter_by(id=4).one()
+    observe(alert4.id)
+    assert alert4.sql_observers[0].observations[-1].value == 55
+
+
+def test_alert_error(setup_database):
+    dbsession = setup_database
+
+    # Test error with Observer SQL statement
+    alert3 = dbsession.query(Alert).filter_by(id=3).one()
+    check_alert(alert3.id, alert3.label)
+    assert alert3.logs[-1].state == AlertState.ERROR
+
+    # Test error with alert lacking observer
+    alert5 = dbsession.query(Alert).filter_by(id=5).one()
+    check_alert(alert5.id, alert5.label)
+    assert alert5.logs[-1].state == AlertState.ERROR
+
+
 @patch("superset.tasks.schedules.deliver_alert")
-@patch("superset.tasks.schedules.logging.Logger.error")
-def test_run_alert_query(mock_error, mock_deliver_alert, setup_database):
+def test_not_null_validator(mock_deliver_alert, setup_database):
     dbsession = setup_database
+    null_val1 = Validator(

Review comment:
       let's decouple the db and the validators; validators are just simple 
functions.
   You can pass the observation list and config to them instead.

##########
File path: tests/alerts_tests.py
##########
@@ -41,95 +50,204 @@
 def setup_database():
     with app.app_context():
         slice_id = db.session.query(Slice).all()[0].id
-        database_id = utils.get_example_database().id
+        database = utils.get_example_database()
+        database_id = database.id
+        database.get_sqla_engine().execute("CREATE TABLE test_table AS SELECT 
2 as id")
 
+        common_data = dict(
+            active=True,
+            crontab="* * * * *",
+            slice_id=slice_id,
+            recipients="[email protected]",
+            slack_channel="#test_channel",
+        )
         alerts = [
-            Alert(
-                id=1,
-                label="alert_1",
-                active=True,
-                crontab="*/1 * * * *",
-                sql="SELECT 0",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            Alert(**common_data, id=1, label="alert_1"),
+            Alert(**common_data, id=2, label="alert_2"),
+            Alert(**common_data, id=3, label="alert_3"),
+            Alert(**common_data, id=4, label="alert_4"),
+            Alert(id=5, crontab="* * * * *", active=False, label="alert_5"),
+            Alert(id=6, crontab="* * * * *", active=False, label="alert_6"),
+        ]
+        observers = [
+            SQLObserver(
+                name="observer_1", sql="SELECT 0", alert_id=1, 
database_id=database_id
             ),
-            Alert(
-                id=2,
-                label="alert_2",
-                active=True,
-                crontab="*/1 * * * *",
-                sql="SELECT 55",
-                alert_type="email",
-                slice_id=slice_id,
-                recipients="[email protected]",
-                slack_channel="#test_channel",
+            SQLObserver(
+                name="observer_2",
+                sql="SELECT id FROM test_table WHERE id = -1",
+                alert_id=2,
                 database_id=database_id,
             ),
-            Alert(
-                id=3,
-                label="alert_3",
-                active=False,
-                crontab="*/1 * * * *",
-                sql="UPDATE 55",
-                alert_type="email",
-                slice_id=slice_id,
-                database_id=database_id,
+            SQLObserver(
+                name="observer_3", sql="$%^&", alert_id=3, 
database_id=database_id
+            ),
+            SQLObserver(
+                name="observer_4", sql="SELECT 55", alert_id=4, 
database_id=database_id
             ),
-            Alert(id=4, active=False, label="alert_4", database_id=-1),
-            Alert(id=5, active=False, label="alert_5", 
database_id=database_id),
         ]
 
         db.session.bulk_save_objects(alerts)
-        db.session.commit()
+        db.session.bulk_save_objects(observers)
         yield db.session
 
+        db.session.query(SQLObservation).delete()
+        db.session.query(SQLObserver).delete()
+        db.session.query(Validator).delete()
         db.session.query(AlertLog).delete()
         db.session.query(Alert).delete()
 
 
+def test_alert_observer(setup_database):
+    dbsession = setup_database
+
+    # Test SQLObserver with empty SQL return
+    alert2 = dbsession.query(Alert).filter_by(id=2).one()
+    observe(alert2.id)
+    assert alert2.sql_observers[0].observations[-1].value is None
+
+    # Test SQLObserver with non-empty SQL return
+    alert4 = dbsession.query(Alert).filter_by(id=4).one()
+    observe(alert4.id)
+    assert alert4.sql_observers[0].observations[-1].value == 55
+
+
+def test_alert_error(setup_database):
+    dbsession = setup_database
+
+    # Test error with Observer SQL statement
+    alert3 = dbsession.query(Alert).filter_by(id=3).one()
+    check_alert(alert3.id, alert3.label)
+    assert alert3.logs[-1].state == AlertState.ERROR
+
+    # Test error with alert lacking observer
+    alert5 = dbsession.query(Alert).filter_by(id=5).one()
+    check_alert(alert5.id, alert5.label)
+    assert alert5.logs[-1].state == AlertState.ERROR

Review comment:
       let's add successful alert tests as well




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to