This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a969ede  [TE] Add maxValueDaily and maxValueHourly data filters (#5528)
a969ede is described below

commit a969ede4f2fd5194ba615f1c7782dce462ef722e
Author: Akshay Rai <ak...@linkedin.com>
AuthorDate: Mon Jun 22 00:37:50 2020 -0700

    [TE] Add maxValueDaily and maxValueHourly data filters (#5528)
---
 .../pinot/thirdeye/common/utils/MetricUtils.java   | 40 ++++++++++++++++++++++
 .../detection/algorithm/DimensionWrapper.java      | 27 ++++++++++++---
 .../components/ThresholdRuleAnomalyFilter.java     | 16 ++++-----
 .../detection/algorithm/DimensionWrapperTest.java  | 40 ++++++++++------------
 .../MergeDimensionThresholdIntegrationTest.java    | 13 +++++--
 5 files changed, 97 insertions(+), 39 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java
new file mode 100644
index 0000000..bf632e5
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.common.utils;
+
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+
+
+/**
+ * Utility class for ThirdEye metrics
+ */
+public class MetricUtils {
+
+  private MetricUtils() {}
+
+  /**
+   * check if the metric aggregation is cumulative
+   */
+  public static boolean isAggCumulative(MetricConfigDTO metric) {
+    MetricAggFunction aggFunction = metric.getDefaultAggFunction();
+    return aggFunction.equals(MetricAggFunction.SUM) || aggFunction.equals(MetricAggFunction.COUNT);
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index d9d034d..4e3f0b5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.common.utils.MetricUtils;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
@@ -52,7 +53,6 @@ import org.apache.pinot.thirdeye.detection.PredictionResult;
 import org.apache.pinot.thirdeye.detection.cache.CacheConfig;
 import org.apache.pinot.thirdeye.detection.spi.exception.DetectorDataInsufficientException;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
-import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
 import org.joda.time.DateTime;
@@ -101,7 +101,9 @@ public class DimensionWrapper extends DetectionPipeline {
   private final double minContribution;
   private final double minValue;
   private final double minValueHourly;
+  private final double maxValueHourly;
   private final double minValueDaily;
+  private final double maxValueDaily;
   private final double minLiveZone;
   private final double liveBucketPercentageThreshold;
   private final Period lookback;
@@ -122,10 +124,13 @@ public class DimensionWrapper extends DetectionPipeline {
 
     // the metric used in dimension exploration
     this.metricUrn = MapUtils.getString(config.getProperties(), "metricUrn", null);
+
     this.minContribution = MapUtils.getDoubleValue(config.getProperties(), "minContribution", Double.NaN);
     this.minValue = MapUtils.getDoubleValue(config.getProperties(), "minValue", Double.NaN);
     this.minValueHourly = MapUtils.getDoubleValue(config.getProperties(), "minValueHourly", Double.NaN);
+    this.maxValueHourly = MapUtils.getDoubleValue(config.getProperties(), "maxValueHourly", Double.NaN);
     this.minValueDaily = MapUtils.getDoubleValue(config.getProperties(), "minValueDaily", Double.NaN);
+    this.maxValueDaily = MapUtils.getDoubleValue(config.getProperties(), "maxValueDaily", Double.NaN);
     this.k = MapUtils.getIntValue(config.getProperties(), "k", -1);
     this.dimensions = ConfigUtils.getList(config.getProperties().get("dimensions"));
     this.lookback = ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "lookback", "1w"));
@@ -168,6 +173,7 @@ public class DimensionWrapper extends DetectionPipeline {
       Period testPeriod = new Period(this.start, this.end);
 
       MetricEntity metric = MetricEntity.fromURN(this.metricUrn);
+      MetricConfigDTO metricConfig = this.provider.fetchMetrics(Collections.singleton(metric.getId())).get(metric.getId());
       MetricSlice slice = MetricSlice.from(metric.getId(), this.start.getMillis(), this.end.getMillis(), metric.getFilters());
 
       // We can push down the top k filter if min contribution is not defined.
@@ -195,14 +201,25 @@ public class DimensionWrapper extends DetectionPipeline {
         aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).gte(this.minValue)).dropNull();
       }
 
+      double hourlyMultiplier = MetricUtils.isAggCumulative(metricConfig) ?
+          (TimeUnit.HOURS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis()) : 1.0;
+      double dailyMultiplier = MetricUtils.isAggCumulative(metricConfig) ?
+              (TimeUnit.DAYS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis()) : 1.0;
+
       if (!Double.isNaN(this.minValueHourly)) {
-        double multiplier = TimeUnit.HOURS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis();
-        aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(multiplier).gte(this.minValueHourly)).dropNull();
+        aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(hourlyMultiplier).gte(this.minValueHourly)).dropNull();
+      }
+
+      if (!Double.isNaN(this.maxValueHourly)) {
+        aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(hourlyMultiplier).lte(this.maxValueHourly)).dropNull();
       }
 
       if (!Double.isNaN(this.minValueDaily)) {
-        double multiplier = TimeUnit.DAYS.toMillis(1) / (double) testPeriod.toDurationFrom(start).getMillis();
-        aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(multiplier).gte(this.minValueDaily)).dropNull();
+        aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(dailyMultiplier).gte(this.minValueDaily)).dropNull();
+      }
+
+      if (!Double.isNaN(this.maxValueDaily)) {
+        aggregates = aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(dailyMultiplier).lte(this.maxValueDaily)).dropNull();
       }
 
       aggregates = aggregates.sortedBy(COL_VALUE).reverse();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
