This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 209c966 [TE] detection - threshold-based filter (#3881) 209c966 is described below commit 209c9661c75ac340c94b54167e64b7f929a4e0b0 Author: Jihao Zhang <jihzh...@linkedin.com> AuthorDate: Mon Feb 25 17:37:33 2019 -0800 [TE] detection - threshold-based filter (#3881) Threshold-based anomaly filter to work on the hourly/daily bucket. --- .../components/ThresholdRuleAnomalyFilter.java | 32 ++++++++--- .../detection/spec/ThresholdRuleFilterSpec.java | 40 +++++++++---- .../components/ThresholdRuleAnomalyFilterTest.java | 66 ++++++++++++++++------ 3 files changed, 101 insertions(+), 37 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java index b24ce1f..be7fae0 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java @@ -19,6 +19,7 @@ package org.apache.pinot.thirdeye.detection.components; +import java.util.concurrent.TimeUnit; import org.apache.pinot.thirdeye.dataframe.DataFrame; import org.apache.pinot.thirdeye.dataframe.util.MetricSlice; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; @@ -34,6 +35,7 @@ import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec; import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity; import java.util.Collections; import java.util.Map; +import org.joda.time.Interval; import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*; @@ -42,12 +44,12 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*; * This threshold rule filter stage filters the anomalies if either the min or max thresholds do not 
pass. */ @Components(title = "Aggregate Threshold Filter", type = "THRESHOLD_RULE_FILTER", tags = { - DetectionTag.RULE_FILTER}, description = "Threshold rule filter. filters the anomalies if either the min or max thresholds do not satisfied.", presentation = { - @PresentationOption(name = "absolute value", description = "aggregated absolute value within a time period", template = "is higher than ${min} and lower than ${max}")}, params = { - @Param(name = "min", placeholder = "value"), @Param(name = "max", placeholder = "value")}) + DetectionTag.RULE_FILTER}, description = "Threshold rule filter. filters the anomalies if either the min or max thresholds do not satisfied.") public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFilterSpec> { - private double min; - private double max; + private double minValueHourly; + private double maxValueHourly; + private double minValueDaily; + private double maxValueDaily; private InputDataFetcher dataFetcher; @Override @@ -58,10 +60,20 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi Map<MetricSlice, DataFrame> aggregates = data.getAggregates(); double currentValue = getValueFromAggregates(currentSlice, aggregates); - if (!Double.isNaN(this.min) && currentValue < this.min) { + + Interval anomalyInterval = new Interval(anomaly.getStartTime(), anomaly.getEndTime()); + double hourlyMultiplier = TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis(); + double dailyMultiplier = TimeUnit.DAYS.toMillis(1) / (double) anomalyInterval.toDurationMillis(); + if (!Double.isNaN(this.minValueHourly) && currentValue * hourlyMultiplier < this.minValueHourly) { + return false; + } + if (!Double.isNaN(this.maxValueHourly) && currentValue * hourlyMultiplier > this.maxValueHourly) { + return false; + } + if (!Double.isNaN(this.minValueDaily) && currentValue * dailyMultiplier< this.minValueDaily) { return false; } - if (!Double.isNaN(this.max) && currentValue > this.max) { 
+ if (!Double.isNaN(this.maxValueDaily) && currentValue * dailyMultiplier > this.maxValueDaily) { return false; } return true; @@ -69,8 +81,10 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi @Override public void init(ThresholdRuleFilterSpec spec, InputDataFetcher dataFetcher) { - this.min = spec.getMin(); - this.max = spec.getMax(); + this.minValueHourly = spec.getMinValueHourly(); + this.maxValueHourly = spec.getMaxValueHourly(); + this.minValueDaily = spec.getMinValueDaily(); + this.maxValueDaily = spec.getMaxValueDaily(); this.dataFetcher = dataFetcher; } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java index 72f4655..2c97df5 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java @@ -24,22 +24,42 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @JsonIgnoreProperties(ignoreUnknown = true) public class ThresholdRuleFilterSpec extends AbstractSpec { - private double min = Double.NaN; - private double max = Double.NaN; + private double minValueHourly = Double.NaN; + private double minValueDaily = Double.NaN; - public double getMin() { - return min; + private double maxValueHourly = Double.NaN; + private double maxValueDaily = Double.NaN; + + + public double getMinValueHourly() { + return minValueHourly; + } + + public void setMinValueHourly(double minValueHourly) { + this.minValueHourly = minValueHourly; + } + + public double getMinValueDaily() { + return minValueDaily; + } + + public void setMinValueDaily(double minValueDaily) { + this.minValueDaily = minValueDaily; + } + + public double getMaxValueHourly() { + return maxValueHourly; } - public void 
setMin(double min) { - this.min = min; + public void setMaxValueHourly(double maxValueHourly) { + this.maxValueHourly = maxValueHourly; } - public double getMax() { - return max; + public double getMaxValueDaily() { + return maxValueDaily; } - public void setMax(double max) { - this.max = max; + public void setMaxValueDaily(double maxValueDaily) { + this.maxValueDaily = maxValueDaily; } } diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java index de52d00..84ae901 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java @@ -24,8 +24,10 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO; import org.apache.pinot.thirdeye.detection.DataProvider; +import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher; import org.apache.pinot.thirdeye.detection.DetectionPipelineResult; import org.apache.pinot.thirdeye.detection.DetectionTestUtils; +import org.apache.pinot.thirdeye.detection.InputDataFetcher; import org.apache.pinot.thirdeye.detection.MockDataProvider; import org.apache.pinot.thirdeye.detection.MockPipeline; import org.apache.pinot.thirdeye.detection.MockPipelineLoader; @@ -60,13 +62,13 @@ public class ThresholdRuleAnomalyFilterTest { @BeforeMethod public void beforeMethod() { Map<MetricSlice, DataFrame> aggregates = new HashMap<>(); - aggregates.put(MetricSlice.from(123L, 0, 2), + aggregates.put(MetricSlice.from(123L, 1551186000000L, 1551189600000L), new DataFrame().addSeries(COL_VALUE, 0)); - 
aggregates.put(MetricSlice.from(123L, 4, 6), + aggregates.put(MetricSlice.from(123L, 1551189600000L, 1551193200000L), new DataFrame().addSeries(COL_VALUE, 200)); - aggregates.put(MetricSlice.from(123L, 6, 8), + aggregates.put(MetricSlice.from(123L, 1551193200000L, 1551196800000L), new DataFrame().addSeries(COL_VALUE, 500)); - aggregates.put(MetricSlice.from(123L, 8, 10), + aggregates.put(MetricSlice.from(123L, 1551196800000L, 1551200400000L), new DataFrame().addSeries(COL_VALUE, 1000)); MetricConfigDTO metricConfigDTO = new MetricConfigDTO(); @@ -92,12 +94,12 @@ public class ThresholdRuleAnomalyFilterTest { this.config.setComponentSpecs(ImmutableMap.of("abc", this.specs)); this.config.setProperties(this.properties); - this.anomalies = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6), makeAnomaly(6, 8), makeAnomaly(8, 10)); + this.anomalies = Arrays.asList(makeAnomaly(1551186000000L, 1551189600000L), makeAnomaly(1551189600000L, 1551193200000L), makeAnomaly(1551193200000L, 1551196800000L), makeAnomaly(1551196800000L, 1551200400000L)); this.runs = new ArrayList<>(); this.loader = new MockPipelineLoader(this.runs, Collections.singletonList( - new MockPipelineOutput(this.anomalies, 10))); + new MockPipelineOutput(this.anomalies, 1551200400000L))); this.testDataProvider = new MockDataProvider() .setLoader(this.loader) @@ -108,11 +110,11 @@ public class ThresholdRuleAnomalyFilterTest { @Test(priority = 0) public void testThresholdRuleFilterNone() throws Exception { - this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10); + this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L); DetectionPipelineResult result = this.thresholdRuleFilter.run(); List<MergedAnomalyResultDTO> anomalies = result.getAnomalies(); - Assert.assertEquals(result.getLastTimestamp(), 10); + Assert.assertEquals(result.getLastTimestamp(), 1551200400000L); Assert.assertEquals(anomalies.size(), 4); 
Assert.assertEquals(anomalies.get(0), this.anomalies.get(0)); Assert.assertEquals(anomalies.get(1), this.anomalies.get(1)); @@ -122,12 +124,12 @@ public class ThresholdRuleAnomalyFilterTest { @Test(priority = 1) public void testThresholdRuleFilterMin() throws Exception { - this.specs.put("min", 200); - this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10); + this.specs.put("minValueHourly", 200); + this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L); DetectionPipelineResult result = this.thresholdRuleFilter.run(); List<MergedAnomalyResultDTO> anomalies = result.getAnomalies(); - Assert.assertEquals(result.getLastTimestamp(), 10); + Assert.assertEquals(result.getLastTimestamp(), 1551200400000L); Assert.assertEquals(anomalies.size(), 3); Assert.assertEquals(anomalies.get(0), this.anomalies.get(1)); Assert.assertEquals(anomalies.get(1), this.anomalies.get(2)); @@ -136,12 +138,12 @@ public class ThresholdRuleAnomalyFilterTest { @Test(priority = 2) public void testThresholdRuleFilterMax() throws Exception { - this.specs.put("max", 500); - this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10); + this.specs.put("maxValueHourly", 500); + this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L); DetectionPipelineResult result = this.thresholdRuleFilter.run(); List<MergedAnomalyResultDTO> anomalies = result.getAnomalies(); - Assert.assertEquals(result.getLastTimestamp(), 10); + Assert.assertEquals(result.getLastTimestamp(), 1551200400000L); Assert.assertEquals(anomalies.size(), 3); Assert.assertEquals(anomalies.get(0), this.anomalies.get(0)); Assert.assertEquals(anomalies.get(1), this.anomalies.get(1)); @@ -150,18 +152,46 @@ public class ThresholdRuleAnomalyFilterTest { @Test(priority = 3) public void testThresholdRuleFilterBoth() throws Exception { - 
this.specs.put("min", 200); - this.specs.put("max", 500); - this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10); + this.specs.put("minValueHourly", 200); + this.specs.put("maxValueHourly", 500); + this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L); DetectionPipelineResult result = this.thresholdRuleFilter.run(); List<MergedAnomalyResultDTO> anomalies = result.getAnomalies(); - Assert.assertEquals(result.getLastTimestamp(), 10); + Assert.assertEquals(result.getLastTimestamp(), 1551200400000L); Assert.assertEquals(anomalies.size(), 2); Assert.assertEquals(anomalies.get(0), this.anomalies.get(1)); Assert.assertEquals(anomalies.get(1), this.anomalies.get(2)); } + @Test(priority = 4) + public void testThresholdRuleFilterMinDaily() throws Exception { + this.specs.put("minValueDaily", 2400); + this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L); + + DetectionPipelineResult result = this.thresholdRuleFilter.run(); + List<MergedAnomalyResultDTO> anomalies = result.getAnomalies(); + Assert.assertEquals(result.getLastTimestamp(), 1551200400000L); + Assert.assertEquals(anomalies.size(), 3); + Assert.assertEquals(anomalies.get(0), this.anomalies.get(1)); + Assert.assertEquals(anomalies.get(1), this.anomalies.get(2)); + Assert.assertEquals(anomalies.get(2), this.anomalies.get(3)); + } + + @Test(priority = 5) + public void testThresholdRuleFilterMaxDaily() throws Exception { + this.specs.put("maxValueDaily", 12000); + this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L); + + DetectionPipelineResult result = this.thresholdRuleFilter.run(); + List<MergedAnomalyResultDTO> anomalies = result.getAnomalies(); + Assert.assertEquals(result.getLastTimestamp(), 1551200400000L); + Assert.assertEquals(anomalies.size(), 3); + 
Assert.assertEquals(anomalies.get(0), this.anomalies.get(0)); + Assert.assertEquals(anomalies.get(1), this.anomalies.get(1)); + Assert.assertEquals(anomalies.get(2), this.anomalies.get(2)); + } + private static MergedAnomalyResultDTO makeAnomaly(long start, long end) { MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end); anomaly.setMetricUrn(METRIC_URN); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org