This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new afc7539  [TE] Fix corner case in SLA alert - round up 
datasetLastRefreshTime (#5909)
afc7539 is described below

commit afc75394d127e8a870206eb4300f8c773154a513
Author: Akshay Rai <ak...@linkedin.com>
AuthorDate: Tue Aug 25 12:00:30 2020 -0700

    [TE] Fix corner case in SLA alert - round up datasetLastRefreshTime (#5909)
---
 .../components/DataSlaQualityChecker.java          | 40 ++++++-----
 .../spec/DataSlaQualityCheckerSpec.java            |  7 +-
 .../dataquality/DataQualityTaskRunnerTest.java     | 80 +++++++++++-----------
 3 files changed, 69 insertions(+), 58 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
index 42d6860..9f4f110 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java
@@ -77,7 +77,6 @@ public class DataSlaQualityChecker implements 
AnomalyDetector<DataSlaQualityChec
 
   private String sla;
   private InputDataFetcher dataFetcher;
-  private final String DEFAULT_DATA_SLA = "3_DAYS";
 
   @Override
   public DetectionResult runDetection(Interval window, String metricUrn) {
@@ -108,10 +107,11 @@ public class DataSlaQualityChecker implements 
AnomalyDetector<DataSlaQualityChec
 
     try {
       long datasetLastRefreshTime = fetchLatestDatasetRefreshTime(data, me, 
window);
-      MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime 
+ 1, expectedDatasetRefreshTime);
+      long datasetLastRefreshTimeRoundUp = 
alignToUpperBoundary(datasetLastRefreshTime, datasetConfig);
+      MetricSlice slice = MetricSlice.from(me.getId(), 
datasetLastRefreshTimeRoundUp, expectedDatasetRefreshTime);
 
-      if (isSLAViolated(datasetLastRefreshTime, expectedDatasetRefreshTime)) {
-        anomalies.add(createDataSLAAnomaly(slice, datasetConfig));
+      if (isSLAViolated(datasetLastRefreshTimeRoundUp, 
expectedDatasetRefreshTime)) {
+        anomalies.add(createDataSLAAnomaly(slice, datasetConfig, 
datasetLastRefreshTime, datasetLastRefreshTimeRoundUp));
       }
     } catch (Exception e) {
       LOG.error(String.format("Failed to run sla check on metric URN %s", 
me.getUrn()), e);
@@ -148,6 +148,7 @@ public class DataSlaQualityChecker implements 
AnomalyDetector<DataSlaQualityChec
       DataFrame dataFrame = this.dataFetcher.fetchData(new InputDataSpec()
           
.withTimeseriesSlices(Collections.singletonList(metricSlice))).getTimeseries().get(metricSlice);
       if (dataFrame != null && !dataFrame.isEmpty()) {
+        // Fetches the latest timestamp from the source. This value is already 
in epoch (UTC).
         return dataFrame.getDoubles("timestamp").max().longValue();
       }
     }
@@ -177,38 +178,45 @@ public class DataSlaQualityChecker implements 
AnomalyDetector<DataSlaQualityChec
    * fetch the user configured SLA, otherwise default 3_DAYS.
    */
   private boolean isSLAViolated(long actualRefreshTime, long 
expectedRefreshTime) {
-    String sla = StringUtils.isNotEmpty(this.sla) ? this.sla : 
DEFAULT_DATA_SLA;
-    long delay = 
TimeGranularity.fromString(sla).toPeriod().toStandardDuration().getMillis();
-
+    long delay = 
TimeGranularity.fromString(this.sla).toPeriod().toStandardDuration().getMillis();
     return (expectedRefreshTime - actualRefreshTime) > delay;
   }
 
   /**
-   * Align and round off start time to the upper boundary of the granularity
+   * Align and round off timestamp to the upper boundary of the granularity
+   *
+   * Examples:
+   * a. 20th Aug 05:00 pm will be rounded up to 21st Aug 12:00 am
+   * b. 20th Aug 12:00 am will be rounded up to 21st Aug 12:00 am
    */
   private static long alignToUpperBoundary(long timestamp, DatasetConfigDTO 
datasetConfig) {
     Period granularityPeriod = 
datasetConfig.bucketTimeGranularity().toPeriod();
     DateTimeZone timezone = DateTimeZone.forID(datasetConfig.getTimezone());
-    DateTime startTime = new DateTime(timestamp - 1, 
timezone).plus(granularityPeriod);
-    return (startTime.getMillis() / 
granularityPeriod.toStandardDuration().getMillis()) * 
granularityPeriod.toStandardDuration().getMillis();
+    DateTime startTime = new DateTime(timestamp, 
timezone).plus(granularityPeriod);
+    return (startTime.getMillis() / 
granularityPeriod.toStandardDuration().getMillis())
+        * granularityPeriod.toStandardDuration().getMillis();
   }
 
   /**
    * Creates a DATA_SLA anomaly from ceiling(start) to ceiling(end) for the 
detection id.
    * If existing DATA_SLA anomalies are present, then it will be merged 
accordingly.
+   *
+   * @param datasetLastRefreshTime the timestamp corresponding to the last 
record in the data (UTC)
+   * @param datasetLastRefreshTimeRoundUp the timestamp rounded off to the 
upper granular bucket. See
+   * {@link #alignToUpperBoundary}
    */
-  private MergedAnomalyResultDTO createDataSLAAnomaly(MetricSlice slice, 
DatasetConfigDTO datasetConfig) {
-    MergedAnomalyResultDTO anomaly = DetectionUtils.makeAnomaly(
-        alignToUpperBoundary(slice.getStart(), datasetConfig),
-        alignToUpperBoundary(slice.getEnd(), datasetConfig));
+  private MergedAnomalyResultDTO createDataSLAAnomaly(MetricSlice slice, 
DatasetConfigDTO datasetConfig,
+      long datasetLastRefreshTime, long datasetLastRefreshTimeRoundUp) {
+    MergedAnomalyResultDTO anomaly = 
DetectionUtils.makeAnomaly(slice.getStart(), slice.getEnd());
     anomaly.setCollection(datasetConfig.getName());
     anomaly.setType(AnomalyType.DATA_SLA);
     anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION);
 
     // Store the metadata in the anomaly
     Map<String, String> properties = new HashMap<>();
-    properties.put("datasetLastRefreshTime", String.valueOf(slice.getStart() - 
1));
-    properties.put("sla", sla);
+    properties.put("datasetLastRefreshTime", 
String.valueOf(datasetLastRefreshTime));
+    properties.put("datasetLastRefreshTimeRoundUp", 
String.valueOf(datasetLastRefreshTimeRoundUp));
+    properties.put("sla", this.sla);
     anomaly.setProperties(properties);
 
     return anomaly;
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
index edfbc19..5a67d01 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java
@@ -20,16 +20,17 @@
 package org.apache.pinot.thirdeye.detection.dataquality.spec;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.thirdeye.detection.spec.AbstractSpec;
 
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class DataSlaQualityCheckerSpec extends AbstractSpec {
-  private String sla = "3_DAYS";
+  public static final String DEFAULT_DATA_SLA = "3_DAYS";
+  private String sla;
 
   public String getSla() {
-    return sla;
+    return StringUtils.isNotEmpty(this.sla) ? this.sla : DEFAULT_DATA_SLA;
   }
 
   public void setSla(String sla) {
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
index 963946d..67c03a5 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java
@@ -264,15 +264,9 @@ public class DataQualityTaskRunnerTest {
     this.info.setStart(START_TIME + 4 * GRANULARITY);
     this.info.setEnd(START_TIME + 5 * GRANULARITY);
     runner.execute(this.info, this.context);
-    // 1 data sla anomaly should be created
+    // No data sla anomaly should be created
     anomalies = retrieveAllAnomalies();
-    Assert.assertEquals(anomalies.size(), 1);
-    expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
-    Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
-    // clean up
-    expectedAnomalies.clear();
-    cleanUpAnomalies();
-
+    Assert.assertEquals(anomalies.size(), 0);
 
     // CHECK 2: Report data missing when delayed (sla = 2_DAYS)
     detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-2.yaml");
@@ -287,7 +281,7 @@ public class DataQualityTaskRunnerTest {
     anomalies = retrieveAllAnomalies();
     Assert.assertEquals(anomalies.size(), 0);
 
-    // 2nd scan after issue - sla breach
+    // 2nd scan after issue - still no sla breach
     //  time:    3____4    5    6    // Data for 4th still hasn't arrived
     //  scan:         |---------|
     this.info.setStart(START_TIME + 4 * GRANULARITY);
@@ -295,8 +289,18 @@ public class DataQualityTaskRunnerTest {
     runner.execute(this.info, this.context);
     // 1 data sla anomaly should be created as data is delayed by 1 ms
     anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+    // 3rd scan after issue - sla breach. It has crossed 2 days.
+    //  time:    3____4    5    6    7    // Data for 4th still hasn't arrived
+    //  scan:         |--------------|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 7 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // 1 data sla anomaly should be created as data is delayed by 1 ms
+    anomalies = retrieveAllAnomalies();
     Assert.assertEquals(anomalies.size(), 1);
-    expectedAnomalies.add(makeSlaAnomaly(4, 6, "2_DAYS", "slaRule1:DATA_SLA"));
+    expectedAnomalies.add(makeSlaAnomaly(4, 7, "2_DAYS", "slaRule1:DATA_SLA"));
     Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
     // clean up
     expectedAnomalies.clear();
@@ -326,16 +330,26 @@ public class DataQualityTaskRunnerTest {
     anomalies = retrieveAllAnomalies();
     Assert.assertEquals(anomalies.size(), 0);
 
-    // 3rd scan after issue - sla breach
+    // 3rd scan after issue - no sla breach
     //  time:    3____4    5    6    7    // Data for 4th still hasn't arrived
     //  scan:         |--------------|
     this.info.setStart(START_TIME + 4 * GRANULARITY);
     this.info.setEnd(START_TIME + 7 * GRANULARITY);
     runner.execute(this.info, this.context);
-    // 1 data sla anomaly should be created from 4 to 7 as the data is missing 
since 3 days
+    // No data sla anomaly should be created from 4 to 7; sla not breached
+    anomalies = retrieveAllAnomalies();
+    Assert.assertEquals(anomalies.size(), 0);
+
+    // 4th scan after issue - sla breach
+    //  time:    3____4    5    6    7    8    // Data for 4th still hasn't 
arrived
+    //  scan:         |-------------------|
+    this.info.setStart(START_TIME + 4 * GRANULARITY);
+    this.info.setEnd(START_TIME + 8 * GRANULARITY);
+    runner.execute(this.info, this.context);
+    // 1 data sla anomaly should be created from 4 to 8 as the data is missing 
since 3 days
     anomalies = retrieveAllAnomalies();
     Assert.assertEquals(anomalies.size(), 1);
-    expectedAnomalies.add(makeSlaAnomaly(4, 7, "3_DAYS", "slaRule1:DATA_SLA"));
+    expectedAnomalies.add(makeSlaAnomaly(4, 8, "3_DAYS", "slaRule1:DATA_SLA"));
     Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
     // clean up
     expectedAnomalies.clear();
@@ -371,16 +385,16 @@ public class DataQualityTaskRunnerTest {
     datasetConfigDTO.setLastRefreshTime(0);
     datasetDAO.update(datasetConfigDTO);
     // 1st scan after issue - sla breach
-    //  time:  3____4    5    6           // We have data till 3rd, data since 
4th is missing/delayed
-    //  scan:            |----|
+    //  time:  3____4    5    6    7           // We have data till 3rd, data 
since 4th is missing/delayed
+    //  scan:            |---------|
     this.info.setStart(START_TIME + 5 * GRANULARITY);
-    this.info.setEnd(START_TIME + 6 * GRANULARITY);
+    this.info.setEnd(START_TIME + 7 * GRANULARITY);
     runner.execute(this.info, this.context);
 
-    // 1 data sla anomaly should be created from 4 to 6
+    // 1 data sla anomaly should be created from 5 to 7
     anomalies = retrieveAllAnomalies();
     Assert.assertEquals(anomalies.size(), 1);
-    expectedAnomalies.add(makeSlaAnomaly(5, 6, "1_DAYS", "slaRule1:DATA_SLA"));
+    expectedAnomalies.add(makeSlaAnomaly(5, 7, "1_DAYS", "slaRule1:DATA_SLA"));
     Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
     // clean up
     expectedAnomalies.clear();
@@ -538,18 +552,19 @@ public class DataQualityTaskRunnerTest {
 
     // Prepare the data sla task on unavailable data
     this.info.setStart(START_TIME + 4 * GRANULARITY);
-    this.info.setEnd(START_TIME + 5 * GRANULARITY);
+    this.info.setEnd(START_TIME + 6 * GRANULARITY);
     runner.execute(this.info, this.context);
 
     // 1 SLA anomaly should be created
     anomalies = retrieveAllAnomalies();
     Assert.assertEquals(anomalies.size(), 1);
     Assert.assertEquals(anomalies.get(0).getStartTime(), START_TIME + 4 * 
GRANULARITY);
-    Assert.assertEquals(anomalies.get(0).getEndTime(), START_TIME + 5 * 
GRANULARITY);
+    Assert.assertEquals(anomalies.get(0).getEndTime(), START_TIME + 6 * 
GRANULARITY);
     Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_SLA);
     
Assert.assertTrue(anomalies.get(0).getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION));
     Map<String, String> anomalyProps = new HashMap<>();
     anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * 
GRANULARITY - 1));
+    anomalyProps.put("datasetLastRefreshTimeRoundUp", 
String.valueOf(START_TIME + 4 * GRANULARITY));
     anomalyProps.put("sla", "1_DAYS");
     anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA");
     anomalyProps.put("subEntityName", "test_sla_alert");
@@ -583,20 +598,9 @@ public class DataQualityTaskRunnerTest {
     this.info.setEnd(START_TIME + 5 * GRANULARITY);
     runner.execute(this.info, this.context);
 
-    // 1 data sla anomaly should be created from 4 to 5
+    // No sla anomaly should be created as the data is within 1_DAY
     List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies();
-    Assert.assertEquals(anomalies.size(), 1);
-    MergedAnomalyResultDTO slaAnomaly = anomalies.get(0);
-    Assert.assertEquals(slaAnomaly.getStartTime(), START_TIME + 4 * 
GRANULARITY);
-    Assert.assertEquals(slaAnomaly.getEndTime(), START_TIME + 5 * GRANULARITY);
-    Assert.assertEquals(slaAnomaly.getType(), AnomalyType.DATA_SLA);
-    
Assert.assertTrue(slaAnomaly.getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION));
-    Map<String, String> anomalyProps = new HashMap<>();
-    anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * 
GRANULARITY - 1));
-    anomalyProps.put("sla", "1_DAYS");
-    anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA");
-    anomalyProps.put("subEntityName", "test_sla_alert");
-    Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps);
+    Assert.assertEquals(anomalies.size(), 0);
 
     // 2nd scan
     //  time:    3____4    5    6    // Data for 4th still hasn't arrived
@@ -605,11 +609,10 @@ public class DataQualityTaskRunnerTest {
     this.info.setEnd(START_TIME + 6 * GRANULARITY);
     runner.execute(this.info, this.context);
 
-    // We should now have 2 anomalies in our database (4 to 5) and (4 to 6)
+    // We should now have 1 anomaly in our database (4 to 6)
     anomalies = retrieveAllAnomalies();
-    Assert.assertEquals(anomalies.size(), 2);
+    Assert.assertEquals(anomalies.size(), 1);
     List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>();
-    expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
     expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
     Assert.assertTrue(anomalies.containsAll(expectedAnomalies));
 
@@ -629,12 +632,11 @@ public class DataQualityTaskRunnerTest {
     );
     runner.execute(this.info, this.context);
 
-    // We will now have 3 anomalies in our database
+    // We will now have 2 anomalies in our database
     // In the current iteration we will create 1 new anomaly from (5 to 7)
     anomalies = retrieveAllAnomalies();
-    Assert.assertEquals(anomalies.size(), 3);
+    Assert.assertEquals(anomalies.size(), 2);
     expectedAnomalies = new ArrayList<>();
-    expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA"));
     expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA"));
     expectedAnomalies.add(makeSlaAnomaly(5, 7, "1_DAYS", "slaRule1:DATA_SLA"));
     Assert.assertTrue(anomalies.containsAll(expectedAnomalies));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to