index 68a1369..d59b19b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
@@ -21,7 +21,7 @@ package org.apache.pinot.thirdeye.detection.components;
 
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.common.utils.MetricUtils;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.detection.InputDataFetcher;
@@ -57,11 +57,12 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
     double currentValue = anomaly.getAvgCurrentVal();
 
     Interval anomalyInterval = new Interval(anomaly.getStartTime(), anomaly.getEndTime());
+
     // apply multiplier if the metric is aggregated by SUM or COUNT
-    double hourlyMultiplier =
-        isAdditive(metric) ? (TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0;
-    double dailyMultiplier =
-        isAdditive(metric) ? (TimeUnit.DAYS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0;
+    double hourlyMultiplier = MetricUtils.isAggCumulative(metric) ?
+        (TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0;
+    double dailyMultiplier = MetricUtils.isAggCumulative(metric) ?
+        (TimeUnit.DAYS.toMillis(1) / (double) anomalyInterval.toDurationMillis()) : 1.0;
 
     if (!Double.isNaN(this.minValue) && currentValue < this.minValue
         || !Double.isNaN(this.maxValue) && currentValue > this.maxValue) {
@@ -82,11 +83,6 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
     return true;
   }
 
-  private boolean isAdditive(MetricConfigDTO metric) {
-    MetricAggFunction aggFunction = metric.getDefaultAggFunction();
-    return aggFunction.equals(MetricAggFunction.SUM) || aggFunction.equals(MetricAggFunction.COUNT);
-  }
-
   @Override
   public void init(ThresholdRuleFilterSpec spec, InputDataFetcher dataFetcher) {
     this.minValueHourly = spec.getMinValueHourly();
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java
index b1008c1..98e2462 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java
@@ -17,6 +17,7 @@
 package org.apache.pinot.thirdeye.detection.algorithm;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
 import org.apache.pinot.thirdeye.dataframe.StringSeries;
@@ -76,10 +77,18 @@ public class DimensionWrapperTest {
   private Map<String, Object> nestedProperties;
   private Map<MetricSlice, DataFrame> aggregates;
 
+  private MetricConfigDTO createTestMetricConfig(long id) {
+    MetricConfigDTO metric = new MetricConfigDTO();
+    metric.setDataset("TEST");
+    metric.setId(id);
+    metric.setDefaultAggFunction(MetricAggFunction.SUM);
+    return metric;
+  }
+
   @BeforeMethod
   public void beforeMethod() {
     this.aggregates = new HashMap<>();
-    this.aggregates.put(MetricSlice.from(1, 10, 15),
+    this.aggregates.put(MetricSlice.from(2, 10, 15),
         new DataFrame()
             .addSeries("a", StringSeries.buildFrom("1", "1", "1", "1", "1", "2", "2", "2", "2", "2"))
             .addSeries("b", StringSeries.buildFrom("1", "2", "1", "2", "3", "1", "2", "1", "2", "3"))
@@ -93,18 +102,10 @@ public class DimensionWrapperTest {
     dataset.setDataset("TEST");
     dataset.setNonAdditiveBucketSize(5);
     dataset.setNonAdditiveBucketUnit(TimeUnit.MILLISECONDS);
-    MetricConfigDTO metric1 = new MetricConfigDTO();
-    metric1.setDataset("TEST");
-    metric1.setId(2L);
-    MetricConfigDTO metric2 = new MetricConfigDTO();
-    metric2.setDataset("TEST");
-    metric2.setId(10L);
-    MetricConfigDTO metric3 = new MetricConfigDTO();
-    metric3.setDataset("TEST");
-    metric3.setId(11L);
-    MetricConfigDTO metric4 = new MetricConfigDTO();
-    metric4.setDataset("TEST");
-    metric4.setId(12L);
+    MetricConfigDTO metric1 = createTestMetricConfig(2L);
+    MetricConfigDTO metric2 = createTestMetricConfig(10L);
+    MetricConfigDTO metric3 = createTestMetricConfig(11L);
+    MetricConfigDTO metric4 = createTestMetricConfig(12L);
     this.provider = new MockDataProvider()
         .setAggregates(this.aggregates)
         .setMetrics(Arrays.asList(metric1, metric2, metric3, metric4))
@@ -117,7 +118,7 @@ public class DimensionWrapperTest {
     this.nestedProperties.put("key", "value");
 
     this.properties = new HashMap<>();
-    this.properties.put(PROP_METRIC_URN, "thirdeye:metric:1");
+    this.properties.put(PROP_METRIC_URN, "thirdeye:metric:2");
     this.properties.put(PROP_DIMENSIONS, Arrays.asList("a", "b"));
     this.properties.put(PROP_NESTED_METRIC_URN_KEY, PROP_NESTED_METRIC_URN_KEY_VALUE);
     this.properties.put(PROP_NESTED_METRIC_URNS, PROP_NESTED_METRIC_URN_VALUES);
@@ -238,16 +239,13 @@ public class DimensionWrapperTest {
     dataset.setDataset("TEST");
     dataset.setNonAdditiveBucketSize(5);
     dataset.setNonAdditiveBucketUnit(TimeUnit.MILLISECONDS);
-    MetricConfigDTO metric1 = new MetricConfigDTO();
-    metric1.setDataset("TEST");
-    metric1.setId(10L);
-    MetricConfigDTO metric2 = new MetricConfigDTO();
-    metric2.setDataset("TEST");
-    metric2.setId(11L);
+    MetricConfigDTO metric0 = createTestMetricConfig(2L);
+    MetricConfigDTO metric1 = createTestMetricConfig(10L);
+    MetricConfigDTO metric2 = createTestMetricConfig(11L);
 
     this.provider = new MockDataProvider()
         .setAggregates(this.aggregates)
-        .setMetrics(Arrays.asList(metric1, metric2))
+        .setMetrics(Arrays.asList(metric0, metric1, metric2))
         .setDatasets(Collections.singletonList(dataset))
         .setAnomalies(Collections.emptyList())
         .setLoader(new MockPipelineLoader(this.runs, this.outputs));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
index ad490fa..af024f8 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
@@ -45,7 +45,8 @@ import org.testng.annotations.Test;
 public class MergeDimensionThresholdIntegrationTest {
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
-  private static final String METRIC = "myMetric2";
+  private static final String METRIC1 = "myMetric1";
+  private static final String METRIC2 = "myMetric2";
   private static final String DATASET = "myDataset2";
   private static final Map<String, String> ONE_TWO = new HashMap<>();
   static {
@@ -60,6 +61,7 @@ public class MergeDimensionThresholdIntegrationTest {
 
   private DataFrame data1;
   private DataFrame data2;
+  private MetricConfigDTO metric1;
   private MetricConfigDTO metric2;
   private DatasetConfigDTO dataset;
 
@@ -90,13 +92,18 @@ public class MergeDimensionThresholdIntegrationTest {
     this.timeseries = new HashMap<>();
     this.timeseries.put(MetricSlice.from(2, 0, 18000), this.data2);
 
+    this.metric1 = new MetricConfigDTO();
+    this.metric1.setId(1L);
+    this.metric1.setName(METRIC1);
+    this.metric1.setDataset(DATASET);
     this.metric2 = new MetricConfigDTO();
     this.metric2.setId(2L);
-    this.metric2.setName(METRIC);
+    this.metric2.setName(METRIC2);
     this.metric2.setDataset(DATASET);
 
     this.metrics = new ArrayList<>();
     this.metrics.add(this.metric2);
+    this.metrics.add(this.metric1);
 
     this.dataset = new DatasetConfigDTO();
     this.dataset.setId(3L);
@@ -144,7 +151,7 @@ public class MergeDimensionThresholdIntegrationTest {
       dimensions.put(entry.getKey(), entry.getValue());
     }
 
-    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC, DATASET, dimensions);
+    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC2, DATASET, dimensions);
     anomaly.setMetricUrn(metricUrn);
     return anomaly;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to