This is an automated email from the ASF dual-hosted git repository. akshayrai09 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new afc7539 [TE] Fix corner case in SLA alert - round up datasetLastRefreshTime (#5909) afc7539 is described below commit afc75394d127e8a870206eb4300f8c773154a513 Author: Akshay Rai <ak...@linkedin.com> AuthorDate: Tue Aug 25 12:00:30 2020 -0700 [TE] Fix corner case in SLA alert - round up datasetLastRefreshTime (#5909) --- .../components/DataSlaQualityChecker.java | 40 ++++++----- .../spec/DataSlaQualityCheckerSpec.java | 7 +- .../dataquality/DataQualityTaskRunnerTest.java | 80 +++++++++++----------- 3 files changed, 69 insertions(+), 58 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java index 42d6860..9f4f110 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/components/DataSlaQualityChecker.java @@ -77,7 +77,6 @@ public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityChec private String sla; private InputDataFetcher dataFetcher; - private final String DEFAULT_DATA_SLA = "3_DAYS"; @Override public DetectionResult runDetection(Interval window, String metricUrn) { @@ -108,10 +107,11 @@ public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityChec try { long datasetLastRefreshTime = fetchLatestDatasetRefreshTime(data, me, window); - MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTime + 1, expectedDatasetRefreshTime); + long datasetLastRefreshTimeRoundUp = alignToUpperBoundary(datasetLastRefreshTime, datasetConfig); + MetricSlice slice = MetricSlice.from(me.getId(), datasetLastRefreshTimeRoundUp, expectedDatasetRefreshTime); - if 
(isSLAViolated(datasetLastRefreshTime, expectedDatasetRefreshTime)) { - anomalies.add(createDataSLAAnomaly(slice, datasetConfig)); + if (isSLAViolated(datasetLastRefreshTimeRoundUp, expectedDatasetRefreshTime)) { + anomalies.add(createDataSLAAnomaly(slice, datasetConfig, datasetLastRefreshTime, datasetLastRefreshTimeRoundUp)); } } catch (Exception e) { LOG.error(String.format("Failed to run sla check on metric URN %s", me.getUrn()), e); @@ -148,6 +148,7 @@ public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityChec DataFrame dataFrame = this.dataFetcher.fetchData(new InputDataSpec() .withTimeseriesSlices(Collections.singletonList(metricSlice))).getTimeseries().get(metricSlice); if (dataFrame != null && !dataFrame.isEmpty()) { + // Fetches the latest timestamp from the source. This value is already in epoch (UTC). return dataFrame.getDoubles("timestamp").max().longValue(); } } @@ -177,38 +178,45 @@ public class DataSlaQualityChecker implements AnomalyDetector<DataSlaQualityChec * fetch the user configured SLA, otherwise default 3_DAYS. */ private boolean isSLAViolated(long actualRefreshTime, long expectedRefreshTime) { - String sla = StringUtils.isNotEmpty(this.sla) ? this.sla : DEFAULT_DATA_SLA; - long delay = TimeGranularity.fromString(sla).toPeriod().toStandardDuration().getMillis(); - + long delay = TimeGranularity.fromString(this.sla).toPeriod().toStandardDuration().getMillis(); return (expectedRefreshTime - actualRefreshTime) > delay; } /** - * Align and round off start time to the upper boundary of the granularity + * Align and round off timestamp to the upper boundary of the granularity + * + * Examples: + * a. 20th Aug 05:00 pm will be rounded up to 21st Aug 12:00 am + * b.
20th Aug 12:00 am will be rounded up to 21st Aug 12:00 am */ private static long alignToUpperBoundary(long timestamp, DatasetConfigDTO datasetConfig) { Period granularityPeriod = datasetConfig.bucketTimeGranularity().toPeriod(); DateTimeZone timezone = DateTimeZone.forID(datasetConfig.getTimezone()); - DateTime startTime = new DateTime(timestamp - 1, timezone).plus(granularityPeriod); - return (startTime.getMillis() / granularityPeriod.toStandardDuration().getMillis()) * granularityPeriod.toStandardDuration().getMillis(); + DateTime startTime = new DateTime(timestamp, timezone).plus(granularityPeriod); + return (startTime.getMillis() / granularityPeriod.toStandardDuration().getMillis()) + * granularityPeriod.toStandardDuration().getMillis(); } /** * Creates a DATA_SLA anomaly from ceiling(start) to ceiling(end) for the detection id. * If existing DATA_SLA anomalies are present, then it will be merged accordingly. + * + * @param datasetLastRefreshTime the timestamp corresponding to the last record in the data (UTC) + * @param datasetLastRefreshTimeRoundUp the timestamp rounded off to the upper granular bucket.
See + * {@link #alignToUpperBoundary} */ - private MergedAnomalyResultDTO createDataSLAAnomaly(MetricSlice slice, DatasetConfigDTO datasetConfig) { - MergedAnomalyResultDTO anomaly = DetectionUtils.makeAnomaly( - alignToUpperBoundary(slice.getStart(), datasetConfig), - alignToUpperBoundary(slice.getEnd(), datasetConfig)); + private MergedAnomalyResultDTO createDataSLAAnomaly(MetricSlice slice, DatasetConfigDTO datasetConfig, + long datasetLastRefreshTime, long datasetLastRefreshTimeRoundUp) { + MergedAnomalyResultDTO anomaly = DetectionUtils.makeAnomaly(slice.getStart(), slice.getEnd()); anomaly.setCollection(datasetConfig.getName()); anomaly.setType(AnomalyType.DATA_SLA); anomaly.setAnomalyResultSource(AnomalyResultSource.DATA_QUALITY_DETECTION); // Store the metadata in the anomaly Map<String, String> properties = new HashMap<>(); - properties.put("datasetLastRefreshTime", String.valueOf(slice.getStart() - 1)); - properties.put("sla", sla); + properties.put("datasetLastRefreshTime", String.valueOf(datasetLastRefreshTime)); + properties.put("datasetLastRefreshTimeRoundUp", String.valueOf(datasetLastRefreshTimeRoundUp)); + properties.put("sla", this.sla); anomaly.setProperties(properties); return anomaly; diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java index edfbc19..5a67d01 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/dataquality/spec/DataSlaQualityCheckerSpec.java @@ -20,16 +20,17 @@ package org.apache.pinot.thirdeye.detection.dataquality.spec; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import org.apache.pinot.thirdeye.dataframe.util.MetricSlice; +import
org.apache.commons.lang3.StringUtils; import org.apache.pinot.thirdeye.detection.spec.AbstractSpec; @JsonIgnoreProperties(ignoreUnknown = true) public class DataSlaQualityCheckerSpec extends AbstractSpec { - private String sla = "3_DAYS"; + public static final String DEFAULT_DATA_SLA = "3_DAYS"; + private String sla; public String getSla() { - return sla; + return StringUtils.isNotEmpty(this.sla) ? this.sla : DEFAULT_DATA_SLA; } public void setSla(String sla) { diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java index 963946d..67c03a5 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/dataquality/DataQualityTaskRunnerTest.java @@ -264,15 +264,9 @@ public class DataQualityTaskRunnerTest { this.info.setStart(START_TIME + 4 * GRANULARITY); this.info.setEnd(START_TIME + 5 * GRANULARITY); runner.execute(this.info, this.context); - // 1 data sla anomaly should be created + // No data sla anomaly should be created anomalies = retrieveAllAnomalies(); - Assert.assertEquals(anomalies.size(), 1); - expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA")); - Assert.assertTrue(anomalies.containsAll(expectedAnomalies)); - // clean up - expectedAnomalies.clear(); - cleanUpAnomalies(); - + Assert.assertEquals(anomalies.size(), 0); // CHECK 2: Report data missing when delayed (sla = 2_DAYS) detectionConfigDTO = translateSlaConfig(detectorId, "sla-config-2.yaml"); @@ -287,7 +281,7 @@ public class DataQualityTaskRunnerTest { anomalies = retrieveAllAnomalies(); Assert.assertEquals(anomalies.size(), 0); - // 2nd scan after issue - sla breach + // 2nd scan after issue - still no sla breach // time: 3____4 5 6 // Data 
for 4th still hasn't arrived // scan: |---------| this.info.setStart(START_TIME + 4 * GRANULARITY); @@ -295,8 +289,18 @@ public class DataQualityTaskRunnerTest { runner.execute(this.info, this.context); // 1 data sla anomaly should be created as data is delayed by 1 ms anomalies = retrieveAllAnomalies(); + Assert.assertEquals(anomalies.size(), 0); + + // 3rd scan after issue - sla breach. It has crossed 2 days. + // time: 3____4 5 6 7 // Data for 4th still hasn't arrived + // scan: |--------------| + this.info.setStart(START_TIME + 4 * GRANULARITY); + this.info.setEnd(START_TIME + 7 * GRANULARITY); + runner.execute(this.info, this.context); + // 1 data sla anomaly should be created as data is delayed by 1 ms + anomalies = retrieveAllAnomalies(); Assert.assertEquals(anomalies.size(), 1); - expectedAnomalies.add(makeSlaAnomaly(4, 6, "2_DAYS", "slaRule1:DATA_SLA")); + expectedAnomalies.add(makeSlaAnomaly(4, 7, "2_DAYS", "slaRule1:DATA_SLA")); Assert.assertTrue(anomalies.containsAll(expectedAnomalies)); // clean up expectedAnomalies.clear(); @@ -326,16 +330,26 @@ public class DataQualityTaskRunnerTest { anomalies = retrieveAllAnomalies(); Assert.assertEquals(anomalies.size(), 0); - // 3rd scan after issue - sla breach + // 3rd scan after issue - no sla breach // time: 3____4 5 6 7 // Data for 4th still hasn't arrived // scan: |--------------| this.info.setStart(START_TIME + 4 * GRANULARITY); this.info.setEnd(START_TIME + 7 * GRANULARITY); runner.execute(this.info, this.context); - // 1 data sla anomaly should be created from 4 to 7 as the data is missing since 3 days + // No data sla anomaly should be created from 4 to 7; sla not breached + anomalies = retrieveAllAnomalies(); + Assert.assertEquals(anomalies.size(), 0); + + // 4th scan after issue - sla breach + // time: 3____4 5 6 7 8 // Data for 4th still hasn't arrived + // scan: |-------------------| + this.info.setStart(START_TIME + 4 * GRANULARITY); + this.info.setEnd(START_TIME + 8 * GRANULARITY); +
runner.execute(this.info, this.context); + // 1 data sla anomaly should be created from 4 to 8 as the data is missing since 3 days anomalies = retrieveAllAnomalies(); Assert.assertEquals(anomalies.size(), 1); - expectedAnomalies.add(makeSlaAnomaly(4, 7, "3_DAYS", "slaRule1:DATA_SLA")); + expectedAnomalies.add(makeSlaAnomaly(4, 8, "3_DAYS", "slaRule1:DATA_SLA")); Assert.assertTrue(anomalies.containsAll(expectedAnomalies)); // clean up expectedAnomalies.clear(); @@ -371,16 +385,16 @@ public class DataQualityTaskRunnerTest { datasetConfigDTO.setLastRefreshTime(0); datasetDAO.update(datasetConfigDTO); // 1st scan after issue - sla breach - // time: 3____4 5 6 // We have data till 3rd, data since 4th is missing/delayed - // scan: |----| + // time: 3____4 5 6 7 // We have data till 3rd, data since 4th is missing/delayed + // scan: |---------| this.info.setStart(START_TIME + 5 * GRANULARITY); - this.info.setEnd(START_TIME + 6 * GRANULARITY); + this.info.setEnd(START_TIME + 7 * GRANULARITY); runner.execute(this.info, this.context); - // 1 data sla anomaly should be created from 4 to 6 + // 1 data sla anomaly should be created from 4 to 7 anomalies = retrieveAllAnomalies(); Assert.assertEquals(anomalies.size(), 1); - expectedAnomalies.add(makeSlaAnomaly(5, 6, "1_DAYS", "slaRule1:DATA_SLA")); + expectedAnomalies.add(makeSlaAnomaly(5, 7, "1_DAYS", "slaRule1:DATA_SLA")); Assert.assertTrue(anomalies.containsAll(expectedAnomalies)); // clean up expectedAnomalies.clear(); @@ -538,18 +552,19 @@ public class DataQualityTaskRunnerTest { // Prepare the data sla task on unavailable data this.info.setStart(START_TIME + 4 * GRANULARITY); - this.info.setEnd(START_TIME + 5 * GRANULARITY); + this.info.setEnd(START_TIME + 6 * GRANULARITY); runner.execute(this.info, this.context); // 1 SLA anomaly should be created anomalies = retrieveAllAnomalies(); Assert.assertEquals(anomalies.size(), 1); Assert.assertEquals(anomalies.get(0).getStartTime(), START_TIME + 4 * GRANULARITY); - 
Assert.assertEquals(anomalies.get(0).getEndTime(), START_TIME + 5 * GRANULARITY); + Assert.assertEquals(anomalies.get(0).getEndTime(), START_TIME + 6 * GRANULARITY); Assert.assertEquals(anomalies.get(0).getType(), AnomalyType.DATA_SLA); Assert.assertTrue(anomalies.get(0).getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION)); Map<String, String> anomalyProps = new HashMap<>(); anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * GRANULARITY - 1)); + anomalyProps.put("datasetLastRefreshTimeRoundUp", String.valueOf(START_TIME + 4 * GRANULARITY)); anomalyProps.put("sla", "1_DAYS"); anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA"); anomalyProps.put("subEntityName", "test_sla_alert"); @@ -583,20 +598,9 @@ public class DataQualityTaskRunnerTest { this.info.setEnd(START_TIME + 5 * GRANULARITY); runner.execute(this.info, this.context); - // 1 data sla anomaly should be created from 4 to 5 + // No sla anomaly should be created as the data is within 1_DAY List<MergedAnomalyResultDTO> anomalies = retrieveAllAnomalies(); - Assert.assertEquals(anomalies.size(), 1); - MergedAnomalyResultDTO slaAnomaly = anomalies.get(0); - Assert.assertEquals(slaAnomaly.getStartTime(), START_TIME + 4 * GRANULARITY); - Assert.assertEquals(slaAnomaly.getEndTime(), START_TIME + 5 * GRANULARITY); - Assert.assertEquals(slaAnomaly.getType(), AnomalyType.DATA_SLA); - Assert.assertTrue(slaAnomaly.getAnomalyResultSource().equals(AnomalyResultSource.DATA_QUALITY_DETECTION)); - Map<String, String> anomalyProps = new HashMap<>(); - anomalyProps.put("datasetLastRefreshTime", String.valueOf(START_TIME + 4 * GRANULARITY - 1)); - anomalyProps.put("sla", "1_DAYS"); - anomalyProps.put("detectorComponentName", "slaRule1:DATA_SLA"); - anomalyProps.put("subEntityName", "test_sla_alert"); - Assert.assertEquals(anomalies.get(0).getProperties(), anomalyProps); + Assert.assertEquals(anomalies.size(), 0); // 2nd scan // time: 3____4 5 6 // Data for 4th still 
hasn't arrived @@ -605,11 +609,10 @@ public class DataQualityTaskRunnerTest { this.info.setEnd(START_TIME + 6 * GRANULARITY); runner.execute(this.info, this.context); - // We should now have 2 anomalies in our database (4 to 5) and (4 to 6) + // We should now have 1 anomaly in our database (4 to 6) anomalies = retrieveAllAnomalies(); - Assert.assertEquals(anomalies.size(), 2); + Assert.assertEquals(anomalies.size(), 1); List<MergedAnomalyResultDTO> expectedAnomalies = new ArrayList<>(); - expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA")); expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA")); Assert.assertTrue(anomalies.containsAll(expectedAnomalies)); @@ -629,12 +632,11 @@ public class DataQualityTaskRunnerTest { ); runner.execute(this.info, this.context); - // We will now have 3 anomalies in our database + // We will now have 2 anomalies in our database // In the current iteration we will create 1 new anomaly from (5 to 7) anomalies = retrieveAllAnomalies(); - Assert.assertEquals(anomalies.size(), 3); + Assert.assertEquals(anomalies.size(), 2); expectedAnomalies = new ArrayList<>(); - expectedAnomalies.add(makeSlaAnomaly(4, 5, "1_DAYS", "slaRule1:DATA_SLA")); expectedAnomalies.add(makeSlaAnomaly(4, 6, "1_DAYS", "slaRule1:DATA_SLA")); expectedAnomalies.add(makeSlaAnomaly(5, 7, "1_DAYS", "slaRule1:DATA_SLA")); Assert.assertTrue(anomalies.containsAll(expectedAnomalies)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org