This is an automated email from the ASF dual-hosted git repository. akshayrai09 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new a969ede [TE] Add maxValueDaily and maxValueHourly data filters (#5528) a969ede is described below commit a969ede4f2fd5194ba615f1c7782dce462ef722e Author: Akshay Rai <ak...@linkedin.com> AuthorDate: Mon Jun 22 00:37:50 2020 -0700 [TE] Add maxValueDaily and maxValueHourly data filters (#5528) --- .../pinot/thirdeye/common/utils/MetricUtils.java | 40 ++++++++++++++++++++++ .../detection/algorithm/DimensionWrapper.java | 27 ++++++++++++--- .../components/ThresholdRuleAnomalyFilter.java | 16 ++++----- .../detection/algorithm/DimensionWrapperTest.java | 40 ++++++++++------------ .../MergeDimensionThresholdIntegrationTest.java | 13 +++++-- 5 files changed, 97 insertions(+), 39 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java new file mode 100644 index 0000000..bf632e5 --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.thirdeye.common.utils; + +import org.apache.pinot.thirdeye.constant.MetricAggFunction; +import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO; + + +/** + * Utility class for ThirdEye metrics + */ +public class MetricUtils { + + private MetricUtils() {} + + /** + * check if the metric aggregation is cumulative + */ + public static boolean isAggCumulative(MetricConfigDTO metric) { + MetricAggFunction aggFunction = metric.getDefaultAggFunction(); + return aggFunction.equals(MetricAggFunction.SUM) || aggFunction.equals(MetricAggFunction.COUNT); + } +} diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java index d9d034d..4e3f0b5 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.collections4.MapUtils; +import org.apache.pinot.thirdeye.common.utils.MetricUtils; import org.apache.pinot.thirdeye.dataframe.DataFrame; import org.apache.pinot.thirdeye.dataframe.util.MetricSlice; import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO; @@ -52,7 +53,6 @@ import org.apache.pinot.thirdeye.detection.PredictionResult; import org.apache.pinot.thirdeye.detection.cache.CacheConfig; import org.apache.pinot.thirdeye.detection.spi.exception.DetectorDataInsufficientException; import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice; -import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper; import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity; import org.apache.pinot.thirdeye.util.ThirdEyeUtils; import org.joda.time.DateTime; @@ -101,7 +101,9 @@ public class DimensionWrapper extends DetectionPipeline { private final double minContribution; private final double minValue; private final double minValueHourly; + private final double maxValueHourly; private final double minValueDaily; + private final double maxValueDaily; private final double minLiveZone; private final double liveBucketPercentageThreshold; private final Period lookback; @@ -122,10 +124,13 @@ public class DimensionWrapper extends DetectionPipeline { // the metric used in dimension exploration this.metricUrn = MapUtils.getString(config.getProperties(), "metricUrn", null); + this.minContribution = MapUtils.getDoubleValue(config.getProperties(), "minContribution", Double.NaN); this.minValue = MapUtils.getDoubleValue(config.getProperties(), "minValue", Double.NaN); this.minValueHourly = MapUtils.getDoubleValue(config.getProperties(), "minValueHourly", Double.NaN); + this.maxValueHourly = MapUtils.getDoubleValue(config.getProperties(), "maxValueHourly", Double.NaN); this.minValueDaily = MapUtils.getDoubleValue(config.getProperties(), "minValueDaily", Double.NaN); + this.maxValueDaily = MapUtils.getDoubleValue(config.getProperties(), "maxValueDaily", Double.NaN); this.k = MapUtils.getIntValue(config.getProperties(), "k", -1); this.dimensions = ConfigUtils.getList(config.getProperties().get("dimensions")); this.lookback = ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "lookback", "1w")); @@ -168,6 +173,7 @@ public class DimensionWrapper extends DetectionPipeline { Period testPeriod = new Period(this.start, this.end); MetricEntity metric = MetricEntity.fromURN(this.metricUrn); + MetricConfigDTO metricConfig = this.provider.fetchMetrics(Collections.singleton(metric.getId())).get(metric.getId()); MetricSlice slice = MetricSlice.from(metric.getId(), this.start.getMillis(), this.end.getMillis(), metric.getFilters()); // We can push down the top k filter if min contribution is not defined. @@ -195,14 +201,25 @@ public class DimensionWrapper extends DetectionPipeline { aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).gte(this.minValue)).dropNull(); } + double hourlyMultiplier = MetricUtils.isAggCumulative(metricConfig) ? + (TimeUnit.HOURS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis()) : 1.0; + double dailyMultiplier = MetricUtils.isAggCumulative(metricConfig) ? + (TimeUnit.DAYS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis()) : 1.0; + if (!Double.isNaN(this.minValueHourly)) { - double multiplier = TimeUnit.HOURS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis(); - aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(multiplier).gte(this.minValueHourly)).dropNull(); + aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(hourlyMultiplier).gte(this.minValueHourly)).dropNull(); + } + + if (!Double.isNaN(this.maxValueHourly)) { + aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(hourlyMultiplier).lte(this.maxValueHourly)).dropNull(); } if (!Double.isNaN(this.minValueDaily)) { - double multiplier = TimeUnit.DAYS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis(); - aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(multiplier).gte(this.minValueDaily)).dropNull(); + aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(dailyMultiplier).gte(this.minValueDaily)).dropNull(); + } + + if (!Double.isNaN(this.maxValueDaily)) { + aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(dailyMultiplier).lte(this.maxValueDaily)).dropNull(); } aggregates = aggregates.sortedBy(COL_VALUE).reverse(); diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java index 68a1369..d59b19b 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java @@ -21,7 +21,7 @@ package org.apache.pinot.thirdeye.detection.components; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.apache.pinot.thirdeye.constant.MetricAggFunction; +import org.apache.pinot.thirdeye.common.utils.MetricUtils; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO; import org.apache.pinot.thirdeye.detection.InputDataFetcher; @@ -57,11 +57,12 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi double currentValue = anomaly.getAvgCurrentVal(); Interval anomalyInterval = new Interval(anomaly.getStartTime(), anomaly.getEndTime()); + // apply multiplier if the metric is aggregated by SUM or COUNT - double hourlyMultiplier = - isAdditive(metric) ? (TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0; - double dailyMultiplier = - isAdditive(metric) ? (TimeUnit.DAYS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0; + double hourlyMultiplier = MetricUtils.isAggCumulative(metric) ? + (TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0; + double dailyMultiplier = MetricUtils.isAggCumulative(metric) ? + (TimeUnit.DAYS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0; if (!Double.isNaN(this.minValue) && currentValue < this.minValue || !Double.isNaN(this.maxValue) && currentValue > this.maxValue) { @@ -82,11 +83,6 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi return true; } - private boolean isAdditive(MetricConfigDTO metric) { - MetricAggFunction aggFunction = metric.getDefaultAggFunction(); - return aggFunction.equals(MetricAggFunction.SUM) || aggFunction.equals(MetricAggFunction.COUNT); - } - @Override public void init(ThresholdRuleFilterSpec spec, InputDataFetcher dataFetcher) { this.minValueHourly = spec.getMinValueHourly(); diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java index b1008c1..98e2462 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java @@ -17,6 +17,7 @@ package org.apache.pinot.thirdeye.detection.algorithm; import java.util.concurrent.TimeUnit; +import org.apache.pinot.thirdeye.constant.MetricAggFunction; import org.apache.pinot.thirdeye.dataframe.DataFrame; import org.apache.pinot.thirdeye.dataframe.DoubleSeries; import org.apache.pinot.thirdeye.dataframe.StringSeries; @@ -76,10 +77,18 @@ public class DimensionWrapperTest { private Map<String, Object> nestedProperties; private Map<MetricSlice, DataFrame> aggregates; + private MetricConfigDTO createTestMetricConfig(long id) { + MetricConfigDTO metric = new MetricConfigDTO(); + metric.setDataset("TEST"); + metric.setId(id); + metric.setDefaultAggFunction(MetricAggFunction.SUM); + return metric; + } + @BeforeMethod public void beforeMethod() { this.aggregates = new HashMap<>(); - this.aggregates.put(MetricSlice.from(1, 10, 15), + this.aggregates.put(MetricSlice.from(2, 10, 15), new DataFrame() .addSeries("a", StringSeries.buildFrom("1", "1", "1", "1", "1", "2", "2", "2", "2", "2")) .addSeries("b", StringSeries.buildFrom("1", "2", "1", "2", "3", "1", "2", "1", "2", "3")) @@ -93,18 +102,10 @@ public class DimensionWrapperTest { dataset.setDataset("TEST"); dataset.setNonAdditiveBucketSize(5); dataset.setNonAdditiveBucketUnit(TimeUnit.MILLISECONDS); - MetricConfigDTO metric1 = new MetricConfigDTO(); - metric1.setDataset("TEST"); - metric1.setId(2L); - MetricConfigDTO metric2 = new MetricConfigDTO(); - metric2.setDataset("TEST"); - metric2.setId(10L); - MetricConfigDTO metric3 = new MetricConfigDTO(); - metric3.setDataset("TEST"); - metric3.setId(11L); - MetricConfigDTO metric4 = new MetricConfigDTO(); - metric4.setDataset("TEST"); - metric4.setId(12L); + MetricConfigDTO metric1 = createTestMetricConfig(2L); + MetricConfigDTO metric2 = createTestMetricConfig(10L); + MetricConfigDTO metric3 = createTestMetricConfig(11L); + MetricConfigDTO metric4 = createTestMetricConfig(12L); this.provider = new MockDataProvider() .setAggregates(this.aggregates) .setMetrics(Arrays.asList(metric1, metric2, metric3, metric4)) @@ -117,7 +118,7 @@ public class DimensionWrapperTest { this.nestedProperties.put("key", "value"); this.properties = new HashMap<>(); - this.properties.put(PROP_METRIC_URN, "thirdeye:metric:1"); + this.properties.put(PROP_METRIC_URN, "thirdeye:metric:2"); this.properties.put(PROP_DIMENSIONS, Arrays.asList("a", "b")); this.properties.put(PROP_NESTED_METRIC_URN_KEY, PROP_NESTED_METRIC_URN_KEY_VALUE); this.properties.put(PROP_NESTED_METRIC_URNS, PROP_NESTED_METRIC_URN_VALUES); @@ -238,16 +239,13 @@ public class DimensionWrapperTest { dataset.setDataset("TEST"); dataset.setNonAdditiveBucketSize(5); dataset.setNonAdditiveBucketUnit(TimeUnit.MILLISECONDS); - MetricConfigDTO metric1 = new MetricConfigDTO(); - metric1.setDataset("TEST"); - metric1.setId(10L); - MetricConfigDTO metric2 = new MetricConfigDTO(); - metric2.setDataset("TEST"); - metric2.setId(11L); + MetricConfigDTO metric0 = createTestMetricConfig(2L); + MetricConfigDTO metric1 = createTestMetricConfig(10L); + MetricConfigDTO metric2 = createTestMetricConfig(11L); this.provider = new MockDataProvider() .setAggregates(this.aggregates) - .setMetrics(Arrays.asList(metric1, metric2)) + .setMetrics(Arrays.asList(metric0, metric1, metric2)) .setDatasets(Collections.singletonList(dataset)) .setAnomalies(Collections.emptyList()) .setLoader(new MockPipelineLoader(this.runs, this.outputs)); diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java index ad490fa..af024f8 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java @@ -45,7 +45,8 @@ import org.testng.annotations.Test; public class MergeDimensionThresholdIntegrationTest { private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String METRIC = "myMetric2"; + private static final String METRIC1 = "myMetric1"; + private static final String METRIC2 = "myMetric2"; private static final String DATASET = "myDataset2"; private static final Map<String, String> ONE_TWO = new HashMap<>(); static { @@ -60,6 +61,7 @@ public class MergeDimensionThresholdIntegrationTest { private DataFrame data1; private DataFrame data2; + private MetricConfigDTO metric1; private MetricConfigDTO metric2; private DatasetConfigDTO dataset; @@ -90,13 +92,18 @@ public class MergeDimensionThresholdIntegrationTest { this.timeseries = new HashMap<>(); this.timeseries.put(MetricSlice.from(2, 0, 18000), this.data2); + this.metric1 = new MetricConfigDTO(); + this.metric1.setId(1L); + this.metric1.setName(METRIC1); + this.metric1.setDataset(DATASET); this.metric2 = new MetricConfigDTO(); this.metric2.setId(2L); - this.metric2.setName(METRIC); + this.metric2.setName(METRIC2); this.metric2.setDataset(DATASET); this.metrics = new ArrayList<>(); this.metrics.add(this.metric2); + this.metrics.add(this.metric1); this.dataset = new DatasetConfigDTO(); this.dataset.setId(3L); @@ -144,7 +151,7 @@ public class MergeDimensionThresholdIntegrationTest { dimensions.put(entry.getKey(), entry.getValue()); } - MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC, DATASET, dimensions); + MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC2, DATASET, dimensions); anomaly.setMetricUrn(metricUrn); return anomaly; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org